1#[cfg(feature = "hotshot-testing")]
8use std::{
9 collections::HashMap,
10 sync::atomic::{AtomicBool, Ordering},
11};
12use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
13#[cfg(feature = "hotshot-testing")]
14use std::{path::Path, time::Duration};
15
16use async_trait::async_trait;
17use bincode::config::Options;
18use cdn_broker::reexports::{
19 connection::protocols::{Quic, Tcp},
20 def::{ConnectionDef, RunDef, Topic as TopicTrait, hook::NoMessageHook},
21 discovery::{Embedded, Redis},
22};
23#[cfg(feature = "hotshot-testing")]
24use cdn_broker::{Broker, Config as BrokerConfig};
25pub use cdn_client::reexports::crypto::signature::KeyPair;
26use cdn_client::{
27 Client, Config as ClientConfig,
28 reexports::{
29 crypto::signature::{Serializable, SignatureScheme},
30 message::{Broadcast, Direct, Message as PushCdnMessage},
31 },
32};
33#[cfg(feature = "hotshot-testing")]
34use cdn_marshal::{Config as MarshalConfig, Marshal};
35use hotshot_types::{
36 BoxSyncFuture, boxed_sync,
37 data::ViewNumber,
38 traits::{
39 metrics::{Counter, Metrics, NoMetrics},
40 network::{BroadcastDelay, ConnectedNetwork, Topic as HotShotTopic},
41 signature_key::SignatureKey,
42 },
43 utils::bincode_opts,
44};
45#[cfg(feature = "hotshot-testing")]
46use hotshot_types::{
47 PeerConnectInfo,
48 traits::network::{AsyncGenerator, NetworkReliability, TestableNetworkingImplementation},
49};
50use num_enum::{IntoPrimitive, TryFromPrimitive};
51use parking_lot::Mutex;
52#[cfg(feature = "hotshot-testing")]
53use rand::{RngCore, SeedableRng, rngs::StdRng};
54#[cfg(feature = "hotshot-testing")]
55use test_utils::reserve_tcp_port;
56use tokio::sync::mpsc::error::TrySendError;
57#[cfg(feature = "hotshot-testing")]
58use tokio::{spawn, time::sleep};
59#[cfg(feature = "hotshot-testing")]
60use tracing::error;
61
62use super::NetworkError;
63#[cfg(feature = "hotshot-testing")]
64use crate::NodeType;
65
66#[derive(Clone)]
68pub struct CdnMetricsValue {
69 pub num_failed_messages: Box<dyn Counter>,
71}
72
73impl CdnMetricsValue {
74 pub fn new(metrics: &dyn Metrics) -> Self {
76 let subgroup = metrics.subgroup("cdn".into());
78
79 Self {
81 num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None),
82 }
83 }
84}
85
86impl Default for CdnMetricsValue {
87 fn default() -> Self {
89 Self::new(&*NoMetrics::boxed())
90 }
91}
92
93#[derive(Clone, Eq, PartialEq)]
96pub struct WrappedSignatureKey<T: SignatureKey + 'static>(pub T);
97impl<T: SignatureKey> SignatureScheme for WrappedSignatureKey<T> {
98 type PrivateKey = T::PrivateKey;
99 type PublicKey = Self;
100
101 fn sign(
103 private_key: &Self::PrivateKey,
104 namespace: &str,
105 message: &[u8],
106 ) -> anyhow::Result<Vec<u8>> {
107 let message = [namespace.as_bytes(), message].concat();
109
110 let signature = T::sign(private_key, &message)?;
111 Ok(bincode_opts().serialize(&signature)?)
112 }
113
114 fn verify(
116 public_key: &Self::PublicKey,
117 namespace: &str,
118 message: &[u8],
119 signature: &[u8],
120 ) -> bool {
121 let signature: T::PureAssembledSignatureType = match bincode_opts().deserialize(signature) {
123 Ok(key) => key,
124 Err(_) => return false,
125 };
126
127 let message = [namespace.as_bytes(), message].concat();
129
130 public_key.0.validate(&signature, &message)
131 }
132}
133
134impl<T: SignatureKey> Serializable for WrappedSignatureKey<T> {
137 fn serialize(&self) -> anyhow::Result<Vec<u8>> {
138 Ok(self.0.to_bytes())
139 }
140
141 fn deserialize(serialized: &[u8]) -> anyhow::Result<Self> {
142 Ok(WrappedSignatureKey(T::from_bytes(serialized)?))
143 }
144}
145
146pub struct ProductionDef<K: SignatureKey + 'static>(PhantomData<K>);
149impl<K: SignatureKey + 'static> RunDef for ProductionDef<K> {
150 type User = UserDefQuic<K>;
151 type User2 = UserDefTcp<K>;
152 type Broker = BrokerDef<K>;
153 type DiscoveryClientType = Redis;
154 type Topic = Topic;
155}
156
157pub struct UserDefQuic<K: SignatureKey + 'static>(PhantomData<K>);
161impl<K: SignatureKey + 'static> ConnectionDef for UserDefQuic<K> {
162 type Scheme = WrappedSignatureKey<K>;
163 type Protocol = Quic;
164 type MessageHook = NoMessageHook;
165}
166
167pub struct UserDefTcp<K: SignatureKey + 'static>(PhantomData<K>);
170impl<K: SignatureKey + 'static> ConnectionDef for UserDefTcp<K> {
171 type Scheme = WrappedSignatureKey<K>;
172 type Protocol = Tcp;
173 type MessageHook = NoMessageHook;
174}
175
176pub struct BrokerDef<K: SignatureKey + 'static>(PhantomData<K>);
179impl<K: SignatureKey> ConnectionDef for BrokerDef<K> {
180 type Scheme = WrappedSignatureKey<K>;
181 type Protocol = Tcp;
182 type MessageHook = NoMessageHook;
183}
184
185#[derive(Clone)]
189pub struct ClientDef<K: SignatureKey + 'static>(PhantomData<K>);
190impl<K: SignatureKey> ConnectionDef for ClientDef<K> {
191 type Scheme = WrappedSignatureKey<K>;
192 type Protocol = Tcp;
193 type MessageHook = NoMessageHook;
194}
195
196pub struct TestingDef<K: SignatureKey + 'static>(PhantomData<K>);
199impl<K: SignatureKey + 'static> RunDef for TestingDef<K> {
200 type User = UserDefQuic<K>;
201 type User2 = UserDefTcp<K>;
202 type Broker = BrokerDef<K>;
203 type DiscoveryClientType = Embedded;
204 type Topic = Topic;
205}
206
207#[derive(Clone)]
210pub struct PushCdnNetwork<K: SignatureKey + 'static> {
212 client: Client<ClientDef<K>>,
214 metrics: Arc<CdnMetricsValue>,
216 internal_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
218 public_key: K,
220 #[cfg(feature = "hotshot-testing")]
222 is_paused: Arc<AtomicBool>,
223 }
226
227#[repr(u8)]
229#[derive(IntoPrimitive, TryFromPrimitive, Clone, PartialEq, Eq)]
230pub enum Topic {
231 Global = 0,
233 Da = 1,
235}
236
237impl TopicTrait for Topic {}
240
241impl<K: SignatureKey + 'static> PushCdnNetwork<K> {
242 pub fn new(
249 marshal_endpoint: String,
250 topics: Vec<Topic>,
251 keypair: KeyPair<WrappedSignatureKey<K>>,
252 metrics: CdnMetricsValue,
253 ) -> anyhow::Result<Self> {
254 let config = ClientConfig {
256 endpoint: marshal_endpoint,
257 subscribed_topics: topics.into_iter().map(|t| t as u8).collect(),
258 keypair: keypair.clone(),
259 use_local_authority: true,
260 };
261
262 let client = Client::new(config);
264
265 Ok(Self {
266 client,
267 metrics: Arc::from(metrics),
268 internal_queue: Arc::new(Mutex::new(VecDeque::new())),
269 public_key: keypair.public_key.0,
270 #[cfg(feature = "hotshot-testing")]
272 is_paused: Arc::from(AtomicBool::new(false)),
273 })
274 }
275
276 async fn broadcast_message(&self, message: Vec<u8>, topic: Topic) -> Result<(), NetworkError> {
282 if self
284 .client
285 .subscribed_topics
286 .read()
287 .await
288 .contains(&(topic.clone() as u8))
289 {
290 self.internal_queue.lock().push_back(message.clone());
291 }
292
293 #[cfg(feature = "hotshot-testing")]
295 if self.is_paused.load(Ordering::Relaxed) {
296 return Ok(());
297 }
298
299 if let Err(err) = self
301 .client
302 .send_broadcast_message(vec![topic as u8], message)
303 .await
304 {
305 return Err(NetworkError::MessageReceiveError(format!(
306 "failed to send broadcast message: {err}"
307 )));
308 };
309
310 Ok(())
311 }
312}
313
#[cfg(feature = "hotshot-testing")]
impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>
    for PushCdnNetwork<TYPES::SignatureKey>
{
    /// Start a local test CDN and return a generator of networks that use it.
    ///
    /// Two brokers and one marshal are spawned as background tasks on
    /// `127.0.0.1` ports obtained from `reserve_tcp_port`. They all share a
    /// randomly named SQLite file in the system temp directory as their
    /// discovery endpoint. The returned generator builds one
    /// `PushCdnNetwork` per `node_id`, with a keypair derived from a zero
    /// seed indexed by that id. Nodes whose id is below `da_committee_size`
    /// subscribe to both the DA and global topics; all others subscribe to
    /// the global topic only.
    ///
    /// All parameters other than `da_committee_size` are unused.
    ///
    /// # Panics
    /// Panics if a TCP port cannot be reserved. The spawned tasks panic if a
    /// broker or the marshal cannot be created.
    #[allow(clippy::too_many_lines)]
    fn generator(
        _expected_node_count: usize,
        _num_bootstrap: usize,
        _network_id: usize,
        da_committee_size: usize,
        _reliability_config: Option<Box<dyn NetworkReliability>>,
        _secondary_network_delay: Duration,
        _connect_infos: &mut HashMap<TYPES::SignatureKey, PeerConnectInfo>,
    ) -> AsyncGenerator<Arc<Self>> {
        // Both brokers run with the same deterministically generated keypair
        let (broker_public_key, broker_private_key) =
            TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], 1337);

        // Discovery endpoint: a randomly named SQLite file in the temp directory
        let temp_root = std::env::temp_dir();
        let db_file_name = format!("test-{}.sqlite", StdRng::from_entropy().next_u64());
        let discovery_endpoint = temp_root
            .join(Path::new(&db_file_name))
            .to_string_lossy()
            .into_owned();

        // Public endpoints of the two brokers
        let public_port_1 = reserve_tcp_port().expect("OS should have ephemeral ports available");
        let public_port_2 = reserve_tcp_port().expect("OS should have ephemeral ports available");
        let public_address_1 = format!("127.0.0.1:{public_port_1}");
        let public_address_2 = format!("127.0.0.1:{public_port_2}");

        // Each entry is (this broker's public address, the other broker's public address)
        let broker_endpoints = [
            (public_address_1.clone(), public_address_2.clone()),
            (public_address_2, public_address_1),
        ];

        for (public_address, other_public_address) in broker_endpoints {
            // Every broker gets its own private endpoint
            let private_port =
                reserve_tcp_port().expect("OS should have ephemeral ports available");
            let private_address = format!("127.0.0.1:{private_port}");

            // Identifiers are only used to decide which broker delays its start
            let broker_identifier = format!("{public_address}/{public_address}");
            let other_broker_identifier = format!("{other_public_address}/{other_public_address}");
            let delay_start = other_broker_identifier > broker_identifier;

            let config: BrokerConfig<TestingDef<TYPES::SignatureKey>> = BrokerConfig {
                public_advertise_endpoint: public_address.clone(),
                public_bind_endpoint: public_address,
                private_advertise_endpoint: private_address.clone(),
                private_bind_endpoint: private_address,
                metrics_bind_endpoint: None,
                keypair: KeyPair {
                    public_key: WrappedSignatureKey(broker_public_key.clone()),
                    private_key: broker_private_key.clone(),
                },
                discovery_endpoint: discovery_endpoint.clone(),

                user_message_hook: NoMessageHook,
                broker_message_hook: NoMessageHook,

                ca_cert_path: None,
                ca_key_path: None,
                // 1 GiB
                global_memory_pool_size: Some(1024 * 1024 * 1024),
            };

            // Run the broker in the background
            spawn(async move {
                let broker: Broker<TestingDef<TYPES::SignatureKey>> =
                    Broker::new(config).await.expect("broker failed to start");

                // The broker whose identifier sorts lower waits two seconds
                // before starting. NOTE(review): presumably to stagger startup
                // so the brokers can discover each other - confirm.
                if delay_start {
                    sleep(Duration::from_secs(2)).await;
                }

                if let Err(err) = broker.start().await {
                    error!("broker stopped: {err}");
                }
            });
        }

        // The marshal that clients connect through
        let marshal_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
        let marshal_endpoint = format!("127.0.0.1:{marshal_port}");

        let marshal_config = MarshalConfig {
            bind_endpoint: marshal_endpoint.clone(),
            discovery_endpoint,
            metrics_bind_endpoint: None,
            ca_cert_path: None,
            ca_key_path: None,
            // 1 GiB
            global_memory_pool_size: Some(1024 * 1024 * 1024),
        };

        // Run the marshal in the background
        spawn(async move {
            let marshal: Marshal<TestingDef<TYPES::SignatureKey>> = Marshal::new(marshal_config)
                .await
                .expect("failed to spawn marshal");

            if let Err(err) = marshal.start().await {
                error!("marshal stopped: {err}");
            }
        });

        // The generator: builds a client network for the requested node id
        Box::pin(move |node_id| {
            let marshal_endpoint = marshal_endpoint.clone();

            Box::pin(async move {
                // Derive this node's keypair from its id
                let (_, private_key) =
                    TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id);
                let public_key = TYPES::SignatureKey::from_private(&private_key);

                // DA committee members additionally subscribe to the DA topic
                let in_da_committee = node_id < da_committee_size as u64;
                let topics = if in_da_committee {
                    vec![Topic::Da as u8, Topic::Global as u8]
                } else {
                    vec![Topic::Global as u8]
                };

                let client_config: ClientConfig<ClientDef<TYPES::SignatureKey>> = ClientConfig {
                    keypair: KeyPair {
                        public_key: WrappedSignatureKey(public_key.clone()),
                        private_key,
                    },
                    subscribed_topics: topics,
                    endpoint: marshal_endpoint,
                    use_local_authority: true,
                };

                let network = PushCdnNetwork {
                    client: Client::new(client_config),
                    metrics: Arc::new(CdnMetricsValue::default()),
                    internal_queue: Arc::new(Mutex::new(VecDeque::new())),
                    public_key,
                    #[cfg(feature = "hotshot-testing")]
                    is_paused: Arc::from(AtomicBool::new(false)),
                };

                Arc::new(network)
            })
        })
    }

    /// The CDN does not track in-flight messages, so this is always `None`.
    fn in_flight_message_count(&self) -> Option<usize> {
        None
    }
}
489
490#[async_trait]
491impl<K: SignatureKey + 'static> ConnectedNetwork<K> for PushCdnNetwork<K> {
492 fn pause(&self) {
494 #[cfg(feature = "hotshot-testing")]
495 self.is_paused.store(true, Ordering::Relaxed);
496 }
497
498 fn resume(&self) {
500 #[cfg(feature = "hotshot-testing")]
501 self.is_paused.store(false, Ordering::Relaxed);
502 }
503
504 async fn wait_for_ready(&self) {
506 let _ = self.client.ensure_initialized().await;
507 }
508
509 fn shut_down<'a, 'b>(&'a self) -> BoxSyncFuture<'b, ()>
511 where
512 'a: 'b,
513 Self: 'b,
514 {
515 boxed_sync(async move { self.client.close().await })
516 }
517
518 async fn broadcast_message(
524 &self,
525 _: ViewNumber,
526 message: Vec<u8>,
527 topic: HotShotTopic,
528 _broadcast_delay: BroadcastDelay,
529 ) -> Result<(), NetworkError> {
530 #[cfg(feature = "hotshot-testing")]
532 if self.is_paused.load(Ordering::Relaxed) {
533 return Ok(());
534 }
535
536 self.broadcast_message(message, topic.into())
538 .await
539 .inspect_err(|_e| {
540 self.metrics.num_failed_messages.add(1);
541 })
542 }
543
544 async fn da_broadcast_message(
550 &self,
551 _: ViewNumber,
552 message: Vec<u8>,
553 _recipients: Vec<K>,
554 _broadcast_delay: BroadcastDelay,
555 ) -> Result<(), NetworkError> {
556 #[cfg(feature = "hotshot-testing")]
558 if self.is_paused.load(Ordering::Relaxed) {
559 return Ok(());
560 }
561
562 self.broadcast_message(message, Topic::Da)
564 .await
565 .inspect_err(|_e| {
566 self.metrics.num_failed_messages.add(1);
567 })
568 }
569
570 async fn direct_message(
575 &self,
576 _: ViewNumber,
577 message: Vec<u8>,
578 recipient: K,
579 ) -> Result<(), NetworkError> {
580 if recipient == self.public_key {
582 self.internal_queue.lock().push_back(message);
583 return Ok(());
584 }
585
586 #[cfg(feature = "hotshot-testing")]
588 if self.is_paused.load(Ordering::Relaxed) {
589 return Ok(());
590 }
591
592 if let Err(e) = self
594 .client
595 .send_direct_message(&WrappedSignatureKey(recipient), message)
596 .await
597 {
598 self.metrics.num_failed_messages.add(1);
599 return Err(NetworkError::MessageSendError(format!(
600 "failed to send direct message: {e}"
601 )));
602 };
603
604 Ok(())
605 }
606
607 async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
613 let queued_message = self.internal_queue.lock().pop_front();
615 if let Some(message) = queued_message {
616 return Ok(message);
617 }
618
619 let message = self.client.receive_message().await;
621
622 #[cfg(feature = "hotshot-testing")]
624 if self.is_paused.load(Ordering::Relaxed) {
625 sleep(Duration::from_millis(100)).await;
626 return Ok(vec![]);
627 }
628
629 let message = match message {
631 Ok(message) => message,
632 Err(error) => {
633 return Err(NetworkError::MessageReceiveError(format!(
634 "failed to receive message: {error}"
635 )));
636 },
637 };
638
639 let (PushCdnMessage::Broadcast(Broadcast { message, topics: _ })
641 | PushCdnMessage::Direct(Direct {
642 message,
643 recipient: _,
644 })) = message
645 else {
646 return Ok(vec![]);
647 };
648
649 Ok(message)
650 }
651
652 fn queue_node_lookup(
654 &self,
655 _view_number: ViewNumber,
656 _pk: K,
657 ) -> Result<(), TrySendError<Option<(ViewNumber, K)>>> {
658 Ok(())
659 }
660}
661
662impl From<HotShotTopic> for Topic {
663 fn from(topic: HotShotTopic) -> Self {
664 match topic {
665 HotShotTopic::Global => Topic::Global,
666 HotShotTopic::Da => Topic::Da,
667 }
668 }
669}