1mod config;
9
10mod handle;
13
14use std::{
15 collections::{HashMap, HashSet},
16 iter,
17 num::{NonZeroU32, NonZeroUsize},
18 sync::Arc,
19 time::{Duration, Instant},
20};
21
22use bimap::BiMap;
23use futures::{SinkExt, StreamExt, channel::mpsc};
24use hotshot_types::{
25 constants::KAD_DEFAULT_REPUB_INTERVAL_SEC, traits::node_implementation::NodeType,
26};
27use libp2p::{
28 Multiaddr, StreamProtocol, Swarm, SwarmBuilder, autonat,
29 core::{transport::ListenerId, upgrade::Version::V1Lazy},
30 gossipsub::{
31 Behaviour as Gossipsub, ConfigBuilder as GossipsubConfigBuilder, Event as GossipEvent,
32 Message as GossipsubMessage, MessageAuthenticity, MessageId, Topic, ValidationMode,
33 },
34 identify::{
35 Behaviour as IdentifyBehaviour, Config as IdentifyConfig, Event as IdentifyEvent,
36 Info as IdentifyInfo,
37 },
38 identity::Keypair,
39 kad::{Behaviour, Config, Mode, Record, store::MemoryStore},
40 request_response::{
41 Behaviour as RequestResponse, Config as Libp2pRequestResponseConfig, ProtocolSupport,
42 },
43 swarm::SwarmEvent,
44};
45use libp2p_identity::PeerId;
46use parking_lot::Mutex;
47use rand::{prelude::SliceRandom, thread_rng};
48use tokio::{
49 select, spawn,
50 sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
51};
52use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
53
54pub use self::{
55 config::{
56 DEFAULT_REPLICATION_FACTOR, GossipConfig, NetworkNodeConfig, NetworkNodeConfigBuilder,
57 NetworkNodeConfigBuilderError, RequestResponseConfig,
58 },
59 handle::{NetworkNodeHandle, NetworkNodeReceiver, spawn_network_node},
60};
61use super::{
62 BoxedTransport, ClientRequest, NetworkDef, NetworkError, NetworkEvent, NetworkEventInternal,
63 behaviours::dht::{
64 bootstrap::{DHTBootstrapTask, InputEvent},
65 store::{
66 persistent::{DhtPersistentStorage, PersistentStore},
67 validated::ValidatedStore,
68 },
69 },
70 cbor::Cbor,
71 gen_transport,
72};
73use crate::network::{
74 behaviours::{
75 dht::{DHTBehaviour, DHTProgress, KadPutQuery},
76 direct_message::{DMBehaviour, DMRequest},
77 exponential_backoff::ExponentialBackoff,
78 },
79 log_summary::LogEvent,
80};
81
/// Upper bound on the size of a gossip message.
///
/// NOTE(review): the unit is presumably bytes; this constant is not referenced
/// elsewhere in this file, so confirm against its users.
pub const MAX_GOSSIP_MSG_SIZE: usize = 2_000_000_000;

/// Number of simultaneously established connections to a single peer above
/// which the swarm event handler logs an error.
pub const ESTABLISHED_LIMIT: NonZeroU32 = NonZeroU32::new(ESTABLISHED_LIMIT_UNWR).unwrap();
/// Plain `u32` value backing [`ESTABLISHED_LIMIT`].
pub const ESTABLISHED_LIMIT_UNWR: u32 = 10;

/// AutoNAT confidence at or above which a `Private` NAT status is reported as
/// confirmed by the swarm event handler.
///
/// NOTE(review): presumably mirrors the AutoNAT behaviour's own maximum
/// confidence (the config below uses the library defaults) — confirm.
const AUTONAT_CONFIDENCE_MAX: usize = 3;
94
/// A libp2p network node: owns the swarm plus the bookkeeping needed to serve
/// client requests and react to swarm events.
#[derive(derive_more::Debug)]
pub struct NetworkNode<T: NodeType, D: DhtPersistentStorage> {
    /// Peer id derived from this node's keypair.
    peer_id: PeerId,
    /// The libp2p swarm driving all behaviours (omitted from `Debug` output).
    #[debug(skip)]
    swarm: Swarm<NetworkDef<T::SignatureKey, D>>,
    /// Time-to-live applied to records this node publishes to the DHT.
    kademlia_record_ttl: Duration,
    /// Shared bidirectional map between consensus keys and peer ids; an entry is
    /// removed when the last connection to that peer closes.
    consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
    /// Id of the active listener, set by `start_listen` and removed on shutdown.
    listener_id: Option<ListenerId>,
    /// State tracking for direct-message requests.
    direct_message_state: DMBehaviour,
    /// State tracking for DHT queries.
    dht_handler: DHTBehaviour<T::SignatureKey, D>,
    /// Sender for the client request channel, set by `spawn_listeners` and handed
    /// to the direct-message handler.
    resend_tx: Option<UnboundedSender<ClientRequest>>,
    /// Whether the "confirmed not publicly reachable" AutoNAT error has already
    /// been logged, so it is emitted only once per private episode.
    autonat_private_logged: bool,
}
119
120impl<T: NodeType, D: DhtPersistentStorage> NetworkNode<T, D> {
121 pub fn num_connected(&self) -> usize {
123 self.swarm.connected_peers().count()
124 }
125
126 pub fn connected_pids(&self) -> HashSet<PeerId> {
128 self.swarm.connected_peers().copied().collect()
129 }
130
131 #[instrument(skip(self))]
135 pub async fn start_listen(
136 &mut self,
137 listen_addr: Multiaddr,
138 ) -> Result<Multiaddr, NetworkError> {
139 self.listener_id = Some(self.swarm.listen_on(listen_addr).map_err(|err| {
140 NetworkError::ListenError(format!("failed to listen for Libp2p: {err}"))
141 })?);
142 let addr = loop {
143 if let Some(SwarmEvent::NewListenAddr { address, .. }) = self.swarm.next().await {
144 break address;
145 }
146 };
147 info!("Libp2p listening on {addr:?}");
148 Ok(addr)
149 }
150
151 #[instrument(skip(self))]
156 pub fn add_known_peers(&mut self, known_peers: &[(PeerId, Multiaddr)]) {
157 debug!("Adding {} known peers", known_peers.len());
158 let behaviour = self.swarm.behaviour_mut();
159 let mut bs_nodes = HashMap::<PeerId, HashSet<Multiaddr>>::new();
160 let mut shuffled = known_peers.iter().collect::<Vec<_>>();
161 shuffled.shuffle(&mut thread_rng());
162 for (peer_id, addr) in shuffled {
163 if *peer_id != self.peer_id {
164 behaviour.dht.add_address(peer_id, addr.clone());
165 behaviour.autonat.add_server(*peer_id, Some(addr.clone()));
166 bs_nodes.insert(*peer_id, iter::once(addr.clone()).collect());
167 }
168 }
169 }
170
171 #[allow(clippy::too_many_lines)]
185 pub async fn new(
186 config: NetworkNodeConfig,
187 dht_persistent_storage: D,
188 consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
189 ) -> Result<Self, NetworkError> {
190 let keypair = config
192 .keypair
193 .clone()
194 .unwrap_or_else(Keypair::generate_ed25519);
195
196 let peer_id = PeerId::from(keypair.public());
198
199 let transport: BoxedTransport = gen_transport::<T>(
201 keypair.clone(),
202 config.auth_message.clone(),
203 Arc::clone(&consensus_key_to_pid_map),
204 )
205 .await?;
206
207 let kademlia_record_republication_interval = config
209 .republication_interval
210 .unwrap_or(Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC));
211
212 let kademlia_ttl = config
214 .ttl
215 .unwrap_or(16 * kademlia_record_republication_interval);
216
217 let mut swarm: Swarm<NetworkDef<T::SignatureKey, D>> = {
219 let message_id_fn = |message: &GossipsubMessage| {
221 let hash = blake3::hash(&message.data);
222 MessageId::from(hash.as_bytes().to_vec())
223 };
224
225 let gossipsub_config = GossipsubConfigBuilder::default()
227 .message_id_fn(message_id_fn) .validation_mode(ValidationMode::Strict) .heartbeat_interval(config.gossip_config.heartbeat_interval) .history_gossip(config.gossip_config.history_gossip) .history_length(config.gossip_config.history_length) .mesh_n(config.gossip_config.mesh_n) .mesh_n_high(config.gossip_config.mesh_n_high) .mesh_n_low(config.gossip_config.mesh_n_low) .mesh_outbound_min(config.gossip_config.mesh_outbound_min) .max_transmit_size(config.gossip_config.max_transmit_size) .max_ihave_length(config.gossip_config.max_ihave_length) .max_ihave_messages(config.gossip_config.max_ihave_messages) .published_message_ids_cache_time(
240 config.gossip_config.published_message_ids_cache_time,
241 ) .iwant_followup_time(config.gossip_config.iwant_followup_time) .max_messages_per_rpc(config.gossip_config.max_messages_per_rpc) .gossip_retransimission(config.gossip_config.gossip_retransmission) .flood_publish(config.gossip_config.flood_publish) .duplicate_cache_time(config.gossip_config.duplicate_cache_time) .fanout_ttl(config.gossip_config.fanout_ttl) .heartbeat_initial_delay(config.gossip_config.heartbeat_initial_delay) .gossip_factor(config.gossip_config.gossip_factor) .gossip_lazy(config.gossip_config.gossip_lazy) .build()
252 .map_err(|err| {
253 NetworkError::ConfigError(format!("error building gossipsub config: {err:?}"))
254 })?;
255
256 let gossipsub: Gossipsub = Gossipsub::new(
258 MessageAuthenticity::Signed(keypair.clone()),
259 gossipsub_config,
260 )
261 .map_err(|err| {
262 NetworkError::ConfigError(format!("error building gossipsub behaviour: {err:?}"))
263 })?;
264
265 let identify_cfg =
270 IdentifyConfig::new("HotShot/identify/1.0".to_string(), keypair.public());
271 let identify = IdentifyBehaviour::new(identify_cfg);
272
273 let mut kconfig = Config::new(StreamProtocol::new("/ipfs/kad/1.0.0"));
275 kconfig
276 .set_parallelism(NonZeroUsize::new(5).unwrap())
277 .set_provider_publication_interval(Some(kademlia_record_republication_interval))
278 .set_publication_interval(Some(kademlia_record_republication_interval))
279 .set_record_ttl(Some(kademlia_ttl));
280
281 #[allow(clippy::panic)]
283 if let Some(factor) = config.replication_factor {
284 kconfig.set_replication_factor(factor);
285 } else {
286 panic!("Replication factor not set");
287 }
288
289 let mut kadem = Behaviour::with_config(
291 peer_id,
292 PersistentStore::new(
293 ValidatedStore::new(MemoryStore::new(peer_id)),
294 dht_persistent_storage,
295 5,
296 )
297 .await,
298 kconfig,
299 );
300 kadem.set_mode(Some(Mode::Server));
301
302 let rrconfig = Libp2pRequestResponseConfig::default();
303
304 let cbor = Cbor::new(
306 config.request_response_config.request_size_maximum,
307 config.request_response_config.response_size_maximum,
308 );
309
310 let direct_message: super::cbor::Behaviour<Vec<u8>, Vec<u8>> =
311 RequestResponse::with_codec(
312 cbor,
313 [(
314 StreamProtocol::new("/HotShot/direct_message/1.0"),
315 ProtocolSupport::Full,
316 )],
317 rrconfig.clone(),
318 );
319
320 let autonat_config = autonat::Config {
321 only_global_ips: false,
322 ..Default::default()
323 };
324
325 let network = NetworkDef::new(
326 gossipsub,
327 kadem,
328 identify,
329 direct_message,
330 autonat::Behaviour::new(peer_id, autonat_config),
331 );
332
333 let swarm = SwarmBuilder::with_existing_identity(keypair.clone());
335 let swarm = swarm.with_tokio();
336
337 swarm
338 .with_other_transport(|_| transport)
339 .unwrap()
340 .with_behaviour(|_| network)
341 .unwrap()
342 .with_swarm_config(|cfg| {
343 cfg.with_idle_connection_timeout(Duration::from_secs(10))
344 .with_substream_upgrade_protocol_override(V1Lazy)
345 })
346 .build()
347 };
348 for (peer, addr) in &config.to_connect_addrs {
349 if peer != swarm.local_peer_id() {
350 swarm.behaviour_mut().add_address(peer, addr.clone());
351 swarm.add_peer_address(*peer, addr.clone());
352 }
353 }
354
355 for addr in &config.announce_addresses {
356 info!("Adding announce address {addr}");
357 swarm.add_external_address(addr.clone());
358 }
359
360 Ok(Self {
361 peer_id,
362 swarm,
363 kademlia_record_ttl: kademlia_ttl,
364 consensus_key_to_pid_map,
365 listener_id: None,
366 direct_message_state: DMBehaviour::default(),
367 dht_handler: DHTBehaviour::new(
368 peer_id,
369 config
370 .replication_factor
371 .unwrap_or(NonZeroUsize::new(4).unwrap()),
372 ),
373 resend_tx: None,
374 autonat_private_logged: false,
375 })
376 }
377
378 pub fn put_record(&mut self, mut query: KadPutQuery) {
383 let mut record = Record::new(query.key.clone(), query.value.clone());
385
386 record.expires = Some(Instant::now() + self.kademlia_record_ttl);
388
389 match self.swarm.behaviour_mut().dht.put_record(
390 record,
391 libp2p::kad::Quorum::N(
392 NonZeroUsize::try_from(self.dht_handler.replication_factor().get() / 2)
393 .expect("replication factor should be bigger than 0"),
394 ),
395 ) {
396 Err(e) => {
397 query.progress = DHTProgress::NotStarted;
399 query.backoff.start_next(false);
400 error!("Error publishing to DHT: {e:?} for peer {:?}", self.peer_id);
401 },
402 Ok(qid) => {
403 debug!("Published record to DHT with qid {qid:?}");
404 let query = KadPutQuery {
405 progress: DHTProgress::InProgress(qid),
406 ..query
407 };
408 self.dht_handler.put_record(qid, query);
409 },
410 }
411 }
412
413 #[instrument(skip(self))]
422 async fn handle_client_requests(
423 &mut self,
424 msg: Option<ClientRequest>,
425 ) -> Result<bool, NetworkError> {
426 let behaviour = self.swarm.behaviour_mut();
427 match msg {
428 Some(msg) => {
429 match msg {
430 ClientRequest::BeginBootstrap => {
431 debug!("Beginning Libp2p bootstrap");
432 let _ = self.swarm.behaviour_mut().dht.bootstrap();
433 },
434 ClientRequest::LookupPeer(pid, chan) => {
435 let id = self.swarm.behaviour_mut().dht.get_closest_peers(pid);
436 self.dht_handler
437 .in_progress_get_closest_peers
438 .insert(id, chan);
439 },
440 ClientRequest::GetRoutingTable(chan) => {
441 self.dht_handler
442 .print_routing_table(&mut self.swarm.behaviour_mut().dht);
443 if chan.send(()).is_err() {
444 warn!("Tried to notify client but client not tracking anymore");
445 }
446 },
447 ClientRequest::PutDHT { key, value, notify } => {
448 let query = KadPutQuery {
449 progress: DHTProgress::NotStarted,
450 notify,
451 key,
452 value,
453 backoff: ExponentialBackoff::default(),
454 };
455 self.put_record(query);
456 },
457 ClientRequest::GetConnectedPeerNum(s) => {
458 if s.send(self.num_connected()).is_err() {
459 error!("error sending peer number to client");
460 }
461 },
462 ClientRequest::GetConnectedPeers(s) => {
463 if s.send(self.connected_pids()).is_err() {
464 error!("error sending peer set to client");
465 }
466 },
467 ClientRequest::GetDHT {
468 key,
469 notify,
470 retry_count,
471 } => {
472 self.dht_handler.get_record(
473 key,
474 notify,
475 ExponentialBackoff::default(),
476 retry_count,
477 &mut self.swarm.behaviour_mut().dht,
478 );
479 },
480 ClientRequest::IgnorePeers(_peers) => {
481 },
483 ClientRequest::Shutdown => {
484 if let Some(listener_id) = self.listener_id {
485 self.swarm.remove_listener(listener_id);
486 }
487
488 return Ok(true);
489 },
490 ClientRequest::GossipMsg(topic, contents) => {
491 behaviour.publish_gossip(Topic::new(topic.clone()), contents.clone());
492 },
493 ClientRequest::Subscribe(t, chan) => {
494 behaviour.subscribe_gossip(&t);
495 if let Some(chan) = chan
496 && chan.send(()).is_err()
497 {
498 error!("finished subscribing but response channel dropped");
499 }
500 },
501 ClientRequest::Unsubscribe(t, chan) => {
502 behaviour.unsubscribe_gossip(&t);
503 if let Some(chan) = chan
504 && chan.send(()).is_err()
505 {
506 error!("finished unsubscribing but response channel dropped");
507 }
508 },
509 ClientRequest::DirectRequest {
510 pid,
511 contents,
512 retry_count,
513 } => {
514 debug!("Sending direct request to {pid:?}");
515 let id = behaviour.add_direct_request(pid, contents.clone());
516 let req = DMRequest {
517 peer_id: pid,
518 data: contents,
519 backoff: ExponentialBackoff::default(),
520 retry_count,
521 };
522 self.direct_message_state.add_direct_request(req, id);
523 },
524 ClientRequest::DirectResponse(chan, msg) => {
525 behaviour.add_direct_response(chan, msg);
526 },
527 ClientRequest::AddKnownPeers(peers) => {
528 self.add_known_peers(&peers);
529 },
530 ClientRequest::Prune(pid) => {
531 if self.swarm.disconnect_peer_id(pid).is_err() {
532 warn!("Could not disconnect from {pid:?}");
533 }
534 },
535 }
536 },
537 None => {
538 error!("Error receiving msg in main behaviour loop: channel closed");
539 },
540 }
541 Ok(false)
542 }
543
    /// Reacts to a single event emitted by the swarm.
    ///
    /// Connection events update the client with the current connected-peer
    /// count; behaviour events are dispatched to the matching handler and any
    /// resulting [`NetworkEvent`] is forwarded to the client; address events
    /// feed the DHT; everything else is only logged.
    ///
    /// # Errors
    /// Returns [`NetworkError::ChannelSendError`] if an event cannot be sent on
    /// `send_to_client`.
    #[allow(clippy::type_complexity)]
    #[instrument(skip(self))]
    async fn handle_swarm_events(
        &mut self,
        event: SwarmEvent<NetworkEventInternal>,
        send_to_client: &UnboundedSender<NetworkEvent>,
    ) -> Result<(), NetworkError> {
        debug!("Swarm event observed {:?}", event);

        #[allow(deprecated)]
        match event {
            SwarmEvent::ConnectionEstablished {
                connection_id: _,
                peer_id,
                endpoint,
                num_established,
                concurrent_dial_errors,
                established_in: _established_in,
            } => {
                // Too many parallel connections to one peer is only reported, not
                // acted upon.
                if num_established > ESTABLISHED_LIMIT {
                    error!(
                        "Num concurrent connections to a single peer exceeding \
                         {ESTABLISHED_LIMIT:?} at {num_established:?}!"
                    );
                } else {
                    debug!(
                        "Connection established with {peer_id:?} at {endpoint:?} with \
                         {concurrent_dial_errors:?} concurrent dial errors"
                    );
                }

                // Tell the client how many peers are connected now.
                send_to_client
                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
            },
            SwarmEvent::ConnectionClosed {
                connection_id: _,
                peer_id,
                endpoint,
                num_established,
                cause,
            } => {
                // Here `num_established` is a plain `u32` (remaining connections),
                // hence the comparison against the unwrapped limit.
                if num_established > ESTABLISHED_LIMIT_UNWR {
                    error!(
                        "Num concurrent connections to a single peer exceeding \
                         {ESTABLISHED_LIMIT:?} at {num_established:?}!"
                    );
                } else {
                    debug!("Connection closed with {peer_id:?} at {endpoint:?} due to {cause:?}");
                }

                // Last connection to this peer is gone: drop its consensus-key
                // mapping.
                if num_established == 0 {
                    self.consensus_key_to_pid_map
                        .lock()
                        .remove_by_right(&peer_id);
                }

                // Tell the client how many peers are connected now.
                send_to_client
                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
            },
            SwarmEvent::Dialing {
                peer_id,
                connection_id: _,
            } => {
                debug!("Attempting to dial {peer_id:?}");
            },
            // Listener and address bookkeeping events that need no reaction.
            SwarmEvent::ListenerClosed {
                listener_id: _,
                addresses: _,
                reason: _,
            }
            | SwarmEvent::NewListenAddr {
                listener_id: _,
                address: _,
            }
            | SwarmEvent::ExpiredListenAddr {
                listener_id: _,
                address: _,
            }
            | SwarmEvent::NewExternalAddrCandidate { .. }
            | SwarmEvent::ExternalAddrExpired { .. }
            | SwarmEvent::IncomingConnection {
                connection_id: _,
                local_addr: _,
                send_back_addr: _,
            } => {},
            SwarmEvent::Behaviour(b) => {
                // Each behaviour may produce at most one event for the client.
                let maybe_event = match b {
                    NetworkEventInternal::DHTEvent(e) => self
                        .dht_handler
                        .dht_handle_event(e, self.swarm.behaviour_mut().dht.store_mut()),
                    NetworkEventInternal::IdentifyEvent(e) => {
                        // Only `Received` is acted on; other identify events are
                        // ignored.
                        if let IdentifyEvent::Received {
                            peer_id,
                            info:
                                IdentifyInfo {
                                    listen_addrs,
                                    protocols: _,
                                    public_key: _,
                                    protocol_version: _,
                                    agent_version: _,
                                    observed_addr: _,
                                    signed_peer_record: _,
                                },
                            connection_id: _,
                        } = *e
                        {
                            let behaviour = self.swarm.behaviour_mut();

                            // Deduplicate the advertised listen addresses before
                            // adding them to the DHT.
                            for addr in listen_addrs.iter().collect::<HashSet<_>>() {
                                behaviour.dht.add_address(&peer_id, addr.clone());
                            }
                        }
                        None
                    },
                    NetworkEventInternal::GossipEvent(e) => match *e {
                        // Only the message payload is forwarded to the client.
                        GossipEvent::Message {
                            propagation_source: _peer_id,
                            message_id: _id,
                            message,
                        } => Some(NetworkEvent::GossipMsg(message.data)),
                        GossipEvent::Subscribed { peer_id, topic } => {
                            debug!("Peer {peer_id:?} subscribed to topic {topic:?}");
                            None
                        },
                        GossipEvent::Unsubscribed { peer_id, topic } => {
                            debug!("Peer {peer_id:?} unsubscribed from topic {topic:?}");
                            None
                        },
                        GossipEvent::GossipsubNotSupported { peer_id } => {
                            LogEvent::GossipsubNotSupported.record();
                            debug!("Peer {peer_id:?} does not support gossipsub");
                            None
                        },
                        GossipEvent::SlowPeer {
                            peer_id,
                            failed_messages: _,
                        } => {
                            LogEvent::GossipsubSlowPeer.record();
                            debug!("Peer {peer_id:?} is slow");
                            None
                        },
                    },
                    NetworkEventInternal::DMEvent(e) => self
                        .direct_message_state
                        .handle_dm_event(e, self.resend_tx.clone()),
                    NetworkEventInternal::AutonatEvent(e) => {
                        match e {
                            autonat::Event::InboundProbe(_) => {},
                            autonat::Event::OutboundProbe(e) => match e {
                                autonat::OutboundProbeEvent::Request { .. }
                                | autonat::OutboundProbeEvent::Response { .. } => {},
                                autonat::OutboundProbeEvent::Error {
                                    probe_id: _,
                                    peer,
                                    error,
                                } => {
                                    debug!("AutoNAT outbound probe to {peer:?} failed: {error:?}");
                                },
                            },
                            autonat::Event::StatusChanged { old, new } => match &new {
                                autonat::NatStatus::Public(addr) => {
                                    info!(
                                        "AutoNAT: this node is publicly reachable at {addr} (was \
                                         {old:?})"
                                    );
                                    // Re-arm the one-shot "not reachable" error.
                                    self.autonat_private_logged = false;
                                },
                                autonat::NatStatus::Private => {
                                    // The one-shot flag is deliberately left alone
                                    // here; the confirmed case is handled below.
                                    warn!(
                                        "AutoNAT: probe reports this node may not be publicly \
                                         reachable (was {old:?}). Treating as transient until \
                                         confirmed by repeated probes."
                                    );
                                },
                                autonat::NatStatus::Unknown => {
                                    debug!("AutoNAT status: {old:?} -> Unknown");
                                    // Re-arm the one-shot "not reachable" error.
                                    self.autonat_private_logged = false;
                                },
                            },
                        };
                        // After every AutoNAT event, check whether the private status
                        // has reached the confidence threshold; log the error once.
                        let autonat = &self.swarm.behaviour().autonat;
                        if matches!(autonat.nat_status(), autonat::NatStatus::Private)
                            && autonat.confidence() >= AUTONAT_CONFIDENCE_MAX
                            && !self.autonat_private_logged
                        {
                            error!(
                                "AutoNAT: this node is NOT publicly reachable (confirmed by \
                                 repeated probes). Peers cannot direct-message us, so leader \
                                 views may fail and we may accumulate missed slots. Verify \
                                 --libp2p-advertise-address (env \
                                 ESPRESSO_NODE_LIBP2P_ADVERTISE_ADDRESS) is set a publicly \
                                 reachable host:port, and ensure inbound UDP at that port is open \
                                 from the public internet (firewall/NAT/security group)."
                            );
                            self.autonat_private_logged = true;
                        }
                        None
                    },
                };

                // Forward whatever the behaviour handler produced to the client.
                if let Some(event) = maybe_event {
                    send_to_client
                        .send(event)
                        .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
                }
            },
            SwarmEvent::OutgoingConnectionError {
                connection_id: _,
                peer_id,
                error,
            } => {
                LogEvent::DialFailure.record();
                debug!("Outgoing connection error to {peer_id:?}: {error:?}");
            },
            SwarmEvent::IncomingConnectionError {
                connection_id: _,
                local_addr: _,
                send_back_addr: _,
                error,
                peer_id: _,
            } => {
                LogEvent::IncomingConnError.record();
                debug!("Incoming connection error: {error:?}");
            },
            SwarmEvent::ListenerError {
                listener_id: _,
                error,
            } => {
                LogEvent::ListenerError.record();
                debug!("Listener error: {error:?}");
            },
            SwarmEvent::ExternalAddrConfirmed { address } => {
                // Record our own confirmed external address in the DHT.
                let my_id = *self.swarm.local_peer_id();
                self.swarm
                    .behaviour_mut()
                    .dht
                    .add_address(&my_id, address.clone());
            },
            SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
                // Record the peer's newly learned external address in the DHT.
                self.swarm
                    .behaviour_mut()
                    .dht
                    .add_address(&peer_id, address.clone());
            },
            // Catch-all for swarm event variants not matched above.
            _ => {
                debug!("Unhandled swarm event {event:?}");
            },
        }
        Ok(())
    }
804
805 pub fn spawn_listeners(
811 mut self,
812 ) -> Result<
813 (
814 UnboundedSender<ClientRequest>,
815 UnboundedReceiver<NetworkEvent>,
816 ),
817 NetworkError,
818 > {
819 let (s_input, mut s_output) = unbounded_channel::<ClientRequest>();
820 let (r_input, r_output) = unbounded_channel::<NetworkEvent>();
821 let (mut bootstrap_tx, bootstrap_rx) = mpsc::channel(100);
822 self.resend_tx = Some(s_input.clone());
823 self.dht_handler.set_bootstrap_sender(bootstrap_tx.clone());
824
825 DHTBootstrapTask::run(bootstrap_rx, s_input.clone());
826 spawn(
827 async move {
828 loop {
829 select! {
830 event = self.swarm.next() => {
831 debug!("peerid {:?}\t\thandling maybe event {:?}", self.peer_id, event);
832 if let Some(event) = event {
833 debug!("peerid {:?}\t\thandling event {:?}", self.peer_id, event);
834 self.handle_swarm_events(event, &r_input).await?;
835 }
836 },
837 msg = s_output.recv() => {
838 debug!("peerid {:?}\t\thandling msg {:?}", self.peer_id, msg);
839 let shutdown = self.handle_client_requests(msg).await?;
840 if shutdown {
841 let _ = bootstrap_tx.send(InputEvent::ShutdownBootstrap).await;
842 break
843 }
844 }
845 }
846 }
847 Ok::<(), NetworkError>(())
848 }
849 .instrument(info_span!("Libp2p NetworkBehaviour Handler")),
850 );
851 Ok((s_input, r_output))
852 }
853
    /// Returns this node's libp2p peer id.
    pub fn peer_id(&self) -> PeerId {
        self.peer_id
    }
858}