Skip to main content

hotshot/traits/networking/
libp2p_network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Libp2p based/production networking implementation
8//! This module provides a libp2p based networking implementation where each node in the
9//! network forms a tcp or udp connection to a subset of other nodes in the network
10use std::{
11    cmp::min,
12    collections::{BTreeSet, HashSet},
13    fmt::Debug,
14    net::{IpAddr, ToSocketAddrs},
15    num::NonZeroUsize,
16    sync::{
17        Arc,
18        atomic::{AtomicBool, AtomicU64, Ordering},
19    },
20    time::Duration,
21};
22#[cfg(feature = "hotshot-testing")]
23use std::{collections::HashMap, str::FromStr};
24
25use anyhow::{Context, anyhow};
26use async_lock::RwLock;
27use async_trait::async_trait;
28use bimap::BiMap;
29use futures::future::join_all;
30#[cfg(feature = "hotshot-testing")]
31use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtNoPersistence;
32pub use hotshot_libp2p_networking::network::{GossipConfig, RequestResponseConfig};
33use hotshot_libp2p_networking::{
34    network::{
35        DEFAULT_REPLICATION_FACTOR,
36        NetworkEvent::{self, DirectRequest, DirectResponse, GossipMsg},
37        NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeReceiver,
38        behaviours::dht::{
39            record::{Namespace, RecordKey, RecordValue},
40            store::persistent::DhtPersistentStorage,
41        },
42        log_summary::LogEvent,
43        spawn_network_node,
44        transport::construct_auth_message,
45    },
46    reexport::Multiaddr,
47};
48use hotshot_types::{
49    BoxSyncFuture, boxed_sync,
50    constants::LOOK_AHEAD,
51    data::{EpochNumber, ViewNumber},
52    network::NetworkConfig,
53    traits::{
54        metrics::{Counter, Gauge, Metrics, NoMetrics},
55        network::{ConnectedNetwork, NetworkError, Topic},
56        node_implementation::NodeType,
57        signature_key::{PrivateSignatureKey, SignatureKey},
58    },
59};
60#[cfg(feature = "hotshot-testing")]
61use hotshot_types::{
62    PeerConnectInfo,
63    traits::network::{AsyncGenerator, NetworkReliability, TestableNetworkingImplementation},
64};
65use libp2p_identity::{
66    Keypair, PeerId,
67    ed25519::{self, SecretKey},
68};
69use serde::Serialize;
70use tokio::{
71    select, spawn,
72    sync::{
73        Mutex,
74        mpsc::{Receiver, Sender, channel, error::TrySendError},
75    },
76    time::sleep,
77};
78use tracing::{debug, error, info, instrument, trace, warn};
79
80use crate::{BroadcastDelay, EpochMembershipCoordinator};
81
82/// Libp2p-specific metrics
83#[derive(Clone, Debug)]
84pub struct Libp2pMetricsValue {
85    /// The number of currently connected peers
86    pub num_connected_peers: Box<dyn Gauge>,
87    /// The number of failed messages
88    pub num_failed_messages: Box<dyn Counter>,
89    /// Whether or not the network is considered ready
90    pub is_ready: Box<dyn Gauge>,
91}
92
93impl Libp2pMetricsValue {
94    /// Populate the metrics with Libp2p-specific metrics
95    pub fn new(metrics: &dyn Metrics) -> Self {
96        // Create a `libp2p subgroup
97        let subgroup = metrics.subgroup("libp2p".into());
98
99        // Create the metrics
100        Self {
101            num_connected_peers: subgroup.create_gauge("num_connected_peers".into(), None),
102            num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
103            is_ready: subgroup.create_gauge("is_ready".into(), None),
104        }
105    }
106}
107
108impl Default for Libp2pMetricsValue {
109    /// Initialize with empty metrics
110    fn default() -> Self {
111        Self::new(&*NoMetrics::boxed())
112    }
113}
114
115/// convenience alias for the type for bootstrap addresses
116/// concurrency primitives are needed for having tests
117pub type BootstrapAddrs = Arc<RwLock<Vec<(PeerId, Multiaddr)>>>;
118
/// Hard-coded name of the topic used for QCs
pub const QC_TOPIC: &str = "global";
121
122/// Stubbed out Ack
123///
124/// Note: as part of versioning for upgradability,
125/// all network messages must begin with a 4-byte version number.
126///
127/// Hence:
128///   * `Empty` *must* be a struct (enums are serialized with a leading byte for the variant), and
129///   * we must have an explicit version field.
130#[derive(Serialize)]
131pub struct Empty {
132    /// This should not be required, but it is. Version automatically gets prepended.
133    /// Perhaps this could be replaced with something zero-sized and serializable.
134    byte: u8,
135}
136
137impl<T: NodeType> Debug for Libp2pNetwork<T> {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        f.debug_struct("Libp2p").field("inner", &"inner").finish()
140    }
141}
142
143/// Type alias for a shared collection of peerid, multiaddrs
144pub type PeerInfoVec = Arc<RwLock<Vec<(PeerId, Multiaddr)>>>;
145
146/// The underlying state of the libp2p network
147#[derive(Debug)]
148struct Libp2pNetworkInner<T: NodeType> {
149    /// this node's public key
150    pk: T::SignatureKey,
151    /// handle to control the network
152    handle: Arc<NetworkNodeHandle<T>>,
153    /// Message Receiver
154    receiver: Mutex<Receiver<Vec<u8>>>,
155    /// Sender for broadcast messages
156    sender: Sender<Vec<u8>>,
157    /// Sender for node lookup (relevant view number, key of node) (None for shutdown)
158    node_lookup_send: Sender<Option<(ViewNumber, T::SignatureKey)>>,
159    /// this is really cheating to enable local tests
160    /// hashset of (bootstrap_addr, peer_id)
161    bootstrap_addrs: PeerInfoVec,
162    /// whether or not the network is ready to send
163    is_ready: Arc<AtomicBool>,
164    /// max time before dropping message due to DHT error
165    dht_timeout: Duration,
166    /// whether or not we've bootstrapped into the DHT yet
167    is_bootstrapped: Arc<AtomicBool>,
168    /// The Libp2p metrics we're managing
169    metrics: Libp2pMetricsValue,
170    /// The list of topics we're subscribed to
171    subscribed_topics: HashSet<String>,
172    /// the latest view number (for node lookup purposes)
173    /// NOTE: supposed to represent a ViewNumber but we
174    /// haven't made that atomic yet and we prefer lock-free
175    latest_seen_view: Arc<AtomicU64>,
176    #[cfg(feature = "hotshot-testing")]
177    /// reliability_config
178    reliability_config: Option<Box<dyn NetworkReliability>>,
179    /// Killswitch sender
180    kill_switch: Sender<()>,
181}
182
183/// Networking implementation that uses libp2p
184/// generic over `M` which is the message type
185#[derive(Clone)]
186pub struct Libp2pNetwork<T: NodeType> {
187    /// holds the state of the libp2p network
188    inner: Arc<Libp2pNetworkInner<T>>,
189}
190
#[cfg(feature = "hotshot-testing")]
impl<T: NodeType> TestableNetworkingImplementation<T> for Libp2pNetwork<T> {
    /// Returns a boxed generator `f(node_id) -> Libp2pNetwork` used to create libp2p
    /// networks for tests. Nodes with `node_id < num_bootstrap` are logged as bootstrap
    /// nodes; every node is configured with the same defaults and keys derived from a
    /// fixed seed and its `node_id`.
    ///
    /// # Panics
    /// - Immediately, if `da_committee_size > expected_node_count`
    /// - The returned function may panic on an invalid configuration (probably an issue
    ///   with the defaults of this function) or if the replica's network cannot be spun up
    #[allow(clippy::panic, clippy::too_many_lines)]
    fn generator(
        expected_node_count: usize,
        num_bootstrap: usize,
        _network_id: usize,
        da_committee_size: usize,
        reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
        _connect_infos: &mut HashMap<T::SignatureKey, PeerConnectInfo>,
    ) -> AsyncGenerator<Arc<Self>> {
        assert!(
            da_committee_size <= expected_node_count,
            "DA committee size must be less than or equal to total # nodes"
        );
        // State shared between all networks produced by the generator
        let bootstrap_addrs: PeerInfoVec = Arc::default();
        let node_ids: Arc<RwLock<HashSet<u64>>> = Arc::default();

        Box::pin({
            move |node_id| {
                info!(
                    "GENERATOR: Node id {:?}, is bootstrap: {:?}",
                    node_id,
                    node_id < num_bootstrap as u64
                );

                // Ask the OS for a free UDP port; the probe socket is dropped right away.
                // UDP has no TIME_WAIT, so there's a tiny race before libp2p binds.
                let port = std::net::UdpSocket::bind("127.0.0.1:0")
                    .expect("UDP socket should bind")
                    .local_addr()
                    .expect("UDP socket should have local addr")
                    .port();
                let listen_addr =
                    Multiaddr::from_str(&format!("/ip4/127.0.0.1/udp/{port}/quic-v1")).unwrap();

                // Since this is a test, keys are generated from a fixed seed rather than
                // read from a config file
                let (_, privkey) = T::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id);
                let pubkey = T::SignatureKey::from_private(&privkey);

                // The Libp2p identity is derived from the consensus private key
                let libp2p_keypair = derive_libp2p_keypair::<T::SignatureKey>(&privkey)
                    .expect("Failed to derive libp2p keypair");

                // Signed DHT record mapping our consensus key to our Libp2p peer ID
                let lookup_key = RecordKey::new(Namespace::Lookup, pubkey.to_bytes());
                let lookup_record_value = RecordValue::new_signed(
                    &lookup_key,
                    libp2p_keypair.public().to_peer_id().to_bytes(),
                    &privkey,
                )
                .expect("Failed to sign DHT lookup record");

                // We want at least 2/3 of the nodes to have any given record in the DHT
                let two_thirds = (2 * expected_node_count).div_ceil(3);
                let replication_factor = NonZeroUsize::new(two_thirds).unwrap();

                // Build the network node configuration
                let config = NetworkNodeConfigBuilder::default()
                    .keypair(libp2p_keypair)
                    .replication_factor(replication_factor)
                    .bind_address(Some(listen_addr))
                    .to_connect_addrs(HashSet::default())
                    .republication_interval(None)
                    .build()
                    .expect("Failed to build network node config");

                // Per-invocation handles that get moved into the future below
                let bootstrap_addrs_ref = Arc::clone(&bootstrap_addrs);
                let node_ids_ref = Arc::clone(&node_ids);
                let reliability_config_dup = reliability_config.clone();

                Box::pin(async move {
                    // If this node id was started before, reset the set of known node ids.
                    // NOTE(review): the previous comment here spoke of clearing "the bootstrap
                    // info", but only the node-id set is cleared — confirm this is intended.
                    {
                        let mut started_ids = node_ids_ref.write().await;
                        if started_ids.contains(&node_id) {
                            started_ids.clear();
                        }
                        started_ids.insert(node_id);
                    }

                    let network = Libp2pNetwork::new(
                        Libp2pMetricsValue::default(),
                        DhtNoPersistence,
                        config,
                        pubkey.clone(),
                        lookup_record_value,
                        bootstrap_addrs_ref,
                        usize::try_from(node_id).unwrap(),
                        #[cfg(feature = "hotshot-testing")]
                        reliability_config_dup,
                    )
                    .await
                    .unwrap_or_else(|err| panic!("Failed to create libp2p network: {err:?}"));

                    Arc::new(network)
                })
            }
        })
    }

    /// The number of in-flight messages is not tracked, so this is always `None`
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
310
311/// Derive a Libp2p keypair from a given private key
312///
313/// # Errors
314/// If we are unable to derive a new `SecretKey` from the `blake3`-derived
315/// bytes.
316pub fn derive_libp2p_keypair<K: SignatureKey>(
317    private_key: &K::PrivateKey,
318) -> anyhow::Result<Keypair> {
319    // Derive a secondary key from our primary private key
320    let derived_key = blake3::derive_key("libp2p key", &private_key.to_bytes());
321    let derived_key = SecretKey::try_from_bytes(derived_key)?;
322
323    // Create an `ed25519` keypair from the derived key
324    Ok(ed25519::Keypair::from(derived_key).into())
325}
326
327/// Derive a Libp2p Peer ID from a given private key
328///
329/// # Errors
330/// If we are unable to derive a Libp2p keypair
331pub fn derive_libp2p_peer_id<K: SignatureKey>(
332    private_key: &K::PrivateKey,
333) -> anyhow::Result<PeerId> {
334    // Get the derived keypair
335    let keypair = derive_libp2p_keypair::<K>(private_key)?;
336
337    // Return the PeerID derived from the public key
338    Ok(PeerId::from_public_key(&keypair.public()))
339}
340
341/// Parse a Libp2p Multiaddr from a string. The input string should be in the format
342/// `hostname:port` or `ip:port`. This function derives a `Multiaddr` from the input string.
343///
344/// This borrows from Rust's implementation of `to_socket_addrs` but will only warn if the domain
345/// does not yet resolve.
346///
347/// # Errors
348/// - If the input string is not in the correct format
349pub fn derive_libp2p_multiaddr(addr: &String) -> anyhow::Result<Multiaddr> {
350    // Split the address into the host and port parts
351    let (host, port) = match addr.rfind(':') {
352        Some(idx) => (&addr[..idx], &addr[idx + 1..]),
353        None => return Err(anyhow!("Invalid address format, no port supplied")),
354    };
355
356    // Try parsing the host as an IP address
357    let ip = host.parse::<IpAddr>();
358
359    // Conditionally build the multiaddr string
360    let multiaddr_string = match ip {
361        Ok(IpAddr::V4(ip)) => format!("/ip4/{ip}/udp/{port}/quic-v1"),
362        Ok(IpAddr::V6(ip)) => format!("/ip6/{ip}/udp/{port}/quic-v1"),
363        Err(_) => {
364            // Try resolving the host. If it fails, continue but warn the user
365            let lookup_result = addr.to_socket_addrs();
366
367            // See if the lookup failed
368            let failed = lookup_result
369                .map(|result| result.collect::<Vec<_>>().is_empty())
370                .unwrap_or(true);
371
372            // If it did, warn the user
373            if failed {
374                warn!(
375                    "Failed to resolve domain name {host}, assuming it has not yet been \
376                     provisioned"
377                );
378            }
379
380            format!("/dns/{host}/udp/{port}/quic-v1")
381        },
382    };
383
384    // Convert the multiaddr string to a `Multiaddr`
385    multiaddr_string.parse().with_context(|| {
386        format!("Failed to convert Multiaddr string to Multiaddr: {multiaddr_string}")
387    })
388}
389
390impl<T: NodeType> Libp2pNetwork<T> {
391    /// Create and return a Libp2p network from a network config file
392    /// and various other configuration-specific values.
393    ///
394    /// # Errors
395    /// If we are unable to parse a Multiaddress
396    ///
397    /// # Panics
398    /// If we are unable to calculate the replication factor
399    #[allow(clippy::too_many_arguments)]
400    pub async fn from_config<D: DhtPersistentStorage>(
401        mut config: NetworkConfig<T>,
402        dht_persistent_storage: D,
403        gossip_config: GossipConfig,
404        request_response_config: RequestResponseConfig,
405        bind_address: Multiaddr,
406        announce_addresses: Vec<Multiaddr>,
407        pub_key: &T::SignatureKey,
408        priv_key: &<T::SignatureKey as SignatureKey>::PrivateKey,
409        metrics: Libp2pMetricsValue,
410    ) -> anyhow::Result<Self> {
411        // Try to take our Libp2p config from our broader network config
412        let libp2p_config = config
413            .libp2p_config
414            .take()
415            .ok_or(anyhow!("Libp2p config not supplied"))?;
416
417        // Derive our Libp2p keypair from our supplied private key
418        let keypair = derive_libp2p_keypair::<T::SignatureKey>(priv_key)?;
419
420        // Build our libp2p configuration
421        let mut config_builder = NetworkNodeConfigBuilder::default();
422
423        // Set the gossip configuration
424        config_builder.gossip_config(gossip_config.clone());
425        config_builder.request_response_config(request_response_config);
426
427        // Construct the auth message
428        let auth_message =
429            construct_auth_message(pub_key, &keypair.public().to_peer_id(), priv_key)
430                .with_context(|| "Failed to construct auth message")?;
431
432        // Set the auth message and stake table
433        config_builder.auth_message(Some(auth_message));
434
435        // The replication factor is the minimum of [the default and 2/3 the number of nodes]
436        let Some(default_replication_factor) = DEFAULT_REPLICATION_FACTOR else {
437            return Err(anyhow!("Default replication factor not supplied"));
438        };
439
440        let replication_factor = NonZeroUsize::new(min(
441            default_replication_factor.get(),
442            config.config.num_nodes_with_stake.get() / 2,
443        ))
444        .with_context(|| "Failed to calculate replication factor")?;
445
446        // Sign our DHT lookup record
447        let lookup_record_value = RecordValue::new_signed(
448            &RecordKey::new(Namespace::Lookup, pub_key.to_bytes()),
449            // The value is our Libp2p Peer ID
450            keypair.public().to_peer_id().to_bytes(),
451            priv_key,
452        )
453        .with_context(|| "Failed to sign DHT lookup record")?;
454
455        config_builder
456            .keypair(keypair)
457            .replication_factor(replication_factor)
458            .bind_address(Some(bind_address.clone()))
459            .announce_addresses(announce_addresses);
460
461        // Connect to the provided bootstrap nodes
462        config_builder.to_connect_addrs(HashSet::from_iter(libp2p_config.bootstrap_nodes.clone()));
463
464        // Build the node's configuration
465        let node_config = config_builder.build()?;
466
467        // Calculate all keys so we can keep track of direct message recipients
468        let mut all_keys = BTreeSet::new();
469
470        // Insert all known nodes into the set of all keys
471        for node in config.config.known_nodes_with_stake {
472            all_keys.insert(T::SignatureKey::public_key(&node.stake_table_entry));
473        }
474
475        Ok(Libp2pNetwork::new(
476            metrics,
477            dht_persistent_storage,
478            node_config,
479            pub_key.clone(),
480            lookup_record_value,
481            Arc::new(RwLock::new(libp2p_config.bootstrap_nodes)),
482            usize::try_from(config.node_index)?,
483            #[cfg(feature = "hotshot-testing")]
484            None,
485        )
486        .await?)
487    }
488
489    /// Returns whether or not the network has any peers.
490    #[must_use]
491    pub fn has_peers(&self) -> bool {
492        self.inner.is_ready.load(Ordering::Relaxed)
493    }
494
495    /// Returns only when the network is ready.
496    pub async fn wait_for_peers(&self) {
497        loop {
498            if self.has_peers() {
499                break;
500            }
501            sleep(Duration::from_secs(1)).await;
502        }
503    }
504
505    /// Constructs new network for a node. Note that this network is unconnected.
506    /// One must call `connect` in order to connect.
507    /// * `config`: the configuration of the node
508    /// * `pk`: public key associated with the node
509    /// * `bootstrap_addrs`: rwlock containing the bootstrap addrs
510    /// # Errors
511    /// Returns error in the event that the underlying libp2p network
512    /// is unable to create a network.
513    ///
514    /// # Panics
515    ///
516    /// This will panic if there are less than 5 bootstrap nodes
517    #[allow(clippy::too_many_arguments)]
518    pub async fn new<D: DhtPersistentStorage>(
519        metrics: Libp2pMetricsValue,
520        dht_persistent_storage: D,
521        config: NetworkNodeConfig,
522        pk: T::SignatureKey,
523        lookup_record_value: RecordValue<T::SignatureKey>,
524        bootstrap_addrs: BootstrapAddrs,
525        id: usize,
526        #[cfg(feature = "hotshot-testing")] reliability_config: Option<Box<dyn NetworkReliability>>,
527    ) -> Result<Libp2pNetwork<T>, NetworkError> {
528        // Create a map from consensus keys to Libp2p peer IDs
529        let consensus_key_to_pid_map = Arc::new(parking_lot::Mutex::new(BiMap::new()));
530
531        let (mut rx, network_handle) = spawn_network_node::<T, D>(
532            config.clone(),
533            dht_persistent_storage,
534            Arc::clone(&consensus_key_to_pid_map),
535            id,
536        )
537        .await
538        .map_err(|e| NetworkError::ConfigError(format!("failed to spawn network node: {e}")))?;
539
540        // Add our own address to the bootstrap addresses
541        let addr = network_handle.listen_addr();
542        let pid = network_handle.peer_id();
543        bootstrap_addrs.write().await.push((pid, addr));
544
545        // Subscribe to the relevant topics
546        let subscribed_topics = HashSet::from_iter(vec![QC_TOPIC.to_string()]);
547
548        // unbounded channels may not be the best choice (spammed?)
549        // if bounded figure out a way to log dropped msgs
550        let (sender, receiver) = channel(1000);
551        let (node_lookup_send, node_lookup_recv) = channel(10);
552        let (kill_tx, kill_rx) = channel(1);
553        rx.set_kill_switch(kill_rx);
554
555        let mut result = Libp2pNetwork {
556            inner: Arc::new(Libp2pNetworkInner {
557                handle: Arc::new(network_handle),
558                receiver: Mutex::new(receiver),
559                sender: sender.clone(),
560                pk,
561                bootstrap_addrs,
562                is_ready: Arc::new(AtomicBool::new(false)),
563                // This is optimal for 10-30 nodes. TODO: parameterize this for both tests and examples
564                dht_timeout: config.dht_timeout.unwrap_or(Duration::from_secs(120)),
565                is_bootstrapped: Arc::new(AtomicBool::new(false)),
566                metrics,
567                subscribed_topics,
568                node_lookup_send,
569                // Start the latest view from 0. "Latest" refers to "most recent view we are polling for
570                // proposals on". We need this because to have consensus info injected we need a working
571                // network already. In the worst case, we send a few lookups we don't need.
572                latest_seen_view: Arc::new(AtomicU64::new(0)),
573                #[cfg(feature = "hotshot-testing")]
574                reliability_config,
575                kill_switch: kill_tx,
576            }),
577        };
578
579        // Set the network as not ready
580        result.inner.metrics.is_ready.set(0);
581
582        result.handle_event_generator(sender, rx);
583        result.spawn_node_lookup(node_lookup_recv);
584        result.spawn_connect(id, lookup_record_value);
585
586        Ok(result)
587    }
588
589    /// Spawns task for looking up nodes pre-emptively
590    #[allow(clippy::cast_sign_loss, clippy::cast_precision_loss)]
591    fn spawn_node_lookup(
592        &self,
593        mut node_lookup_recv: Receiver<Option<(ViewNumber, T::SignatureKey)>>,
594    ) {
595        let handle = Arc::clone(&self.inner.handle);
596        let dht_timeout = self.inner.dht_timeout;
597        let latest_seen_view = Arc::clone(&self.inner.latest_seen_view);
598
599        // deals with handling lookup queue. should be infallible
600        spawn(async move {
601            // cancels on shutdown
602            while let Some(Some((view_number, pk))) = node_lookup_recv.recv().await {
603                /// defines lookahead threshold based on the constant
604                #[allow(clippy::cast_possible_truncation)]
605                const THRESHOLD: u64 = (LOOK_AHEAD as f64 * 0.8) as u64;
606
607                trace!("Performing lookup for peer {pk}");
608
609                // only run if we are not too close to the next view number
610                if latest_seen_view.load(Ordering::Relaxed) + THRESHOLD <= *view_number {
611                    // look up
612                    if let Err(err) = handle.lookup_node(&pk, dht_timeout).await {
613                        LogEvent::DhtLookupFailure.record();
614                        debug!("Failed to perform lookup for key {pk}: {err}");
615                    };
616                }
617            }
618        });
619    }
620
621    /// Initiates connection to the outside world
622    fn spawn_connect(&mut self, id: usize, lookup_record_value: RecordValue<T::SignatureKey>) {
623        let pk = self.inner.pk.clone();
624        let bootstrap_ref = Arc::clone(&self.inner.bootstrap_addrs);
625        let handle = Arc::clone(&self.inner.handle);
626        let is_bootstrapped = Arc::clone(&self.inner.is_bootstrapped);
627        let inner = Arc::clone(&self.inner);
628
629        spawn({
630            let is_ready = Arc::clone(&self.inner.is_ready);
631            async move {
632                let bs_addrs = bootstrap_ref.read().await.clone();
633
634                // Add known peers to the network
635                handle.add_known_peers(bs_addrs).unwrap();
636
637                // Begin the bootstrap process
638                handle.begin_bootstrap()?;
639                while !is_bootstrapped.load(Ordering::Relaxed) {
640                    sleep(Duration::from_secs(1)).await;
641                    handle.begin_bootstrap()?;
642                }
643
644                // Subscribe to the QC topic
645                handle.subscribe(QC_TOPIC.to_string()).await.unwrap();
646
647                // Map our staking key to our Libp2p Peer ID so we can properly
648                // route direct messages
649                while handle
650                    .put_record(
651                        RecordKey::new(Namespace::Lookup, pk.to_bytes()),
652                        lookup_record_value.clone(),
653                    )
654                    .await
655                    .is_err()
656                {
657                    sleep(Duration::from_secs(1)).await;
658                }
659
660                // Wait for the network to connect to at least 1 peer
661                if let Err(e) = handle.wait_to_connect(1, id).await {
662                    error!("Failed to connect to peers: {e:?}");
663                    return Err::<(), NetworkError>(e);
664                }
665                info!("Connected to required number of peers");
666
667                // Set the network as ready
668                is_ready.store(true, Ordering::Relaxed);
669                inner.metrics.is_ready.set(1);
670
671                Ok::<(), NetworkError>(())
672            }
673        });
674    }
675
676    /// Handle events
677    fn handle_recvd_events(
678        &self,
679        msg: NetworkEvent,
680        sender: &Sender<Vec<u8>>,
681    ) -> Result<(), NetworkError> {
682        match msg {
683            GossipMsg(msg) => {
684                sender.try_send(msg).map_err(|err| {
685                    NetworkError::ChannelSendError(format!("failed to send gossip message: {err}"))
686                })?;
687            },
688            DirectRequest(msg, _pid, chan) => {
689                sender.try_send(msg).map_err(|err| {
690                    NetworkError::ChannelSendError(format!(
691                        "failed to send direct request message: {err}"
692                    ))
693                })?;
694                if self
695                    .inner
696                    .handle
697                    .direct_response(
698                        chan,
699                        &bincode::serialize(&Empty { byte: 0u8 }).map_err(|e| {
700                            NetworkError::FailedToSerialize(format!(
701                                "failed to serialize acknowledgement: {e}"
702                            ))
703                        })?,
704                    )
705                    .is_err()
706                {
707                    error!("failed to ack!");
708                };
709            },
710            DirectResponse(_msg, _) => {},
711            NetworkEvent::IsBootstrapped => {
712                error!(
713                    "handle_recvd_events received `NetworkEvent::IsBootstrapped`, which should be \
714                     impossible."
715                );
716            },
717            NetworkEvent::ConnectedPeersUpdate(_) => {},
718        }
719        Ok::<(), NetworkError>(())
720    }
721
722    /// task to propagate messages to handlers
723    /// terminates on shut down of network
724    fn handle_event_generator(&self, sender: Sender<Vec<u8>>, mut network_rx: NetworkNodeReceiver) {
725        let handle = self.clone();
726        let is_bootstrapped = Arc::clone(&self.inner.is_bootstrapped);
727        spawn(async move {
728            let Some(mut kill_switch) = network_rx.take_kill_switch() else {
729                tracing::error!(
730                    "`spawn_handle` was called on a network handle that was already closed"
731                );
732                return;
733            };
734
735            loop {
736                select! {
737                    msg = network_rx.recv() => {
738                        let Ok(message) = msg else {
739                            warn!("Network receiver shut down!");
740                            return;
741                        };
742
743                        match message {
744                            NetworkEvent::IsBootstrapped => {
745                                is_bootstrapped.store(true, Ordering::Relaxed);
746                            }
747                            GossipMsg(_) | DirectRequest(_, _, _) | DirectResponse(_, _) => {
748                                let _ = handle.handle_recvd_events(message, &sender);
749                            }
750                            NetworkEvent::ConnectedPeersUpdate(num_peers) => {
751                                handle.inner.metrics.num_connected_peers.set(num_peers);
752                            }
753                        }
754                    }
755
756                    _kill_switch = kill_switch.recv() => {
757                        warn!("Event Handler shutdown");
758                        return;
759                    }
760                }
761            }
762        });
763    }
764}
765
766#[async_trait]
767impl<T: NodeType> ConnectedNetwork<T::SignatureKey> for Libp2pNetwork<T> {
768    #[instrument(name = "Libp2pNetwork::ready_blocking", skip_all)]
769    async fn wait_for_ready(&self) {
770        self.wait_for_peers().await;
771    }
772
773    fn pause(&self) {
774        unimplemented!("Pausing not implemented for the Libp2p network");
775    }
776
777    fn resume(&self) {
778        unimplemented!("Resuming not implemented for the Libp2p network");
779    }
780
781    #[instrument(name = "Libp2pNetwork::shut_down", skip_all)]
782    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
783    where
784        'a: 'b,
785        Self: 'b,
786    {
787        let closure = async move {
788            let _ = self.inner.handle.shutdown().await;
789            let _ = self.inner.node_lookup_send.send(None).await;
790            let _ = self.inner.kill_switch.send(()).await;
791        };
792        boxed_sync(closure)
793    }
794
795    #[instrument(name = "Libp2pNetwork::broadcast_message", skip_all)]
796    async fn broadcast_message(
797        &self,
798        _: ViewNumber,
799        message: Vec<u8>,
800        topic: Topic,
801        _broadcast_delay: BroadcastDelay,
802    ) -> Result<(), NetworkError> {
803        // If we're not ready yet (we don't have any peers, error)
804        if !self.has_peers() {
805            self.inner.metrics.num_failed_messages.add(1);
806            return Err(NetworkError::NoPeersYet);
807        };
808
809        // If we are subscribed to the topic,
810        let topic = topic.to_string();
811        if self.inner.subscribed_topics.contains(&topic) {
812            // Short-circuit-send the message to ourselves
813            self.inner.sender.try_send(message.clone()).map_err(|_| {
814                self.inner.metrics.num_failed_messages.add(1);
815                NetworkError::ShutDown
816            })?;
817        }
818
819        // NOTE: metrics is threadsafe, so clone is fine (and lightweight)
820        #[cfg(feature = "hotshot-testing")]
821        {
822            let metrics = self.inner.metrics.clone();
823            if let Some(config) = &self.inner.reliability_config {
824                let handle = Arc::clone(&self.inner.handle);
825
826                let fut = config.clone().chaos_send_msg(
827                    message,
828                    Arc::new(move |msg: Vec<u8>| {
829                        let topic_2 = topic.clone();
830                        let handle_2 = Arc::clone(&handle);
831                        let metrics_2 = metrics.clone();
832                        boxed_sync(async move {
833                            if let Err(e) = handle_2.gossip_no_serialize(topic_2, msg) {
834                                metrics_2.num_failed_messages.add(1);
835                                warn!("Failed to broadcast to libp2p: {e:?}");
836                            }
837                        })
838                    }),
839                );
840                spawn(fut);
841                return Ok(());
842            }
843        }
844
845        if let Err(e) = self.inner.handle.gossip(topic, &message) {
846            self.inner.metrics.num_failed_messages.add(1);
847            return Err(e);
848        }
849
850        Ok(())
851    }
852
853    #[instrument(name = "Libp2pNetwork::da_broadcast_message", skip_all)]
854    async fn da_broadcast_message(
855        &self,
856        view: ViewNumber,
857        message: Vec<u8>,
858        recipients: Vec<T::SignatureKey>,
859        _broadcast_delay: BroadcastDelay,
860    ) -> Result<(), NetworkError> {
861        // If we're not ready yet (we don't have any peers, error)
862        if !self.has_peers() {
863            self.inner.metrics.num_failed_messages.add(1);
864            return Err(NetworkError::NoPeersYet);
865        };
866
867        // If we are subscribed to the DA topic, send the message to ourselves first
868        let topic = Topic::Da.to_string();
869        if self.inner.subscribed_topics.contains(&topic) {
870            self.inner.sender.try_send(message.clone()).map_err(|_| {
871                self.inner.metrics.num_failed_messages.add(1);
872                NetworkError::ShutDown
873            })?;
874        }
875
876        let future_results = recipients
877            .into_iter()
878            .map(|r| self.direct_message(view, message.clone(), r));
879        let results = join_all(future_results).await;
880
881        let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
882
883        if errors.is_empty() {
884            Ok(())
885        } else {
886            Err(NetworkError::Multiple(errors))
887        }
888    }
889
890    #[instrument(name = "Libp2pNetwork::direct_message", skip_all)]
891    async fn direct_message(
892        &self,
893        _: ViewNumber,
894        message: Vec<u8>,
895        recipient: T::SignatureKey,
896    ) -> Result<(), NetworkError> {
897        // If we're not ready yet (we don't have any peers, error)
898        if !self.has_peers() {
899            self.inner.metrics.num_failed_messages.add(1);
900            return Err(NetworkError::NoPeersYet);
901        };
902
903        // short circuit if we're dming ourselves
904        if recipient == self.inner.pk {
905            // panic if we already shut down?
906            self.inner.sender.try_send(message).map_err(|_x| {
907                self.inner.metrics.num_failed_messages.add(1);
908                NetworkError::ShutDown
909            })?;
910            return Ok(());
911        }
912
913        let pid = match self
914            .inner
915            .handle
916            .lookup_node(&recipient, Duration::from_secs(2))
917            .await
918        {
919            Ok(pid) => pid,
920            Err(err) => {
921                self.inner.metrics.num_failed_messages.add(1);
922                return Err(NetworkError::LookupError(format!(
923                    "failed to look up node for direct message: {err}"
924                )));
925            },
926        };
927
928        #[cfg(feature = "hotshot-testing")]
929        {
930            let metrics = self.inner.metrics.clone();
931            if let Some(config) = &self.inner.reliability_config {
932                let handle = Arc::clone(&self.inner.handle);
933
934                let fut = config.clone().chaos_send_msg(
935                    message,
936                    Arc::new(move |msg: Vec<u8>| {
937                        let handle_2 = Arc::clone(&handle);
938                        let metrics_2 = metrics.clone();
939                        boxed_sync(async move {
940                            if let Err(e) = handle_2.direct_request_no_serialize(pid, msg) {
941                                metrics_2.num_failed_messages.add(1);
942                                warn!("Failed to broadcast to libp2p: {e:?}");
943                            }
944                        })
945                    }),
946                );
947                spawn(fut);
948                return Ok(());
949            }
950        }
951
952        match self.inner.handle.direct_request(pid, &message) {
953            Ok(()) => Ok(()),
954            Err(e) => {
955                self.inner.metrics.num_failed_messages.add(1);
956                Err(e)
957            },
958        }
959    }
960
961    /// Receive one or many messages from the underlying network.
962    ///
963    /// # Errors
964    /// If there is a network-related failure.
965    #[instrument(name = "Libp2pNetwork::recv_message", skip_all)]
966    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
967        let result = self
968            .inner
969            .receiver
970            .lock()
971            .await
972            .recv()
973            .await
974            .ok_or(NetworkError::ShutDown)?;
975
976        Ok(result)
977    }
978
979    #[instrument(name = "Libp2pNetwork::queue_node_lookup", skip_all)]
980    #[allow(clippy::type_complexity)]
981    fn queue_node_lookup(
982        &self,
983        view_number: ViewNumber,
984        pk: T::SignatureKey,
985    ) -> Result<(), TrySendError<Option<(ViewNumber, T::SignatureKey)>>> {
986        self.inner
987            .node_lookup_send
988            .try_send(Some((view_number, pk)))
989    }
990
991    /// The libp2p view update is a special operation intrinsic to its internal behavior.
992    ///
993    /// Libp2p needs to do a lookup because a libp2p address is not related to
994    /// hotshot keys. So in libp2p we store a mapping of HotShot key to libp2p address
995    /// in a distributed hash table.
996    ///
997    /// This means to directly message someone on libp2p we need to lookup in the hash
998    /// table what their libp2p address is, using their HotShot public key as the key.
999    ///
1000    /// So the logic with libp2p is to prefetch upcoming leaders libp2p address to
1001    /// save time when we later need to direct message the leader our vote. Hence the
1002    /// use of the future view and leader to queue the lookups.
1003    async fn update_view<TYPES>(
1004        &self,
1005        view: ViewNumber,
1006        epoch: Option<EpochNumber>,
1007        membership_coordinator: EpochMembershipCoordinator<TYPES>,
1008    ) where
1009        TYPES: NodeType<SignatureKey = T::SignatureKey>,
1010    {
1011        let future_view = ViewNumber::new(*view) + LOOK_AHEAD;
1012        let epoch = epoch.map(|e| EpochNumber::new(*e));
1013
1014        let membership = match membership_coordinator.membership_for_epoch(epoch) {
1015            Ok(m) => m,
1016            Err(e) => {
1017                return tracing::warn!(e.message);
1018            },
1019        };
1020        let future_leader = match membership.leader(future_view) {
1021            Ok(l) => l,
1022            Err(e) => {
1023                return tracing::info!("Failed to calculate leader for view {future_view}: {e}");
1024            },
1025        };
1026
1027        let _ = self
1028            .queue_node_lookup(ViewNumber::new(*future_view), future_leader)
1029            .map_err(|err| tracing::warn!("failed to process node lookup request: {err}"));
1030    }
1031}
1032
#[cfg(test)]
mod test {
    /// Tests for deriving a libp2p `Multiaddr` from a `host:port` string.
    mod derive_multiaddr {
        use std::net::Ipv6Addr;

        use super::super::*;

        /// Test derivation of a valid IPv4 address -> Multiaddr
        #[test]
        fn test_v4_valid() {
            // Derive a multiaddr from a valid IPv4 address
            let addr = "1.1.1.1:8080".to_string();
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            // Make sure it's the correct (quic) multiaddr
            assert_eq!(multiaddr.to_string(), "/ip4/1.1.1.1/udp/8080/quic-v1");
        }

        /// Test derivation of a valid IPv6 address -> Multiaddr
        #[test]
        fn test_v6_valid() {
            // Derive a multiaddr from a valid IPv6 address
            let ipv6_addr = Ipv6Addr::new(1, 2, 3, 4, 5, 6, 7, 8);
            let addr = format!("{ipv6_addr}:8080");
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            // Make sure it's the correct (quic) multiaddr
            assert_eq!(
                multiaddr.to_string(),
                format!("/ip6/{ipv6_addr}/udp/8080/quic-v1")
            );
        }

        /// Test that an IP address without a port fails to derive to a Multiaddr
        #[test]
        fn test_no_port() {
            // Derive a multiaddr from an address that is missing its port
            let addr = "1.1.1.1".to_string();
            let multiaddr = derive_libp2p_multiaddr(&addr);

            // Make sure it fails
            assert!(multiaddr.is_err());
        }

        /// Test that an existing domain name resolves to a Multiaddr
        #[test]
        fn test_fqdn_exists() {
            // Derive a multiaddr from a valid FQDN
            let addr = "example.com:8080".to_string();
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            // Make sure it's the correct (quic) multiaddr
            assert_eq!(multiaddr.to_string(), "/dns/example.com/udp/8080/quic-v1");
        }

        /// Test that a non-existent domain name still resolves to a Multiaddr
        #[test]
        fn test_fqdn_does_not_exist() {
            // Derive a multiaddr from a well-formed FQDN that does not exist
            let addr = "libp2p.example.com:8080".to_string();
            let multiaddr =
                derive_libp2p_multiaddr(&addr).expect("Failed to derive valid multiaddr");

            // Make sure it still worked
            assert_eq!(
                multiaddr.to_string(),
                "/dns/libp2p.example.com/udp/8080/quic-v1"
            );
        }

        /// Test that a domain name without a port fails to derive to a Multiaddr
        #[test]
        fn test_fqdn_no_port() {
            // Derive a multiaddr from a domain name that is missing its port
            let addr = "example.com".to_string();
            let multiaddr = derive_libp2p_multiaddr(&addr);

            // Make sure it fails
            assert!(multiaddr.is_err());
        }
    }
}