1use std::collections::{HashMap, HashSet};
2
3pub use cliquenet::Config as CliquenetConfig;
4use cliquenet::{
5 NetAddr, NetworkError as CliquenetError, Role, Slot, noise::Protocol, x25519::PublicKey,
6};
7use hotshot_types::{
8 PeerConnectInfo,
9 data::{EpochNumber, ViewNumber},
10 epoch_membership::EpochMembershipCoordinator,
11 message::{EXTERNAL_MESSAGE_VERSION, MessageKind, UpgradeLock},
12 traits::{
13 metrics::{Counter, CounterFamily, Gauge, GaugeFamily, Metrics},
14 node_implementation::NodeType,
15 },
16 x25519::Keypair,
17};
18use parking_lot::RwLock;
19use tracing::{error, info};
20
21use crate::{
22 message::{Message, MessageType, Unchecked, Validated},
23 network::{Network, NetworkError, PeerRole},
24};
25
/// [`Network`] implementation backed by a `cliquenet` network.
pub struct Cliquenet<T: NodeType> {
    /// This node's signature key and the x25519 public key of the cliquenet
    /// config it was created with.
    my_keys: (T::SignatureKey, PublicKey),
    /// Handle to the underlying cliquenet network.
    inner: cliquenet::Network,
    /// Connection info of the currently known peers, keyed by signature key.
    peers: HashMap<T::SignatureKey, PeerConnectInfo>,
    /// Last epoch applied via `apply_epoch` (starts at 0).
    epoch: EpochNumber,
    /// Used to (de)serialize non-external messages.
    upgrade_lock: UpgradeLock<T>,
}
33
34impl<T: NodeType> Cliquenet<T> {
35 pub async fn create<A, P>(
36 name: &str,
37 signing_key: T::SignatureKey,
38 keypair: Keypair,
39 addr: A,
40 parties: P,
41 upgrade_lock: UpgradeLock<T>,
42 metrics: Box<dyn Metrics>,
43 ) -> Result<Self, NetworkError>
44 where
45 A: Into<cliquenet::NetAddr>,
46 P: IntoIterator<Item = (T::SignatureKey, PeerConnectInfo)>,
47 {
48 let parties: HashMap<T::SignatureKey, PeerConnectInfo> = parties.into_iter().collect();
49
50 let cfg = cliquenet::Config::builder()
51 .name(name)
52 .keypair(keypair.into())
53 .bind(addr.into())
54 .parties(
55 parties
56 .values()
57 .map(|info| (info.x25519_key.into(), info.p2p_addr.clone())),
58 )
59 .noise_protocols([(1.into(), Protocol::IK_25519_AesGcm_Blake2s)])
60 .build();
61
62 Self::create_with_config(signing_key, upgrade_lock, cfg, parties, metrics).await
63 }
64
65 pub(crate) async fn create_with_config<P>(
66 signing_key: T::SignatureKey,
67 upgrade_lock: UpgradeLock<T>,
68 config: cliquenet::Config,
69 parties: P,
70 metrics: Box<dyn Metrics>,
71 ) -> Result<Self, NetworkError>
72 where
73 P: IntoIterator<Item = (T::SignatureKey, PeerConnectInfo)>,
74 {
75 let public_key = config.public_key();
76 let metrics = CliquenetMetrics::new(metrics);
77 let network = cliquenet::Network::create(config.with_metrics(metrics))
78 .await
79 .map_err(to_network_error)?;
80
81 let peers: HashMap<_, _> = parties.into_iter().collect();
82
83 info!(peers = %peers.len(), "cliquenet created");
84
85 Ok(Self {
86 my_keys: (signing_key, public_key),
87 inner: network,
88 peers,
89 epoch: EpochNumber::new(0),
90 upgrade_lock,
91 })
92 }
93}
94
95impl<T: NodeType> Network<T> for Cliquenet<T> {
96 type PeerData = (PublicKey, NetAddr);
97
98 fn unicast(
99 &mut self,
100 v: ViewNumber,
101 to: &T::SignatureKey,
102 m: &Message<T, Validated>,
103 ) -> Result<(), NetworkError> {
104 let target = if *to == self.my_keys.0 {
105 self.my_keys.1
106 } else if let Some(info) = self.peers.get(to) {
107 info.x25519_key.into()
108 } else {
109 error!(peer = %to, "unicast target not found");
110 return Ok(());
111 };
112 let bytes = self.serialize(m)?;
113 self.inner
114 .unicast(Slot::new(*v), target, bytes)
115 .map_err(to_network_error)?;
116 Ok(())
117 }
118
119 fn multicast(
120 &mut self,
121 v: ViewNumber,
122 to: Vec<&T::SignatureKey>,
123 m: &Message<T, Validated>,
124 ) -> Result<(), NetworkError> {
125 let bytes = self.serialize(m)?;
126 let mut targets = Vec::new();
127 for t in to {
128 if let Some(info) = self.peers.get(t) {
129 targets.push(info.x25519_key.into())
130 } else if *t == self.my_keys.0 {
131 targets.push(self.my_keys.1)
132 } else {
133 error!(peer = %t, "multicast target not found");
134 }
135 }
136 self.inner
137 .multicast(Slot::new(*v), targets, bytes)
138 .map_err(to_network_error)?;
139 Ok(())
140 }
141
142 fn broadcast(&mut self, v: ViewNumber, m: &Message<T, Validated>) -> Result<(), NetworkError> {
143 let bytes = self.serialize(m)?;
144 self.inner
145 .broadcast(Slot::new(*v), bytes)
146 .map_err(to_network_error)?;
147 Ok(())
148 }
149
150 async fn receive(&mut self) -> Result<Message<T, Unchecked>, NetworkError> {
151 let (_src, bytes) = self
152 .inner
153 .receive()
154 .await
155 .ok_or_else(|| NetworkError::Critical("cliquenet has shutdown".into()))?;
156 let m = self.deserialize(&bytes)?;
157 Ok(m)
158 }
159
160 async fn shutdown(&mut self) {
161 if let Ok(done) = self.inner.shutdown() {
162 done.await
163 }
164 }
165
166 fn gc(&mut self, v: ViewNumber) -> Result<(), NetworkError> {
167 self.inner.gc(Slot::new(*v)).map_err(to_network_error)
168 }
169
170 fn add_peers(
171 &mut self,
172 r: PeerRole,
173 ps: Vec<(T::SignatureKey, Self::PeerData)>,
174 ) -> Result<(), NetworkError> {
175 let mut targets = Vec::new();
176 for (k, (x, a)) in ps {
177 self.peers.insert(
178 k,
179 PeerConnectInfo {
180 x25519_key: x.into(),
181 p2p_addr: a.clone(),
182 },
183 );
184 targets.push((x, a))
185 }
186 self.inner
187 .add_peers(map_peer_role(r), targets)
188 .map_err(to_network_error)?;
189 Ok(())
190 }
191
192 fn remove_peers(&mut self, ps: Vec<&T::SignatureKey>) -> Result<(), NetworkError> {
193 let mut targets = Vec::new();
194 for k in ps {
195 if let Some(info) = self.peers.remove(k) {
196 targets.push(info.x25519_key.into())
197 }
198 }
199 self.inner.remove_peers(targets).map_err(to_network_error)?;
200 Ok(())
201 }
202
203 fn assign_role(&mut self, r: PeerRole, ps: Vec<&T::SignatureKey>) -> Result<(), NetworkError> {
204 let mut targets = Vec::new();
205 for k in ps {
206 if let Some(info) = self.peers.get(k) {
207 targets.push(info.x25519_key.into())
208 }
209 }
210 self.inner
211 .assign_peers(map_peer_role(r), targets)
212 .map_err(to_network_error)?;
213 Ok(())
214 }
215
216 fn apply_epoch(
224 &mut self,
225 epoch: EpochNumber,
226 coord: &EpochMembershipCoordinator<T>,
227 ) -> Result<(), NetworkError> {
228 if epoch <= self.epoch {
229 info!(%epoch, ours = %self.epoch, "epoch already seen");
230 return Ok(());
231 }
232
233 let Some(curr_infos) = coord.epoch_peers(Some(epoch)) else {
235 error!(%epoch, "no stake table available");
236 return Ok(());
237 };
238
239 let prev_infos = if *epoch > 0 {
241 coord.epoch_peers(Some(epoch - 1)).unwrap_or_else(|| {
242 info!(%epoch, "previous epoch's stake table unavailable");
243 HashMap::new()
244 })
245 } else {
246 HashMap::new()
247 };
248
249 let next_infos = coord.epoch_peers(Some(epoch + 1)).unwrap_or_else(|| {
251 info!(%epoch, "next epoch's stake table not available");
252 HashMap::new()
253 });
254
255 let mut merged_infos = prev_infos.clone();
258 for (k, v) in curr_infos.iter().chain(&next_infos) {
259 merged_infos.insert(k.clone(), v.clone());
260 }
261
262 let wanted: HashSet<T::SignatureKey> = curr_infos
263 .keys()
264 .chain(next_infos.keys())
265 .cloned()
266 .collect();
267
268 let retained: HashSet<T::SignatureKey> = curr_infos
269 .keys()
270 .chain(prev_infos.keys())
271 .cloned()
272 .collect();
273
274 let mut to_add: Vec<(T::SignatureKey, PeerConnectInfo)> = Vec::new();
275 let mut to_del: Vec<(T::SignatureKey, PeerConnectInfo)> = Vec::new();
276
277 for k in &wanted {
278 if let Some(Some(new_info)) = merged_infos.get(k) {
279 if Some(new_info) != self.peers.get(k) {
280 info!(%epoch, peer = %k, "adding/updating network peer");
281 to_add.push((k.clone(), new_info.clone()));
282 } else {
283 info!(%epoch, peer = %k, "peer unchanged");
284 }
285 } else {
286 info!(%epoch, peer = %k, "ignoring peer without connection info");
287 }
288 }
289
290 for (k, info) in &self.peers {
292 if !(retained.contains(k) || wanted.contains(k)) {
293 info!(%epoch, peer = %k, "removing network peer");
294 to_del.push((k.clone(), info.clone()));
295 }
296 }
297
298 for (k, _) in &to_del {
299 self.peers.remove(k);
300 }
301 for (k, info) in &to_add {
302 self.peers.insert(k.clone(), info.clone());
303 }
304
305 let add_targets: Vec<(PublicKey, NetAddr)> = to_add
306 .iter()
307 .map(|(_, i)| (i.x25519_key.into(), i.p2p_addr.clone()))
308 .collect();
309 let del_targets: Vec<PublicKey> = to_del.iter().map(|(_, i)| i.x25519_key.into()).collect();
310
311 if let Err(err) = self.inner.add_peers(Role::Active, add_targets) {
312 error!(%epoch, %err, "network down; could not add peers to network");
313 return Err(to_network_error(err));
314 }
315
316 if let Err(err) = self.inner.remove_peers(del_targets) {
317 error!(%epoch, %err, "network down; could not remove peers from network");
318 return Err(to_network_error(err));
319 }
320
321 info!(%epoch, peers = %self.peers.len());
322
323 self.epoch = epoch;
324 Ok(())
325 }
326}
327
328impl<T: NodeType> Cliquenet<T> {
329 fn serialize(&self, m: &Message<T, Validated>) -> Result<Vec<u8>, NetworkError> {
330 if let MessageType::External(bytes) = &m.message_type {
331 return Ok(bytes.clone());
332 }
333 self.upgrade_lock
334 .serialize(m)
335 .map_err(|e| NetworkError::Io(format!("serialization error: {e}").into()))
336 }
337
338 fn deserialize(&self, bytes: &[u8]) -> Result<Message<T, Unchecked>, NetworkError> {
339 match self
340 .upgrade_lock
341 .deserialize::<Message<T, Unchecked>>(bytes)
342 {
343 Ok((m, v)) => {
344 if v == EXTERNAL_MESSAGE_VERSION && !m.is_external() {
345 let e = "received a non-external message with version 0.0".to_string();
346 return Err(NetworkError::Io(e.into()));
347 }
348 Ok(m)
349 },
350 Err(primary_err) => {
351 if let Ok((_v, hs_msg)) =
358 versions::decode::<hotshot_types::message::Message<T>>(bytes)
359 && let MessageKind::External(data) = hs_msg.kind
360 {
361 return Ok(Message {
362 sender: hs_msg.sender,
363 message_type: MessageType::External(data),
364 });
365 }
366 Err(NetworkError::Io(primary_err.to_string().into()))
367 },
368 }
369 }
370}
371
372fn map_peer_role(r: PeerRole) -> Role {
373 match r {
374 PeerRole::Active => Role::Active,
375 PeerRole::Passive => Role::Passive,
376 }
377}
378
379fn to_network_error(e: CliquenetError) -> NetworkError {
380 match e {
381 e @ CliquenetError::Bind(..) => NetworkError::Critical(e.into()),
382 e @ CliquenetError::ChannelClosed => NetworkError::Critical(e.into()),
383 e @ CliquenetError::BudgetClosed => NetworkError::Critical(e.into()),
384 e => NetworkError::Io(e.into()),
385 }
386}
387
/// Adapter that reports cliquenet's per-peer gauges and counters to a HotShot
/// [`Metrics`] registry.
struct CliquenetMetrics {
    /// Registry in which the gauge and counter families are created.
    metrics: Box<dyn Metrics>,
    /// Gauges created so far, behind a read/write lock.
    gauges: RwLock<Gauges>,
    /// Counters created so far, behind a read/write lock.
    counters: RwLock<Counters>,
}
393
/// Gauges created on demand, one per (peer, label) pair.
#[derive(Default)]
struct Gauges {
    /// Gauges keyed by the peer's x25519 public key, then by metric label.
    gauges: HashMap<PublicKey, HashMap<String, Box<dyn Gauge>>>,
    /// One gauge family per metric label, keyed by that label.
    family: HashMap<String, Box<dyn GaugeFamily>>,
}
399
/// Counters created on demand, one per (peer, label) pair.
#[derive(Default)]
struct Counters {
    /// Counters keyed by the peer's x25519 public key, then by metric label.
    counters: HashMap<PublicKey, HashMap<String, Box<dyn Counter>>>,
    /// One counter family per metric label, keyed by that label.
    family: HashMap<String, Box<dyn CounterFamily>>,
}
405
406impl CliquenetMetrics {
407 pub fn new(m: Box<dyn Metrics>) -> Self {
408 Self {
409 metrics: m.subgroup("cliquenet".to_string()),
410 gauges: RwLock::new(Gauges::default()),
411 counters: RwLock::new(Counters::default()),
412 }
413 }
414}
415
416impl cliquenet::Metrics for CliquenetMetrics {
422 fn set(&self, key: &PublicKey, label: &str, val: usize) {
423 if let Some(g) = self
424 .gauges
425 .read()
426 .gauges
427 .get(key)
428 .and_then(|m| m.get(label))
429 {
430 return g.set(val);
431 }
432
433 let mut gauges = self.gauges.write();
434
435 if let Some(g) = gauges.gauges.get(key).and_then(|m| m.get(label)) {
437 return g.set(val);
438 }
439
440 let g = gauges
441 .family
442 .entry(label.to_string())
443 .or_insert_with(|| {
444 self.metrics
445 .gauge_family(label.to_string(), vec!["peer".to_string()])
446 })
447 .create(vec![key.to_string()]);
448
449 gauges
450 .gauges
451 .entry(*key)
452 .or_default()
453 .entry(label.to_string())
454 .or_insert(g)
455 .set(val)
456 }
457
458 fn add(&self, key: &PublicKey, label: &str, val: usize) {
459 if let Some(c) = self
460 .counters
461 .read()
462 .counters
463 .get(key)
464 .and_then(|m| m.get(label))
465 {
466 return c.add(val);
467 }
468
469 let mut counters = self.counters.write();
470
471 if let Some(c) = counters.counters.get(key).and_then(|m| m.get(label)) {
473 return c.add(val);
474 }
475
476 let c = counters
477 .family
478 .entry(label.to_string())
479 .or_insert_with(|| {
480 self.metrics
481 .counter_family(label.to_string(), vec!["peer".to_string()])
482 })
483 .create(vec![key.to_string()]);
484
485 counters
486 .counters
487 .entry(*key)
488 .or_default()
489 .entry(label.to_string())
490 .or_insert(c)
491 .add(val)
492 }
493
494 fn del(&self, key: &PublicKey) {
495 let key_string = key.to_string();
496
497 {
498 let mut gauges = self.gauges.write();
499 for (label, _) in gauges.gauges.remove(key).into_iter().flatten() {
500 if let Some(f) = gauges.family.get(&label) {
501 f.destroy(&[&key_string]);
502 }
503 }
504 }
505
506 {
507 let mut counters = self.counters.write();
508 for (label, _) in counters.counters.remove(key).into_iter().flatten() {
509 if let Some(f) = counters.family.get(&label) {
510 f.destroy(&[&key_string]);
511 }
512 }
513 }
514 }
515}