1use std::{
11 cmp::min,
12 collections::{BTreeSet, HashSet},
13 fmt::Debug,
14 net::{IpAddr, ToSocketAddrs},
15 num::NonZeroUsize,
16 sync::{
17 Arc,
18 atomic::{AtomicBool, AtomicU64, Ordering},
19 },
20 time::Duration,
21};
22#[cfg(feature = "hotshot-testing")]
23use std::{collections::HashMap, str::FromStr};
24
25use anyhow::{Context, anyhow};
26use async_lock::RwLock;
27use async_trait::async_trait;
28use bimap::BiMap;
29use futures::future::join_all;
30#[cfg(feature = "hotshot-testing")]
31use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtNoPersistence;
32pub use hotshot_libp2p_networking::network::{GossipConfig, RequestResponseConfig};
33use hotshot_libp2p_networking::{
34 network::{
35 DEFAULT_REPLICATION_FACTOR,
36 NetworkEvent::{self, DirectRequest, DirectResponse, GossipMsg},
37 NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeReceiver,
38 behaviours::dht::{
39 record::{Namespace, RecordKey, RecordValue},
40 store::persistent::DhtPersistentStorage,
41 },
42 log_summary::LogEvent,
43 spawn_network_node,
44 transport::construct_auth_message,
45 },
46 reexport::Multiaddr,
47};
48use hotshot_types::{
49 BoxSyncFuture, boxed_sync,
50 constants::LOOK_AHEAD,
51 data::{EpochNumber, ViewNumber},
52 network::NetworkConfig,
53 traits::{
54 metrics::{Counter, Gauge, Metrics, NoMetrics},
55 network::{ConnectedNetwork, NetworkError, Topic},
56 node_implementation::NodeType,
57 signature_key::{PrivateSignatureKey, SignatureKey},
58 },
59};
60#[cfg(feature = "hotshot-testing")]
61use hotshot_types::{
62 PeerConnectInfo,
63 traits::network::{AsyncGenerator, NetworkReliability, TestableNetworkingImplementation},
64};
65use libp2p_identity::{
66 Keypair, PeerId,
67 ed25519::{self, SecretKey},
68};
69use serde::Serialize;
70use tokio::{
71 select, spawn,
72 sync::{
73 Mutex,
74 mpsc::{Receiver, Sender, channel, error::TrySendError},
75 },
76 time::sleep,
77};
78use tracing::{debug, error, info, instrument, trace, warn};
79
80use crate::{BroadcastDelay, EpochMembershipCoordinator};
81
82#[derive(Clone, Debug)]
84pub struct Libp2pMetricsValue {
85 pub num_connected_peers: Box<dyn Gauge>,
87 pub num_failed_messages: Box<dyn Counter>,
89 pub is_ready: Box<dyn Gauge>,
91}
92
93impl Libp2pMetricsValue {
94 pub fn new(metrics: &dyn Metrics) -> Self {
96 let subgroup = metrics.subgroup("libp2p".into());
98
99 Self {
101 num_connected_peers: subgroup.create_gauge("num_connected_peers".into(), None),
102 num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
103 is_ready: subgroup.create_gauge("is_ready".into(), None),
104 }
105 }
106}
107
108impl Default for Libp2pMetricsValue {
109 fn default() -> Self {
111 Self::new(&*NoMetrics::boxed())
112 }
113}
114
115pub type BootstrapAddrs = Arc<RwLock<Vec<(PeerId, Multiaddr)>>>;
118
119pub const QC_TOPIC: &str = "global";
121
122#[derive(Serialize)]
131pub struct Empty {
132 byte: u8,
135}
136
137impl<T: NodeType> Debug for Libp2pNetwork<T> {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.debug_struct("Libp2p").field("inner", &"inner").finish()
140 }
141}
142
143pub type PeerInfoVec = Arc<RwLock<Vec<(PeerId, Multiaddr)>>>;
145
146#[derive(Debug)]
148struct Libp2pNetworkInner<T: NodeType> {
149 pk: T::SignatureKey,
151 handle: Arc<NetworkNodeHandle<T>>,
153 receiver: Mutex<Receiver<Vec<u8>>>,
155 sender: Sender<Vec<u8>>,
157 node_lookup_send: Sender<Option<(ViewNumber, T::SignatureKey)>>,
159 bootstrap_addrs: PeerInfoVec,
162 is_ready: Arc<AtomicBool>,
164 dht_timeout: Duration,
166 is_bootstrapped: Arc<AtomicBool>,
168 metrics: Libp2pMetricsValue,
170 subscribed_topics: HashSet<String>,
172 latest_seen_view: Arc<AtomicU64>,
176 #[cfg(feature = "hotshot-testing")]
177 reliability_config: Option<Box<dyn NetworkReliability>>,
179 kill_switch: Sender<()>,
181}
182
183#[derive(Clone)]
186pub struct Libp2pNetwork<T: NodeType> {
187 inner: Arc<Libp2pNetworkInner<T>>,
189}
190
191#[cfg(feature = "hotshot-testing")]
192impl<T: NodeType> TestableNetworkingImplementation<T> for Libp2pNetwork<T> {
193 #[allow(clippy::panic, clippy::too_many_lines)]
203 fn generator(
204 expected_node_count: usize,
205 num_bootstrap: usize,
206 _network_id: usize,
207 da_committee_size: usize,
208 reliability_config: Option<Box<dyn NetworkReliability>>,
209 _secondary_network_delay: Duration,
210 _connect_infos: &mut HashMap<T::SignatureKey, PeerConnectInfo>,
211 ) -> AsyncGenerator<Arc<Self>> {
212 assert!(
213 da_committee_size <= expected_node_count,
214 "DA committee size must be less than or equal to total # nodes"
215 );
216 let bootstrap_addrs: PeerInfoVec = Arc::default();
217 let node_ids: Arc<RwLock<HashSet<u64>>> = Arc::default();
218
219 Box::pin({
222 move |node_id| {
223 info!(
224 "GENERATOR: Node id {:?}, is bootstrap: {:?}",
225 node_id,
226 node_id < num_bootstrap as u64
227 );
228
229 let port = std::net::UdpSocket::bind("127.0.0.1:0")
231 .expect("UDP socket should bind")
232 .local_addr()
233 .expect("UDP socket should have local addr")
234 .port();
235
236 let addr =
237 Multiaddr::from_str(&format!("/ip4/127.0.0.1/udp/{port}/quic-v1")).unwrap();
238
239 let privkey = T::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1;
241 let pubkey = T::SignatureKey::from_private(&privkey);
242
243 let libp2p_keypair = derive_libp2p_keypair::<T::SignatureKey>(&privkey)
245 .expect("Failed to derive libp2p keypair");
246
247 let lookup_record_value = RecordValue::new_signed(
249 &RecordKey::new(Namespace::Lookup, pubkey.to_bytes()),
250 libp2p_keypair.public().to_peer_id().to_bytes(),
251 &privkey,
252 )
253 .expect("Failed to sign DHT lookup record");
254
255 let replication_factor =
257 NonZeroUsize::new((2 * expected_node_count).div_ceil(3)).unwrap();
258
259 let config = NetworkNodeConfigBuilder::default()
261 .keypair(libp2p_keypair)
262 .replication_factor(replication_factor)
263 .bind_address(Some(addr))
264 .to_connect_addrs(HashSet::default())
265 .republication_interval(None)
266 .build()
267 .expect("Failed to build network node config");
268
269 let bootstrap_addrs_ref = Arc::clone(&bootstrap_addrs);
270 let node_ids_ref = Arc::clone(&node_ids);
271 let reliability_config_dup = reliability_config.clone();
272
273 Box::pin(async move {
274 let mut write_ids = node_ids_ref.write().await;
276 if write_ids.contains(&node_id) {
277 write_ids.clear();
278 }
279 write_ids.insert(node_id);
280 drop(write_ids);
281 Arc::new(
282 match Libp2pNetwork::new(
283 Libp2pMetricsValue::default(),
284 DhtNoPersistence,
285 config,
286 pubkey.clone(),
287 lookup_record_value,
288 bootstrap_addrs_ref,
289 usize::try_from(node_id).unwrap(),
290 #[cfg(feature = "hotshot-testing")]
291 reliability_config_dup,
292 )
293 .await
294 {
295 Ok(network) => network,
296 Err(err) => {
297 panic!("Failed to create libp2p network: {err:?}");
298 },
299 },
300 )
301 })
302 }
303 })
304 }
305
306 fn in_flight_message_count(&self) -> Option<usize> {
307 None
308 }
309}
310
311pub fn derive_libp2p_keypair<K: SignatureKey>(
317 private_key: &K::PrivateKey,
318) -> anyhow::Result<Keypair> {
319 let derived_key = blake3::derive_key("libp2p key", &private_key.to_bytes());
321 let derived_key = SecretKey::try_from_bytes(derived_key)?;
322
323 Ok(ed25519::Keypair::from(derived_key).into())
325}
326
327pub fn derive_libp2p_peer_id<K: SignatureKey>(
332 private_key: &K::PrivateKey,
333) -> anyhow::Result<PeerId> {
334 let keypair = derive_libp2p_keypair::<K>(private_key)?;
336
337 Ok(PeerId::from_public_key(&keypair.public()))
339}
340
341pub fn derive_libp2p_multiaddr(addr: &String) -> anyhow::Result<Multiaddr> {
350 let (host, port) = match addr.rfind(':') {
352 Some(idx) => (&addr[..idx], &addr[idx + 1..]),
353 None => return Err(anyhow!("Invalid address format, no port supplied")),
354 };
355
356 let ip = host.parse::<IpAddr>();
358
359 let multiaddr_string = match ip {
361 Ok(IpAddr::V4(ip)) => format!("/ip4/{ip}/udp/{port}/quic-v1"),
362 Ok(IpAddr::V6(ip)) => format!("/ip6/{ip}/udp/{port}/quic-v1"),
363 Err(_) => {
364 let lookup_result = addr.to_socket_addrs();
366
367 let failed = lookup_result
369 .map(|result| result.collect::<Vec<_>>().is_empty())
370 .unwrap_or(true);
371
372 if failed {
374 warn!(
375 "Failed to resolve domain name {host}, assuming it has not yet been \
376 provisioned"
377 );
378 }
379
380 format!("/dns/{host}/udp/{port}/quic-v1")
381 },
382 };
383
384 multiaddr_string.parse().with_context(|| {
386 format!("Failed to convert Multiaddr string to Multiaddr: {multiaddr_string}")
387 })
388}
389
390impl<T: NodeType> Libp2pNetwork<T> {
391 #[allow(clippy::too_many_arguments)]
400 pub async fn from_config<D: DhtPersistentStorage>(
401 mut config: NetworkConfig<T>,
402 dht_persistent_storage: D,
403 gossip_config: GossipConfig,
404 request_response_config: RequestResponseConfig,
405 bind_address: Multiaddr,
406 announce_addresses: Vec<Multiaddr>,
407 pub_key: &T::SignatureKey,
408 priv_key: &<T::SignatureKey as SignatureKey>::PrivateKey,
409 metrics: Libp2pMetricsValue,
410 ) -> anyhow::Result<Self> {
411 let libp2p_config = config
413 .libp2p_config
414 .take()
415 .ok_or(anyhow!("Libp2p config not supplied"))?;
416
417 let keypair = derive_libp2p_keypair::<T::SignatureKey>(priv_key)?;
419
420 let mut config_builder = NetworkNodeConfigBuilder::default();
422
423 config_builder.gossip_config(gossip_config.clone());
425 config_builder.request_response_config(request_response_config);
426
427 let auth_message =
429 construct_auth_message(pub_key, &keypair.public().to_peer_id(), priv_key)
430 .with_context(|| "Failed to construct auth message")?;
431
432 config_builder.auth_message(Some(auth_message));
434
435 let Some(default_replication_factor) = DEFAULT_REPLICATION_FACTOR else {
437 return Err(anyhow!("Default replication factor not supplied"));
438 };
439
440 let replication_factor = NonZeroUsize::new(min(
441 default_replication_factor.get(),
442 config.config.num_nodes_with_stake.get() / 2,
443 ))
444 .with_context(|| "Failed to calculate replication factor")?;
445
446 let lookup_record_value = RecordValue::new_signed(
448 &RecordKey::new(Namespace::Lookup, pub_key.to_bytes()),
449 keypair.public().to_peer_id().to_bytes(),
451 priv_key,
452 )
453 .with_context(|| "Failed to sign DHT lookup record")?;
454
455 config_builder
456 .keypair(keypair)
457 .replication_factor(replication_factor)
458 .bind_address(Some(bind_address.clone()))
459 .announce_addresses(announce_addresses);
460
461 config_builder.to_connect_addrs(HashSet::from_iter(libp2p_config.bootstrap_nodes.clone()));
463
464 let node_config = config_builder.build()?;
466
467 let mut all_keys = BTreeSet::new();
469
470 for node in config.config.known_nodes_with_stake {
472 all_keys.insert(T::SignatureKey::public_key(&node.stake_table_entry));
473 }
474
475 Ok(Libp2pNetwork::new(
476 metrics,
477 dht_persistent_storage,
478 node_config,
479 pub_key.clone(),
480 lookup_record_value,
481 Arc::new(RwLock::new(libp2p_config.bootstrap_nodes)),
482 usize::try_from(config.node_index)?,
483 #[cfg(feature = "hotshot-testing")]
484 None,
485 )
486 .await?)
487 }
488
489 #[must_use]
491 pub fn has_peers(&self) -> bool {
492 self.inner.is_ready.load(Ordering::Relaxed)
493 }
494
495 pub async fn wait_for_peers(&self) {
497 loop {
498 if self.has_peers() {
499 break;
500 }
501 sleep(Duration::from_secs(1)).await;
502 }
503 }
504
505 #[allow(clippy::too_many_arguments)]
518 pub async fn new<D: DhtPersistentStorage>(
519 metrics: Libp2pMetricsValue,
520 dht_persistent_storage: D,
521 config: NetworkNodeConfig,
522 pk: T::SignatureKey,
523 lookup_record_value: RecordValue<T::SignatureKey>,
524 bootstrap_addrs: BootstrapAddrs,
525 id: usize,
526 #[cfg(feature = "hotshot-testing")] reliability_config: Option<Box<dyn NetworkReliability>>,
527 ) -> Result<Libp2pNetwork<T>, NetworkError> {
528 let consensus_key_to_pid_map = Arc::new(parking_lot::Mutex::new(BiMap::new()));
530
531 let (mut rx, network_handle) = spawn_network_node::<T, D>(
532 config.clone(),
533 dht_persistent_storage,
534 Arc::clone(&consensus_key_to_pid_map),
535 id,
536 )
537 .await
538 .map_err(|e| NetworkError::ConfigError(format!("failed to spawn network node: {e}")))?;
539
540 let addr = network_handle.listen_addr();
542 let pid = network_handle.peer_id();
543 bootstrap_addrs.write().await.push((pid, addr));
544
545 let subscribed_topics = HashSet::from_iter(vec![QC_TOPIC.to_string()]);
547
548 let (sender, receiver) = channel(1000);
551 let (node_lookup_send, node_lookup_recv) = channel(10);
552 let (kill_tx, kill_rx) = channel(1);
553 rx.set_kill_switch(kill_rx);
554
555 let mut result = Libp2pNetwork {
556 inner: Arc::new(Libp2pNetworkInner {
557 handle: Arc::new(network_handle),
558 receiver: Mutex::new(receiver),
559 sender: sender.clone(),
560 pk,
561 bootstrap_addrs,
562 is_ready: Arc::new(AtomicBool::new(false)),
563 dht_timeout: config.dht_timeout.unwrap_or(Duration::from_secs(120)),
565 is_bootstrapped: Arc::new(AtomicBool::new(false)),
566 metrics,
567 subscribed_topics,
568 node_lookup_send,
569 latest_seen_view: Arc::new(AtomicU64::new(0)),
573 #[cfg(feature = "hotshot-testing")]
574 reliability_config,
575 kill_switch: kill_tx,
576 }),
577 };
578
579 result.inner.metrics.is_ready.set(0);
581
582 result.handle_event_generator(sender, rx);
583 result.spawn_node_lookup(node_lookup_recv);
584 result.spawn_connect(id, lookup_record_value);
585
586 Ok(result)
587 }
588
589 #[allow(clippy::cast_sign_loss, clippy::cast_precision_loss)]
591 fn spawn_node_lookup(
592 &self,
593 mut node_lookup_recv: Receiver<Option<(ViewNumber, T::SignatureKey)>>,
594 ) {
595 let handle = Arc::clone(&self.inner.handle);
596 let dht_timeout = self.inner.dht_timeout;
597 let latest_seen_view = Arc::clone(&self.inner.latest_seen_view);
598
599 spawn(async move {
601 while let Some(Some((view_number, pk))) = node_lookup_recv.recv().await {
603 #[allow(clippy::cast_possible_truncation)]
605 const THRESHOLD: u64 = (LOOK_AHEAD as f64 * 0.8) as u64;
606
607 trace!("Performing lookup for peer {pk}");
608
609 if latest_seen_view.load(Ordering::Relaxed) + THRESHOLD <= *view_number {
611 if let Err(err) = handle.lookup_node(&pk, dht_timeout).await {
613 LogEvent::DhtLookupFailure.record();
614 debug!("Failed to perform lookup for key {pk}: {err}");
615 };
616 }
617 }
618 });
619 }
620
621 fn spawn_connect(&mut self, id: usize, lookup_record_value: RecordValue<T::SignatureKey>) {
623 let pk = self.inner.pk.clone();
624 let bootstrap_ref = Arc::clone(&self.inner.bootstrap_addrs);
625 let handle = Arc::clone(&self.inner.handle);
626 let is_bootstrapped = Arc::clone(&self.inner.is_bootstrapped);
627 let inner = Arc::clone(&self.inner);
628
629 spawn({
630 let is_ready = Arc::clone(&self.inner.is_ready);
631 async move {
632 let bs_addrs = bootstrap_ref.read().await.clone();
633
634 handle.add_known_peers(bs_addrs).unwrap();
636
637 handle.begin_bootstrap()?;
639 while !is_bootstrapped.load(Ordering::Relaxed) {
640 sleep(Duration::from_secs(1)).await;
641 handle.begin_bootstrap()?;
642 }
643
644 handle.subscribe(QC_TOPIC.to_string()).await.unwrap();
646
647 while handle
650 .put_record(
651 RecordKey::new(Namespace::Lookup, pk.to_bytes()),
652 lookup_record_value.clone(),
653 )
654 .await
655 .is_err()
656 {
657 sleep(Duration::from_secs(1)).await;
658 }
659
660 if let Err(e) = handle.wait_to_connect(1, id).await {
662 error!("Failed to connect to peers: {e:?}");
663 return Err::<(), NetworkError>(e);
664 }
665 info!("Connected to required number of peers");
666
667 is_ready.store(true, Ordering::Relaxed);
669 inner.metrics.is_ready.set(1);
670
671 Ok::<(), NetworkError>(())
672 }
673 });
674 }
675
676 fn handle_recvd_events(
678 &self,
679 msg: NetworkEvent,
680 sender: &Sender<Vec<u8>>,
681 ) -> Result<(), NetworkError> {
682 match msg {
683 GossipMsg(msg) => {
684 sender.try_send(msg).map_err(|err| {
685 NetworkError::ChannelSendError(format!("failed to send gossip message: {err}"))
686 })?;
687 },
688 DirectRequest(msg, _pid, chan) => {
689 sender.try_send(msg).map_err(|err| {
690 NetworkError::ChannelSendError(format!(
691 "failed to send direct request message: {err}"
692 ))
693 })?;
694 if self
695 .inner
696 .handle
697 .direct_response(
698 chan,
699 &bincode::serialize(&Empty { byte: 0u8 }).map_err(|e| {
700 NetworkError::FailedToSerialize(format!(
701 "failed to serialize acknowledgement: {e}"
702 ))
703 })?,
704 )
705 .is_err()
706 {
707 error!("failed to ack!");
708 };
709 },
710 DirectResponse(_msg, _) => {},
711 NetworkEvent::IsBootstrapped => {
712 error!(
713 "handle_recvd_events received `NetworkEvent::IsBootstrapped`, which should be \
714 impossible."
715 );
716 },
717 NetworkEvent::ConnectedPeersUpdate(_) => {},
718 }
719 Ok::<(), NetworkError>(())
720 }
721
722 fn handle_event_generator(&self, sender: Sender<Vec<u8>>, mut network_rx: NetworkNodeReceiver) {
725 let handle = self.clone();
726 let is_bootstrapped = Arc::clone(&self.inner.is_bootstrapped);
727 spawn(async move {
728 let Some(mut kill_switch) = network_rx.take_kill_switch() else {
729 tracing::error!(
730 "`spawn_handle` was called on a network handle that was already closed"
731 );
732 return;
733 };
734
735 loop {
736 select! {
737 msg = network_rx.recv() => {
738 let Ok(message) = msg else {
739 warn!("Network receiver shut down!");
740 return;
741 };
742
743 match message {
744 NetworkEvent::IsBootstrapped => {
745 is_bootstrapped.store(true, Ordering::Relaxed);
746 }
747 GossipMsg(_) | DirectRequest(_, _, _) | DirectResponse(_, _) => {
748 let _ = handle.handle_recvd_events(message, &sender);
749 }
750 NetworkEvent::ConnectedPeersUpdate(num_peers) => {
751 handle.inner.metrics.num_connected_peers.set(num_peers);
752 }
753 }
754 }
755
756 _kill_switch = kill_switch.recv() => {
757 warn!("Event Handler shutdown");
758 return;
759 }
760 }
761 }
762 });
763 }
764}
765
766#[async_trait]
767impl<T: NodeType> ConnectedNetwork<T::SignatureKey> for Libp2pNetwork<T> {
768 #[instrument(name = "Libp2pNetwork::ready_blocking", skip_all)]
769 async fn wait_for_ready(&self) {
770 self.wait_for_peers().await;
771 }
772
773 fn pause(&self) {
774 unimplemented!("Pausing not implemented for the Libp2p network");
775 }
776
777 fn resume(&self) {
778 unimplemented!("Resuming not implemented for the Libp2p network");
779 }
780
781 #[instrument(name = "Libp2pNetwork::shut_down", skip_all)]
782 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
783 where
784 'a: 'b,
785 Self: 'b,
786 {
787 let closure = async move {
788 let _ = self.inner.handle.shutdown().await;
789 let _ = self.inner.node_lookup_send.send(None).await;
790 let _ = self.inner.kill_switch.send(()).await;
791 };
792 boxed_sync(closure)
793 }
794
795 #[instrument(name = "Libp2pNetwork::broadcast_message", skip_all)]
796 async fn broadcast_message(
797 &self,
798 _: ViewNumber,
799 message: Vec<u8>,
800 topic: Topic,
801 _broadcast_delay: BroadcastDelay,
802 ) -> Result<(), NetworkError> {
803 if !self.has_peers() {
805 self.inner.metrics.num_failed_messages.add(1);
806 return Err(NetworkError::NoPeersYet);
807 };
808
809 let topic = topic.to_string();
811 if self.inner.subscribed_topics.contains(&topic) {
812 self.inner.sender.try_send(message.clone()).map_err(|_| {
814 self.inner.metrics.num_failed_messages.add(1);
815 NetworkError::ShutDown
816 })?;
817 }
818
819 #[cfg(feature = "hotshot-testing")]
821 {
822 let metrics = self.inner.metrics.clone();
823 if let Some(config) = &self.inner.reliability_config {
824 let handle = Arc::clone(&self.inner.handle);
825
826 let fut = config.clone().chaos_send_msg(
827 message,
828 Arc::new(move |msg: Vec<u8>| {
829 let topic_2 = topic.clone();
830 let handle_2 = Arc::clone(&handle);
831 let metrics_2 = metrics.clone();
832 boxed_sync(async move {
833 if let Err(e) = handle_2.gossip_no_serialize(topic_2, msg) {
834 metrics_2.num_failed_messages.add(1);
835 warn!("Failed to broadcast to libp2p: {e:?}");
836 }
837 })
838 }),
839 );
840 spawn(fut);
841 return Ok(());
842 }
843 }
844
845 if let Err(e) = self.inner.handle.gossip(topic, &message) {
846 self.inner.metrics.num_failed_messages.add(1);
847 return Err(e);
848 }
849
850 Ok(())
851 }
852
853 #[instrument(name = "Libp2pNetwork::da_broadcast_message", skip_all)]
854 async fn da_broadcast_message(
855 &self,
856 view: ViewNumber,
857 message: Vec<u8>,
858 recipients: Vec<T::SignatureKey>,
859 _broadcast_delay: BroadcastDelay,
860 ) -> Result<(), NetworkError> {
861 if !self.has_peers() {
863 self.inner.metrics.num_failed_messages.add(1);
864 return Err(NetworkError::NoPeersYet);
865 };
866
867 let topic = Topic::Da.to_string();
869 if self.inner.subscribed_topics.contains(&topic) {
870 self.inner.sender.try_send(message.clone()).map_err(|_| {
871 self.inner.metrics.num_failed_messages.add(1);
872 NetworkError::ShutDown
873 })?;
874 }
875
876 let future_results = recipients
877 .into_iter()
878 .map(|r| self.direct_message(view, message.clone(), r));
879 let results = join_all(future_results).await;
880
881 let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
882
883 if errors.is_empty() {
884 Ok(())
885 } else {
886 Err(NetworkError::Multiple(errors))
887 }
888 }
889
890 #[instrument(name = "Libp2pNetwork::direct_message", skip_all)]
891 async fn direct_message(
892 &self,
893 _: ViewNumber,
894 message: Vec<u8>,
895 recipient: T::SignatureKey,
896 ) -> Result<(), NetworkError> {
897 if !self.has_peers() {
899 self.inner.metrics.num_failed_messages.add(1);
900 return Err(NetworkError::NoPeersYet);
901 };
902
903 if recipient == self.inner.pk {
905 self.inner.sender.try_send(message).map_err(|_x| {
907 self.inner.metrics.num_failed_messages.add(1);
908 NetworkError::ShutDown
909 })?;
910 return Ok(());
911 }
912
913 let pid = match self
914 .inner
915 .handle
916 .lookup_node(&recipient, Duration::from_secs(2))
917 .await
918 {
919 Ok(pid) => pid,
920 Err(err) => {
921 self.inner.metrics.num_failed_messages.add(1);
922 return Err(NetworkError::LookupError(format!(
923 "failed to look up node for direct message: {err}"
924 )));
925 },
926 };
927
928 #[cfg(feature = "hotshot-testing")]
929 {
930 let metrics = self.inner.metrics.clone();
931 if let Some(config) = &self.inner.reliability_config {
932 let handle = Arc::clone(&self.inner.handle);
933
934 let fut = config.clone().chaos_send_msg(
935 message,
936 Arc::new(move |msg: Vec<u8>| {
937 let handle_2 = Arc::clone(&handle);
938 let metrics_2 = metrics.clone();
939 boxed_sync(async move {
940 if let Err(e) = handle_2.direct_request_no_serialize(pid, msg) {
941 metrics_2.num_failed_messages.add(1);
942 warn!("Failed to broadcast to libp2p: {e:?}");
943 }
944 })
945 }),
946 );
947 spawn(fut);
948 return Ok(());
949 }
950 }
951
952 match self.inner.handle.direct_request(pid, &message) {
953 Ok(()) => Ok(()),
954 Err(e) => {
955 self.inner.metrics.num_failed_messages.add(1);
956 Err(e)
957 },
958 }
959 }
960
961 #[instrument(name = "Libp2pNetwork::recv_message", skip_all)]
966 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
967 let result = self
968 .inner
969 .receiver
970 .lock()
971 .await
972 .recv()
973 .await
974 .ok_or(NetworkError::ShutDown)?;
975
976 Ok(result)
977 }
978
979 #[instrument(name = "Libp2pNetwork::queue_node_lookup", skip_all)]
980 #[allow(clippy::type_complexity)]
981 fn queue_node_lookup(
982 &self,
983 view_number: ViewNumber,
984 pk: T::SignatureKey,
985 ) -> Result<(), TrySendError<Option<(ViewNumber, T::SignatureKey)>>> {
986 self.inner
987 .node_lookup_send
988 .try_send(Some((view_number, pk)))
989 }
990
991 async fn update_view<TYPES>(
1004 &self,
1005 view: ViewNumber,
1006 epoch: Option<EpochNumber>,
1007 membership_coordinator: EpochMembershipCoordinator<TYPES>,
1008 ) where
1009 TYPES: NodeType<SignatureKey = T::SignatureKey>,
1010 {
1011 let future_view = ViewNumber::new(*view) + LOOK_AHEAD;
1012 let epoch = epoch.map(|e| EpochNumber::new(*e));
1013
1014 let membership = match membership_coordinator.membership_for_epoch(epoch) {
1015 Ok(m) => m,
1016 Err(e) => {
1017 return tracing::warn!(e.message);
1018 },
1019 };
1020 let future_leader = match membership.leader(future_view) {
1021 Ok(l) => l,
1022 Err(e) => {
1023 return tracing::info!("Failed to calculate leader for view {future_view}: {e}");
1024 },
1025 };
1026
1027 let _ = self
1028 .queue_node_lookup(ViewNumber::new(*future_view), future_leader)
1029 .map_err(|err| tracing::warn!("failed to process node lookup request: {err}"));
1030 }
1031}
1032
1033#[cfg(test)]
1034mod test {
1035 mod derive_multiaddr {
1036 use std::net::Ipv6Addr;
1037
1038 use super::super::*;
1039
1040 #[test]
1042 fn test_v4_valid() {
1043 let addr = "1.1.1.1:8080".to_string();
1045 let multiaddr =
1046 derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr, {}");
1047
1048 assert_eq!(multiaddr.to_string(), "/ip4/1.1.1.1/udp/8080/quic-v1");
1050 }
1051
1052 #[test]
1054 fn test_v6_valid() {
1055 let ipv6_addr = Ipv6Addr::new(1, 2, 3, 4, 5, 6, 7, 8);
1057 let addr = format!("{ipv6_addr}:8080");
1058 let multiaddr =
1059 derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr, {}");
1060
1061 assert_eq!(
1063 multiaddr.to_string(),
1064 format!("/ip6/{ipv6_addr}/udp/8080/quic-v1")
1065 );
1066 }
1067
1068 #[test]
1070 fn test_no_port() {
1071 let addr = "1.1.1.1".to_string();
1073 let multiaddr = derive_libp2p_multiaddr(&addr);
1074
1075 assert!(multiaddr.is_err());
1077 }
1078
1079 #[test]
1081 fn test_fqdn_exists() {
1082 let addr = "example.com:8080".to_string();
1084 let multiaddr =
1085 derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr, {}");
1086
1087 assert_eq!(multiaddr.to_string(), "/dns/example.com/udp/8080/quic-v1");
1089 }
1090
1091 #[test]
1093 fn test_fqdn_does_not_exist() {
1094 let addr = "libp2p.example.com:8080".to_string();
1096 let multiaddr =
1097 derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr, {}");
1098
1099 assert_eq!(
1101 multiaddr.to_string(),
1102 "/dns/libp2p.example.com/udp/8080/quic-v1"
1103 );
1104 }
1105
1106 #[test]
1108 fn test_fqdn_no_port() {
1109 let addr = "example.com".to_string();
1111 let multiaddr = derive_libp2p_multiaddr(&addr);
1112
1113 assert!(multiaddr.is_err());
1115 }
1116 }
1117}