Skip to main content

hotshot_new_protocol/network/
cliquenet.rs

1use std::collections::{HashMap, HashSet};
2
3pub use cliquenet::Config as CliquenetConfig;
4use cliquenet::{
5    NetAddr, NetworkError as CliquenetError, Role, Slot, noise::Protocol, x25519::PublicKey,
6};
7use hotshot_types::{
8    PeerConnectInfo,
9    data::{EpochNumber, ViewNumber},
10    epoch_membership::EpochMembershipCoordinator,
11    message::{EXTERNAL_MESSAGE_VERSION, MessageKind, UpgradeLock},
12    traits::{
13        metrics::{Counter, CounterFamily, Gauge, GaugeFamily, Metrics},
14        node_implementation::NodeType,
15    },
16    x25519::Keypair,
17};
18use parking_lot::RwLock;
19use tracing::{error, info};
20
21use crate::{
22    message::{Message, MessageType, Unchecked, Validated},
23    network::{Network, NetworkError, PeerRole},
24};
25
26pub struct Cliquenet<T: NodeType> {
27    my_keys: (T::SignatureKey, PublicKey),
28    inner: cliquenet::Network,
29    peers: HashMap<T::SignatureKey, PeerConnectInfo>,
30    epoch: EpochNumber,
31    upgrade_lock: UpgradeLock<T>,
32}
33
34impl<T: NodeType> Cliquenet<T> {
35    pub async fn create<A, P>(
36        name: &str,
37        signing_key: T::SignatureKey,
38        keypair: Keypair,
39        addr: A,
40        parties: P,
41        upgrade_lock: UpgradeLock<T>,
42        metrics: Box<dyn Metrics>,
43    ) -> Result<Self, NetworkError>
44    where
45        A: Into<cliquenet::NetAddr>,
46        P: IntoIterator<Item = (T::SignatureKey, PeerConnectInfo)>,
47    {
48        let parties: HashMap<T::SignatureKey, PeerConnectInfo> = parties.into_iter().collect();
49
50        let cfg = cliquenet::Config::builder()
51            .name(name)
52            .keypair(keypair.into())
53            .bind(addr.into())
54            .parties(
55                parties
56                    .values()
57                    .map(|info| (info.x25519_key.into(), info.p2p_addr.clone())),
58            )
59            .noise_protocols([(1.into(), Protocol::IK_25519_AesGcm_Blake2s)])
60            .build();
61
62        Self::create_with_config(signing_key, upgrade_lock, cfg, parties, metrics).await
63    }
64
65    pub(crate) async fn create_with_config<P>(
66        signing_key: T::SignatureKey,
67        upgrade_lock: UpgradeLock<T>,
68        config: cliquenet::Config,
69        parties: P,
70        metrics: Box<dyn Metrics>,
71    ) -> Result<Self, NetworkError>
72    where
73        P: IntoIterator<Item = (T::SignatureKey, PeerConnectInfo)>,
74    {
75        let public_key = config.public_key();
76        let metrics = CliquenetMetrics::new(metrics);
77        let network = cliquenet::Network::create(config.with_metrics(metrics))
78            .await
79            .map_err(to_network_error)?;
80
81        let peers: HashMap<_, _> = parties.into_iter().collect();
82
83        info!(peers = %peers.len(), "cliquenet created");
84
85        Ok(Self {
86            my_keys: (signing_key, public_key),
87            inner: network,
88            peers,
89            epoch: EpochNumber::new(0),
90            upgrade_lock,
91        })
92    }
93}
94
95impl<T: NodeType> Network<T> for Cliquenet<T> {
96    type PeerData = (PublicKey, NetAddr);
97
98    fn unicast(
99        &mut self,
100        v: ViewNumber,
101        to: &T::SignatureKey,
102        m: &Message<T, Validated>,
103    ) -> Result<(), NetworkError> {
104        let target = if *to == self.my_keys.0 {
105            self.my_keys.1
106        } else if let Some(info) = self.peers.get(to) {
107            info.x25519_key.into()
108        } else {
109            error!(peer = %to, "unicast target not found");
110            return Ok(());
111        };
112        let bytes = self.serialize(m)?;
113        self.inner
114            .unicast(Slot::new(*v), target, bytes)
115            .map_err(to_network_error)?;
116        Ok(())
117    }
118
119    fn multicast(
120        &mut self,
121        v: ViewNumber,
122        to: Vec<&T::SignatureKey>,
123        m: &Message<T, Validated>,
124    ) -> Result<(), NetworkError> {
125        let bytes = self.serialize(m)?;
126        let mut targets = Vec::new();
127        for t in to {
128            if let Some(info) = self.peers.get(t) {
129                targets.push(info.x25519_key.into())
130            } else if *t == self.my_keys.0 {
131                targets.push(self.my_keys.1)
132            } else {
133                error!(peer = %t, "multicast target not found");
134            }
135        }
136        self.inner
137            .multicast(Slot::new(*v), targets, bytes)
138            .map_err(to_network_error)?;
139        Ok(())
140    }
141
142    fn broadcast(&mut self, v: ViewNumber, m: &Message<T, Validated>) -> Result<(), NetworkError> {
143        let bytes = self.serialize(m)?;
144        self.inner
145            .broadcast(Slot::new(*v), bytes)
146            .map_err(to_network_error)?;
147        Ok(())
148    }
149
150    async fn receive(&mut self) -> Result<Message<T, Unchecked>, NetworkError> {
151        let (_src, bytes) = self
152            .inner
153            .receive()
154            .await
155            .ok_or_else(|| NetworkError::Critical("cliquenet has shutdown".into()))?;
156        let m = self.deserialize(&bytes)?;
157        Ok(m)
158    }
159
160    async fn shutdown(&mut self) {
161        if let Ok(done) = self.inner.shutdown() {
162            done.await
163        }
164    }
165
166    fn gc(&mut self, v: ViewNumber) -> Result<(), NetworkError> {
167        self.inner.gc(Slot::new(*v)).map_err(to_network_error)
168    }
169
170    fn add_peers(
171        &mut self,
172        r: PeerRole,
173        ps: Vec<(T::SignatureKey, Self::PeerData)>,
174    ) -> Result<(), NetworkError> {
175        let mut targets = Vec::new();
176        for (k, (x, a)) in ps {
177            self.peers.insert(
178                k,
179                PeerConnectInfo {
180                    x25519_key: x.into(),
181                    p2p_addr: a.clone(),
182                },
183            );
184            targets.push((x, a))
185        }
186        self.inner
187            .add_peers(map_peer_role(r), targets)
188            .map_err(to_network_error)?;
189        Ok(())
190    }
191
192    fn remove_peers(&mut self, ps: Vec<&T::SignatureKey>) -> Result<(), NetworkError> {
193        let mut targets = Vec::new();
194        for k in ps {
195            if let Some(info) = self.peers.remove(k) {
196                targets.push(info.x25519_key.into())
197            }
198        }
199        self.inner.remove_peers(targets).map_err(to_network_error)?;
200        Ok(())
201    }
202
203    fn assign_role(&mut self, r: PeerRole, ps: Vec<&T::SignatureKey>) -> Result<(), NetworkError> {
204        let mut targets = Vec::new();
205        for k in ps {
206            if let Some(info) = self.peers.get(k) {
207                targets.push(info.x25519_key.into())
208            }
209        }
210        self.inner
211            .assign_peers(map_peer_role(r), targets)
212            .map_err(to_network_error)?;
213        Ok(())
214    }
215
216    /// Update peers on every epoch change.
217    ///
218    /// For any given epoch `e` we collect the validators of `e`, `e-1` and
219    /// `e+1` from the stake tables and merge their connection information.
220    ///
221    /// We keep validators that were in `e-1` but not in `e` for one additional
222    /// epoch and eagerly connect to new validators of `e+1`.
223    fn apply_epoch(
224        &mut self,
225        epoch: EpochNumber,
226        coord: &EpochMembershipCoordinator<T>,
227    ) -> Result<(), NetworkError> {
228        if epoch <= self.epoch {
229            info!(%epoch, ours = %self.epoch, "epoch already seen");
230            return Ok(());
231        }
232
233        // Validators of the new epoch.
234        let Some(curr_infos) = coord.epoch_peers(Some(epoch)) else {
235            error!(%epoch, "no stake table available");
236            return Ok(());
237        };
238
239        // Validators leaving are retained as peers for one additional epoch.
240        let prev_infos = if *epoch > 0 {
241            coord.epoch_peers(Some(epoch - 1)).unwrap_or_else(|| {
242                info!(%epoch, "previous epoch's stake table unavailable");
243                HashMap::new()
244            })
245        } else {
246            HashMap::new()
247        };
248
249        // Validators joining in the next epoch are connected to early.
250        let next_infos = coord.epoch_peers(Some(epoch + 1)).unwrap_or_else(|| {
251            info!(%epoch, "next epoch's stake table not available");
252            HashMap::new()
253        });
254
255        // Since connection information may be updated, we need to merge them,
256        // preferring the newest epoch's data, i.e. `next(curr(prev))`.
257        let mut merged_infos = prev_infos.clone();
258        for (k, v) in curr_infos.iter().chain(&next_infos) {
259            merged_infos.insert(k.clone(), v.clone());
260        }
261
262        let wanted: HashSet<T::SignatureKey> = curr_infos
263            .keys()
264            .chain(next_infos.keys())
265            .cloned()
266            .collect();
267
268        let retained: HashSet<T::SignatureKey> = curr_infos
269            .keys()
270            .chain(prev_infos.keys())
271            .cloned()
272            .collect();
273
274        let mut to_add: Vec<(T::SignatureKey, PeerConnectInfo)> = Vec::new();
275        let mut to_del: Vec<(T::SignatureKey, PeerConnectInfo)> = Vec::new();
276
277        for k in &wanted {
278            if let Some(Some(new_info)) = merged_infos.get(k) {
279                if Some(new_info) != self.peers.get(k) {
280                    info!(%epoch, peer = %k, "adding/updating network peer");
281                    to_add.push((k.clone(), new_info.clone()));
282                } else {
283                    info!(%epoch, peer = %k, "peer unchanged");
284                }
285            } else {
286                info!(%epoch, peer = %k, "ignoring peer without connection info");
287            }
288        }
289
290        // Remove peers that have left both the current and previous epochs.
291        for (k, info) in &self.peers {
292            if !(retained.contains(k) || wanted.contains(k)) {
293                info!(%epoch, peer = %k, "removing network peer");
294                to_del.push((k.clone(), info.clone()));
295            }
296        }
297
298        for (k, _) in &to_del {
299            self.peers.remove(k);
300        }
301        for (k, info) in &to_add {
302            self.peers.insert(k.clone(), info.clone());
303        }
304
305        let add_targets: Vec<(PublicKey, NetAddr)> = to_add
306            .iter()
307            .map(|(_, i)| (i.x25519_key.into(), i.p2p_addr.clone()))
308            .collect();
309        let del_targets: Vec<PublicKey> = to_del.iter().map(|(_, i)| i.x25519_key.into()).collect();
310
311        if let Err(err) = self.inner.add_peers(Role::Active, add_targets) {
312            error!(%epoch, %err, "network down; could not add peers to network");
313            return Err(to_network_error(err));
314        }
315
316        if let Err(err) = self.inner.remove_peers(del_targets) {
317            error!(%epoch, %err, "network down; could not remove peers from network");
318            return Err(to_network_error(err));
319        }
320
321        info!(%epoch, peers = %self.peers.len());
322
323        self.epoch = epoch;
324        Ok(())
325    }
326}
327
328impl<T: NodeType> Cliquenet<T> {
329    fn serialize(&self, m: &Message<T, Validated>) -> Result<Vec<u8>, NetworkError> {
330        if let MessageType::External(bytes) = &m.message_type {
331            return Ok(bytes.clone());
332        }
333        self.upgrade_lock
334            .serialize(m)
335            .map_err(|e| NetworkError::Io(format!("serialization error: {e}").into()))
336    }
337
338    fn deserialize(&self, bytes: &[u8]) -> Result<Message<T, Unchecked>, NetworkError> {
339        match self
340            .upgrade_lock
341            .deserialize::<Message<T, Unchecked>>(bytes)
342        {
343            Ok((m, v)) => {
344                if v == EXTERNAL_MESSAGE_VERSION && !m.is_external() {
345                    let e = "received a non-external message with version 0.0".to_string();
346                    return Err(NetworkError::Io(e.into()));
347                }
348                Ok(m)
349            },
350            Err(primary_err) => {
351                // Fallback: bytes may be a hotshot-types `Message<T>` carrying
352                // an `External` payload (this is how `Leaf2Fetcher` in the
353                // membership layer frames leaf-catchup requests/responses).
354                // If so, surface it as `MessageType::External` so the
355                // Coordinator can route it to the membership external
356                // channel just like a native new-protocol external message.
357                if let Ok((_v, hs_msg)) =
358                    versions::decode::<hotshot_types::message::Message<T>>(bytes)
359                    && let MessageKind::External(data) = hs_msg.kind
360                {
361                    return Ok(Message {
362                        sender: hs_msg.sender,
363                        message_type: MessageType::External(data),
364                    });
365                }
366                Err(NetworkError::Io(primary_err.to_string().into()))
367            },
368        }
369    }
370}
371
372fn map_peer_role(r: PeerRole) -> Role {
373    match r {
374        PeerRole::Active => Role::Active,
375        PeerRole::Passive => Role::Passive,
376    }
377}
378
379fn to_network_error(e: CliquenetError) -> NetworkError {
380    match e {
381        e @ CliquenetError::Bind(..) => NetworkError::Critical(e.into()),
382        e @ CliquenetError::ChannelClosed => NetworkError::Critical(e.into()),
383        e @ CliquenetError::BudgetClosed => NetworkError::Critical(e.into()),
384        e => NetworkError::Io(e.into()),
385    }
386}
387
388struct CliquenetMetrics {
389    metrics: Box<dyn Metrics>,
390    gauges: RwLock<Gauges>,
391    counters: RwLock<Counters>,
392}
393
394#[derive(Default)]
395struct Gauges {
396    gauges: HashMap<PublicKey, HashMap<String, Box<dyn Gauge>>>,
397    family: HashMap<String, Box<dyn GaugeFamily>>,
398}
399
400#[derive(Default)]
401struct Counters {
402    counters: HashMap<PublicKey, HashMap<String, Box<dyn Counter>>>,
403    family: HashMap<String, Box<dyn CounterFamily>>,
404}
405
406impl CliquenetMetrics {
407    pub fn new(m: Box<dyn Metrics>) -> Self {
408        Self {
409            metrics: m.subgroup("cliquenet".to_string()),
410            gauges: RwLock::new(Gauges::default()),
411            counters: RwLock::new(Counters::default()),
412        }
413    }
414}
415
416// In here we lazily create counters and gauges based on their labels.
417// If not found, we create a family using the label, e.g. "connect_attempts",
418// indexed by the peer (key). Afterwards we create the actual counter or gauge,
419// and update its value. On the next call, the metric would be found and
420// updated right away.
421impl cliquenet::Metrics for CliquenetMetrics {
422    fn set(&self, key: &PublicKey, label: &str, val: usize) {
423        if let Some(g) = self
424            .gauges
425            .read()
426            .gauges
427            .get(key)
428            .and_then(|m| m.get(label))
429        {
430            return g.set(val);
431        }
432
433        let mut gauges = self.gauges.write();
434
435        // Check again, in case a concurrent write has created the gauge:
436        if let Some(g) = gauges.gauges.get(key).and_then(|m| m.get(label)) {
437            return g.set(val);
438        }
439
440        let g = gauges
441            .family
442            .entry(label.to_string())
443            .or_insert_with(|| {
444                self.metrics
445                    .gauge_family(label.to_string(), vec!["peer".to_string()])
446            })
447            .create(vec![key.to_string()]);
448
449        gauges
450            .gauges
451            .entry(*key)
452            .or_default()
453            .entry(label.to_string())
454            .or_insert(g)
455            .set(val)
456    }
457
458    fn add(&self, key: &PublicKey, label: &str, val: usize) {
459        if let Some(c) = self
460            .counters
461            .read()
462            .counters
463            .get(key)
464            .and_then(|m| m.get(label))
465        {
466            return c.add(val);
467        }
468
469        let mut counters = self.counters.write();
470
471        // Check again, in case a concurrent write has created the counter:
472        if let Some(c) = counters.counters.get(key).and_then(|m| m.get(label)) {
473            return c.add(val);
474        }
475
476        let c = counters
477            .family
478            .entry(label.to_string())
479            .or_insert_with(|| {
480                self.metrics
481                    .counter_family(label.to_string(), vec!["peer".to_string()])
482            })
483            .create(vec![key.to_string()]);
484
485        counters
486            .counters
487            .entry(*key)
488            .or_default()
489            .entry(label.to_string())
490            .or_insert(c)
491            .add(val)
492    }
493
494    fn del(&self, key: &PublicKey) {
495        let key_string = key.to_string();
496
497        {
498            let mut gauges = self.gauges.write();
499            for (label, _) in gauges.gauges.remove(key).into_iter().flatten() {
500                if let Some(f) = gauges.family.get(&label) {
501                    f.destroy(&[&key_string]);
502                }
503            }
504        }
505
506        {
507            let mut counters = self.counters.write();
508            for (label, _) in counters.counters.remove(key).into_iter().flatten() {
509                if let Some(f) = counters.family.get(&label) {
510                    f.destroy(&[&key_string]);
511                }
512            }
513        }
514    }
515}