Skip to main content

hotshot/traits/networking/
push_cdn_network.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7#[cfg(feature = "hotshot-testing")]
8use std::{
9    collections::HashMap,
10    sync::atomic::{AtomicBool, Ordering},
11};
12use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
13#[cfg(feature = "hotshot-testing")]
14use std::{path::Path, time::Duration};
15
16use async_trait::async_trait;
17use bincode::config::Options;
18use cdn_broker::reexports::{
19    connection::protocols::{Quic, Tcp},
20    def::{ConnectionDef, RunDef, Topic as TopicTrait, hook::NoMessageHook},
21    discovery::{Embedded, Redis},
22};
23#[cfg(feature = "hotshot-testing")]
24use cdn_broker::{Broker, Config as BrokerConfig};
25pub use cdn_client::reexports::crypto::signature::KeyPair;
26use cdn_client::{
27    Client, Config as ClientConfig,
28    reexports::{
29        crypto::signature::{Serializable, SignatureScheme},
30        message::{Broadcast, Direct, Message as PushCdnMessage},
31    },
32};
33#[cfg(feature = "hotshot-testing")]
34use cdn_marshal::{Config as MarshalConfig, Marshal};
35use hotshot_types::{
36    BoxSyncFuture, boxed_sync,
37    data::ViewNumber,
38    traits::{
39        metrics::{Counter, Metrics, NoMetrics},
40        network::{BroadcastDelay, ConnectedNetwork, Topic as HotShotTopic},
41        signature_key::SignatureKey,
42    },
43    utils::bincode_opts,
44};
45#[cfg(feature = "hotshot-testing")]
46use hotshot_types::{
47    PeerConnectInfo,
48    traits::network::{AsyncGenerator, NetworkReliability, TestableNetworkingImplementation},
49};
50use num_enum::{IntoPrimitive, TryFromPrimitive};
51use parking_lot::Mutex;
52#[cfg(feature = "hotshot-testing")]
53use rand::{RngCore, SeedableRng, rngs::StdRng};
54#[cfg(feature = "hotshot-testing")]
55use test_utils::reserve_tcp_port;
56use tokio::sync::mpsc::error::TrySendError;
57#[cfg(feature = "hotshot-testing")]
58use tokio::{spawn, time::sleep};
59#[cfg(feature = "hotshot-testing")]
60use tracing::error;
61
62use super::NetworkError;
63#[cfg(feature = "hotshot-testing")]
64use crate::NodeType;
65
66/// CDN-specific metrics
67#[derive(Clone)]
68pub struct CdnMetricsValue {
69    /// The number of failed messages
70    pub num_failed_messages: Box<dyn Counter>,
71}
72
73impl CdnMetricsValue {
74    /// Populate the metrics with the CDN-specific ones
75    pub fn new(metrics: &dyn Metrics) -> Self {
76        // Create a subgroup for the CDN
77        let subgroup = metrics.subgroup("cdn".into());
78
79        // Create the CDN-specific metrics
80        Self {
81            num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
82        }
83    }
84}
85
86impl Default for CdnMetricsValue {
87    // The default is empty metrics
88    fn default() -> Self {
89        Self::new(&*NoMetrics::boxed())
90    }
91}
92
93/// A wrapped `SignatureKey`. We need to implement the Push CDN's `SignatureScheme`
94/// trait in order to sign and verify messages to/from the CDN.
95#[derive(Clone, Eq, PartialEq)]
96pub struct WrappedSignatureKey<T: SignatureKey + 'static>(pub T);
97impl<T: SignatureKey> SignatureScheme for WrappedSignatureKey<T> {
98    type PrivateKey = T::PrivateKey;
99    type PublicKey = Self;
100
101    /// Sign a message of arbitrary data and return the serialized signature
102    fn sign(
103        private_key: &Self::PrivateKey,
104        namespace: &str,
105        message: &[u8],
106    ) -> anyhow::Result<Vec<u8>> {
107        // Combine the namespace and message into a single byte array
108        let message = [namespace.as_bytes(), message].concat();
109
110        let signature = T::sign(private_key, &message)?;
111        Ok(bincode_opts().serialize(&signature)?)
112    }
113
114    /// Verify a message of arbitrary data and return the result
115    fn verify(
116        public_key: &Self::PublicKey,
117        namespace: &str,
118        message: &[u8],
119        signature: &[u8],
120    ) -> bool {
121        // Deserialize the signature
122        let signature: T::PureAssembledSignatureType = match bincode_opts().deserialize(signature) {
123            Ok(key) => key,
124            Err(_) => return false,
125        };
126
127        // Combine the namespace and message into a single byte array
128        let message = [namespace.as_bytes(), message].concat();
129
130        public_key.0.validate(&signature, &message)
131    }
132}
133
134/// We need to implement the `Serializable` so the Push CDN can serialize the signatures
135/// and public keys and send them over the wire.
136impl<T: SignatureKey> Serializable for WrappedSignatureKey<T> {
137    fn serialize(&self) -> anyhow::Result<Vec<u8>> {
138        Ok(self.0.to_bytes())
139    }
140
141    fn deserialize(serialized: &[u8]) -> anyhow::Result<Self> {
142        Ok(WrappedSignatureKey(T::from_bytes(serialized)?))
143    }
144}
145
146/// The production run definition for the Push CDN.
147/// Uses the real protocols and a Redis discovery client.
148pub struct ProductionDef<K: SignatureKey + 'static>(PhantomData<K>);
149impl<K: SignatureKey + 'static> RunDef for ProductionDef<K> {
150    type User = UserDefQuic<K>;
151    type User2 = UserDefTcp<K>;
152    type Broker = BrokerDef<K>;
153    type DiscoveryClientType = Redis;
154    type Topic = Topic;
155}
156
157/// The user definition for the Push CDN.
158/// Uses the Quic protocol and untrusted middleware.
159/// RM TODO: Remove this, switching to TCP singularly when everyone has updated
160pub struct UserDefQuic<K: SignatureKey + 'static>(PhantomData<K>);
161impl<K: SignatureKey + 'static> ConnectionDef for UserDefQuic<K> {
162    type Scheme = WrappedSignatureKey<K>;
163    type Protocol = Quic;
164    type MessageHook = NoMessageHook;
165}
166
167/// The user definition for the Push CDN.
168/// Uses the TCP protocol and untrusted middleware.
169pub struct UserDefTcp<K: SignatureKey + 'static>(PhantomData<K>);
170impl<K: SignatureKey + 'static> ConnectionDef for UserDefTcp<K> {
171    type Scheme = WrappedSignatureKey<K>;
172    type Protocol = Tcp;
173    type MessageHook = NoMessageHook;
174}
175
176/// The broker definition for the Push CDN.
177/// Uses the TCP protocol and trusted middleware.
178pub struct BrokerDef<K: SignatureKey + 'static>(PhantomData<K>);
179impl<K: SignatureKey> ConnectionDef for BrokerDef<K> {
180    type Scheme = WrappedSignatureKey<K>;
181    type Protocol = Tcp;
182    type MessageHook = NoMessageHook;
183}
184
185/// The client definition for the Push CDN. Uses the TCP+TLS
186/// protocol and no middleware. Differs from the user
187/// definition in that is on the client-side.
188#[derive(Clone)]
189pub struct ClientDef<K: SignatureKey + 'static>(PhantomData<K>);
190impl<K: SignatureKey> ConnectionDef for ClientDef<K> {
191    type Scheme = WrappedSignatureKey<K>;
192    type Protocol = Tcp;
193    type MessageHook = NoMessageHook;
194}
195
196/// The testing run definition for the Push CDN.
197/// Uses the real protocols, but with an embedded discovery client.
198pub struct TestingDef<K: SignatureKey + 'static>(PhantomData<K>);
199impl<K: SignatureKey + 'static> RunDef for TestingDef<K> {
200    type User = UserDefQuic<K>;
201    type User2 = UserDefTcp<K>;
202    type Broker = BrokerDef<K>;
203    type DiscoveryClientType = Embedded;
204    type Topic = Topic;
205}
206
207/// A communication channel to the Push CDN, which is a collection of brokers and a marshal
208/// that helps organize them all.
209#[derive(Clone)]
210/// Is generic over both the type of key and the network protocol.
211pub struct PushCdnNetwork<K: SignatureKey + 'static> {
212    /// The underlying client
213    client: Client<ClientDef<K>>,
214    /// The CDN-specific metrics
215    metrics: Arc<CdnMetricsValue>,
216    /// The internal queue for messages to ourselves
217    internal_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
218    /// The public key of this node
219    public_key: K,
220    /// Whether or not the underlying network is supposed to be paused
221    #[cfg(feature = "hotshot-testing")]
222    is_paused: Arc<AtomicBool>,
223    // The receiver channel for
224    // request_receiver_channel: TakeReceiver,
225}
226
227/// The enum for the topics we can subscribe to in the Push CDN
228#[repr(u8)]
229#[derive(IntoPrimitive, TryFromPrimitive, Clone, PartialEq, Eq)]
230pub enum Topic {
231    /// The global topic
232    Global = 0,
233    /// The DA topic
234    Da = 1,
235}
236
237/// Implement the `TopicTrait` for our `Topic` enum. We need this to filter
238/// topics that are not implemented at the application level.
239impl TopicTrait for Topic {}
240
241impl<K: SignatureKey + 'static> PushCdnNetwork<K> {
242    /// Create a new `PushCdnNetwork` (really a client) from a marshal endpoint, a list of initial
243    /// topics we are interested in, and our wrapped keypair that we use to authenticate with the
244    /// marshal.
245    ///
246    /// # Errors
247    /// If we fail to build the config
248    pub fn new(
249        marshal_endpoint: String,
250        topics: Vec<Topic>,
251        keypair: KeyPair<WrappedSignatureKey<K>>,
252        metrics: CdnMetricsValue,
253    ) -> anyhow::Result<Self> {
254        // Build config
255        let config = ClientConfig {
256            endpoint: marshal_endpoint,
257            subscribed_topics: topics.into_iter().map(|t| t as u8).collect(),
258            keypair: keypair.clone(),
259            use_local_authority: true,
260        };
261
262        // Create the client from the config
263        let client = Client::new(config);
264
265        Ok(Self {
266            client,
267            metrics: Arc::from(metrics),
268            internal_queue: Arc::new(Mutex::new(VecDeque::new())),
269            public_key: keypair.public_key.0,
270            // Start unpaused
271            #[cfg(feature = "hotshot-testing")]
272            is_paused: Arc::from(AtomicBool::new(false)),
273        })
274    }
275
276    /// Broadcast a message to members of the particular topic. Does not retry.
277    ///
278    /// # Errors
279    /// - If we fail to serialize the message
280    /// - If we fail to send the broadcast message.
281    async fn broadcast_message(&self, message: Vec<u8>, topic: Topic) -> Result<(), NetworkError> {
282        // If the message should also go to us, add it to the internal queue
283        if self
284            .client
285            .subscribed_topics
286            .read()
287            .await
288            .contains(&(topic.clone() as u8))
289        {
290            self.internal_queue.lock().push_back(message.clone());
291        }
292
293        // If we're paused, don't send the message
294        #[cfg(feature = "hotshot-testing")]
295        if self.is_paused.load(Ordering::Relaxed) {
296            return Ok(());
297        }
298
299        // Send the message
300        if let Err(err) = self
301            .client
302            .send_broadcast_message(vec![topic as u8], message)
303            .await
304        {
305            return Err(NetworkError::MessageReceiveError(format!(
306                "failed to send broadcast message: {err}"
307            )));
308        };
309
310        Ok(())
311    }
312}
313
#[cfg(feature = "hotshot-testing")]
impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
    for PushCdnNetwork<TYPES::SignatureKey>
{
    /// Generate n Push CDN clients, a marshal, and two brokers (that run locally).
    /// Uses a `SQLite` database instead of Redis.
    ///
    /// The two brokers and the marshal are spawned as detached tokio tasks when this
    /// function is called. The returned generator only builds clients, one per `node_id`,
    /// that connect to the local marshal. Nodes with `node_id < da_committee_size`
    /// subscribe to both the DA and global topics; all others subscribe to the global
    /// topic only.
    ///
    /// # Panics
    /// If an unused local TCP port cannot be reserved. The spawned tasks also panic if a
    /// broker or the marshal fails to start.
    // The body wires up a complete local CDN deployment (brokers, marshal, client factory)
    #[allow(clippy::too_many_lines)]
    fn generator(
        _expected_node_count: usize,
        _num_bootstrap: usize,
        _network_id: usize,
        da_committee_size: usize,
        _reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
        _connect_infos: &mut HashMap<TYPES::SignatureKey, PeerConnectInfo>,
    ) -> AsyncGenerator<Arc<Self>> {
        // The configuration we are using for testing is 2 brokers & 1 marshal

        // A keypair shared between brokers, derived from a fixed seed and index
        let (broker_public_key, broker_private_key) =
            TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], 1337);

        // Get the OS temporary directory
        let temp_dir = std::env::temp_dir();

        // Create a randomly-named SQLite file inside of the temporary directory.
        // NOTE(review): this file is never removed; confirm whether cleanup is needed.
        let discovery_endpoint = temp_dir
            .join(Path::new(&format!(
                "test-{}.sqlite",
                StdRng::from_entropy().next_u64()
            )))
            .to_string_lossy()
            .into_owned();

        // Reserve unused public ports for the two brokers
        let public_port_1 = reserve_tcp_port().expect("OS should have ephemeral ports available");
        let public_port_2 = reserve_tcp_port().expect("OS should have ephemeral ports available");
        let public_address_1 = format!("127.0.0.1:{public_port_1}");
        let public_address_2 = format!("127.0.0.1:{public_port_2}");

        // 2 brokers
        for i in 0..2 {
            // Reserve a private port for this broker
            let private_port =
                reserve_tcp_port().expect("OS should have ephemeral ports available");

            // Extrapolate addresses
            let private_address = format!("127.0.0.1:{private_port}");
            let (public_address, other_public_address) = if i == 0 {
                (public_address_1.clone(), public_address_2.clone())
            } else {
                (public_address_2.clone(), public_address_1.clone())
            };

            // Calculate the broker identifiers. Both brokers build these the same way, so
            // exactly one of them sorts first below.
            let broker_identifier = format!("{public_address}/{public_address}");
            let other_broker_identifier = format!("{other_public_address}/{other_public_address}");

            // Configure the broker
            let config: BrokerConfig<TestingDef<TYPES::SignatureKey>> = BrokerConfig {
                public_advertise_endpoint: public_address.clone(),
                public_bind_endpoint: public_address,
                private_advertise_endpoint: private_address.clone(),
                private_bind_endpoint: private_address,
                metrics_bind_endpoint: None,
                keypair: KeyPair {
                    public_key: WrappedSignatureKey(broker_public_key.clone()),
                    private_key: broker_private_key.clone(),
                },
                discovery_endpoint: discovery_endpoint.clone(),

                user_message_hook: NoMessageHook,
                broker_message_hook: NoMessageHook,

                ca_cert_path: None,
                ca_key_path: None,
                // 1GB
                global_memory_pool_size: Some(1024 * 1024 * 1024),
            };

            // Create and spawn the broker
            spawn(async move {
                let broker: Broker<TestingDef<TYPES::SignatureKey>> =
                    Broker::new(config).await.expect("broker failed to start");

                // If we are the first broker by identifier, we need to sleep a bit
                // for discovery to happen first
                if other_broker_identifier > broker_identifier {
                    sleep(Duration::from_secs(2)).await;
                }

                // Error if we stopped unexpectedly
                if let Err(err) = broker.start().await {
                    error!("broker stopped: {err}");
                }
            });
        }

        // Reserve an available port for the marshal
        let marshal_port = reserve_tcp_port().expect("OS should have ephemeral ports available");

        // Configure the marshal
        let marshal_endpoint = format!("127.0.0.1:{marshal_port}");
        let marshal_config = MarshalConfig {
            bind_endpoint: marshal_endpoint.clone(),
            discovery_endpoint,
            metrics_bind_endpoint: None,
            ca_cert_path: None,
            ca_key_path: None,
            // 1GB
            global_memory_pool_size: Some(1024 * 1024 * 1024),
        };

        // Spawn the marshal
        spawn(async move {
            let marshal: Marshal<TestingDef<TYPES::SignatureKey>> = Marshal::new(marshal_config)
                .await
                .expect("failed to spawn marshal");

            // Error if we stopped unexpectedly
            if let Err(err) = marshal.start().await {
                error!("marshal stopped: {err}");
            }
        });

        // This function is called for each client we spawn
        Box::pin({
            move |node_id| {
                // Clone this so we can pin the future
                let marshal_endpoint = marshal_endpoint.clone();

                Box::pin(async move {
                    // Derive our public and private keys from our index
                    let private_key =
                        TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1;
                    let public_key = TYPES::SignatureKey::from_private(&private_key);

                    // Calculate if we're DA or not
                    let topics = if node_id < da_committee_size as u64 {
                        vec![Topic::Da as u8, Topic::Global as u8]
                    } else {
                        vec![Topic::Global as u8]
                    };

                    // Configure our client
                    let client_config: ClientConfig<ClientDef<TYPES::SignatureKey>> =
                        ClientConfig {
                            keypair: KeyPair {
                                public_key: WrappedSignatureKey(public_key.clone()),
                                private_key,
                            },
                            subscribed_topics: topics,
                            endpoint: marshal_endpoint,
                            use_local_authority: true,
                        };

                    // Create our client. This whole impl is gated on `hotshot-testing`, so
                    // the `is_paused` field always exists here.
                    Arc::new(PushCdnNetwork {
                        client: Client::new(client_config),
                        metrics: Arc::new(CdnMetricsValue::default()),
                        internal_queue: Arc::new(Mutex::new(VecDeque::new())),
                        public_key,
                        is_paused: Arc::from(AtomicBool::new(false)),
                    })
                })
            }
        })
    }

    /// The PushCDN does not support in-flight message counts
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
489
490#[async_trait]
491impl<K: SignatureKey + 'static> ConnectedNetwork<K> for PushCdnNetwork<K> {
492    /// Pause sending and receiving on the PushCDN network.
493    fn pause(&self) {
494        #[cfg(feature = "hotshot-testing")]
495        self.is_paused.store(true, Ordering::Relaxed);
496    }
497
498    /// Resume sending and receiving on the PushCDN network.
499    fn resume(&self) {
500        #[cfg(feature = "hotshot-testing")]
501        self.is_paused.store(false, Ordering::Relaxed);
502    }
503
504    /// Wait for the client to initialize the connection
505    async fn wait_for_ready(&self) {
506        let _ = self.client.ensure_initialized().await;
507    }
508
509    /// TODO: shut down the networks. Unneeded for testing.
510    fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
511    where
512        'a: 'b,
513        Self: 'b,
514    {
515        boxed_sync(async move { self.client.close().await })
516    }
517
518    /// Broadcast a message to all members of the quorum.
519    ///
520    /// # Errors
521    /// - If we fail to serialize the message
522    /// - If we fail to send the broadcast message.
523    async fn broadcast_message(
524        &self,
525        _: ViewNumber,
526        message: Vec<u8>,
527        topic: HotShotTopic,
528        _broadcast_delay: BroadcastDelay,
529    ) -> Result<(), NetworkError> {
530        // If we're paused, don't send the message
531        #[cfg(feature = "hotshot-testing")]
532        if self.is_paused.load(Ordering::Relaxed) {
533            return Ok(());
534        }
535
536        // Broadcast the message
537        self.broadcast_message(message, topic.into())
538            .await
539            .inspect_err(|_e| {
540                self.metrics.num_failed_messages.add(1);
541            })
542    }
543
544    /// Broadcast a message to all members of the DA committee.
545    ///
546    /// # Errors
547    /// - If we fail to serialize the message
548    /// - If we fail to send the broadcast message.
549    async fn da_broadcast_message(
550        &self,
551        _: ViewNumber,
552        message: Vec<u8>,
553        _recipients: Vec<K>,
554        _broadcast_delay: BroadcastDelay,
555    ) -> Result<(), NetworkError> {
556        // If we're paused, don't send the message
557        #[cfg(feature = "hotshot-testing")]
558        if self.is_paused.load(Ordering::Relaxed) {
559            return Ok(());
560        }
561
562        // Broadcast the message
563        self.broadcast_message(message, Topic::Da)
564            .await
565            .inspect_err(|_e| {
566                self.metrics.num_failed_messages.add(1);
567            })
568    }
569
570    /// Send a direct message to a node with a particular key. Does not retry.
571    ///
572    /// - If we fail to serialize the message
573    /// - If we fail to send the direct message
574    async fn direct_message(
575        &self,
576        _: ViewNumber,
577        message: Vec<u8>,
578        recipient: K,
579    ) -> Result<(), NetworkError> {
580        // If the message is to ourselves, just add it to the internal queue
581        if recipient == self.public_key {
582            self.internal_queue.lock().push_back(message);
583            return Ok(());
584        }
585
586        // If we're paused, don't send the message
587        #[cfg(feature = "hotshot-testing")]
588        if self.is_paused.load(Ordering::Relaxed) {
589            return Ok(());
590        }
591
592        // Send the message
593        if let Err(e) = self
594            .client
595            .send_direct_message(&WrappedSignatureKey(recipient), message)
596            .await
597        {
598            self.metrics.num_failed_messages.add(1);
599            return Err(NetworkError::MessageSendError(format!(
600                "failed to send direct message: {e}"
601            )));
602        };
603
604        Ok(())
605    }
606
607    /// Receive a message. Is agnostic over `transmit_type`, which has an issue
608    /// to be removed anyway.
609    ///
610    /// # Errors
611    /// - If we fail to receive messages. Will trigger a retry automatically.
612    async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
613        // If we have a message in the internal queue, return it
614        let queued_message = self.internal_queue.lock().pop_front();
615        if let Some(message) = queued_message {
616            return Ok(message);
617        }
618
619        // Receive a message from the network
620        let message = self.client.receive_message().await;
621
622        // If we're paused, receive but don't process messages
623        #[cfg(feature = "hotshot-testing")]
624        if self.is_paused.load(Ordering::Relaxed) {
625            sleep(Duration::from_millis(100)).await;
626            return Ok(vec![]);
627        }
628
629        // If it was an error, wait a bit and retry
630        let message = match message {
631            Ok(message) => message,
632            Err(error) => {
633                return Err(NetworkError::MessageReceiveError(format!(
634                    "failed to receive message: {error}"
635                )));
636            },
637        };
638
639        // Extract the underlying message
640        let (PushCdnMessage::Broadcast(Broadcast { message, topics: _ })
641        | PushCdnMessage::Direct(Direct {
642            message,
643            recipient: _,
644        })) = message
645        else {
646            return Ok(vec![]);
647        };
648
649        Ok(message)
650    }
651
652    /// Do nothing here, as we don't need to look up nodes.
653    fn queue_node_lookup(
654        &self,
655        _view_number: ViewNumber,
656        _pk: K,
657    ) -> Result<(), TrySendError<Option<(ViewNumber, K)>>> {
658        Ok(())
659    }
660}
661
662impl From<HotShotTopic> for Topic {
663    fn from(topic: HotShotTopic) -> Self {
664        match topic {
665            HotShotTopic::Global => Topic::Global,
666            HotShotTopic::Da => Topic::Da,
667        }
668    }
669}