cliquenet/net/server.rs
1use std::{collections::HashMap, mem, net::IpAddr, sync::Arc, time::Duration};
2
3use bytes::{Bytes, BytesMut};
4use tokio::{
5 net::{TcpListener, TcpStream},
6 select, spawn,
7 sync::{
8 mpsc::{UnboundedReceiver, UnboundedSender},
9 watch,
10 },
11 task::{JoinHandle, JoinSet},
12};
13use tokio_util::{sync::CancellationToken, task::JoinMap};
14use tracing::{debug, error, info, trace, warn};
15
16use crate::{
17 Config, Metrics, PublicKey, Role,
18 addr::NetAddr,
19 connection::Connection,
20 error::NetworkError,
21 msg::{MsgId, Slot, Trailer, hello::Hello},
22 net::{Command, PeerCommand, PeerMessage, RetryPolicy, SendAction, peer::Peer},
23 queue::Queue,
24 util::until,
25};
26
/// Connection manager and message router of the network layer.
///
/// Owns one `Party` per known remote node together with the background
/// tasks that accept connections, exchange hellos, dial out, and run peers.
/// All state is driven by the event loop in `Server::run`.
pub struct Server {
    /// Our own public key.
    key: PublicKey,
    /// Shared network configuration.
    conf: Arc<Config>,
    /// Our current role; updated by `PeerCommand`s that name our own key.
    role: Role,
    /// The next message id to hand out (see `Server::next_msgid`).
    msgid: MsgId,
    /// Sends for slots below this bound are dropped; raised from `next_slot`.
    lower_bound: Slot,
    /// Known remote parties by public key (our own key is never inserted).
    parties: HashMap<PublicKey, Party>,
    /// Messages delivered to the application (from peers and from ourselves).
    ibound: UnboundedSender<PeerMessage>,
    /// Commands from the application.
    obound: UnboundedReceiver<Command>,
    /// Next-slot updates; each new value becomes `lower_bound`.
    next_slot: watch::Receiver<Slot>,
    /// Pending setups of accepted TCP connections.
    accept_tasks: JoinSet<Result<Connection, NetworkError>>,
    /// Pending hello exchanges on accepted connections, yielding
    /// (our hello, connection, their hello).
    hello_tasks: JoinMap<PublicKey, Result<(Hello, Connection, Hello), NetworkError>>,
    /// Pending outgoing connection attempts.
    connect_tasks: JoinMap<PublicKey, Connection>,
    /// Running peer tasks; each hands its `Peer` back when it ends.
    peer_tasks: JoinMap<PublicKey, Peer>,
    /// Metrics sink.
    metrics: Arc<dyn Metrics>,
}
43
/// Per-party state kept by the server.
struct Party {
    /// The party's current role; only active parties receive broadcasts.
    role: Role,
    /// Address we dial to reach the party. For `NetAddr::Inet` the IP is
    /// also checked against the source of accepted connections.
    addr: NetAddr,
    /// Outbound messages for this party. Every peer built for the party gets
    /// a clone of it (presumably sharing the same underlying queue, since the
    /// server keeps enqueuing here while the peer task runs — TODO confirm).
    outbox: Queue<(RetryPolicy, Bytes)>,
    /// Lifecycle state of the party's peer task.
    peer: PeerState,
}
50
/// The states of a peer.
///
/// When a party is created there exists no peer yet. After a connection has
/// been accepted (and its hello exchange succeeded) or a connect attempt
/// succeeded, a peer task is created and the state transitions to
/// `Connected`. The cancellation token is used if the peer task should stop
/// in order to replace the peer's connection. When cancelled, the new
/// connection is parked in `Replace`; once the peer task finishes and
/// returns the peer, that connection is set, a new peer task (with the same
/// peer object) is spawned, and the state transitions to `Connected` again.
///
/// The `Reconnect` state is entered when a peer task finishes while no
/// replacement connection is parked — because the peer itself errored, or
/// because it was cancelled after its party's address changed. We keep the
/// peer here until we have a new connection to resume with. Once a connect
/// or hello task delivers one, we set the connection and spawn a new peer
/// task again.
///
/// If a peer task panics, its peer object is lost and the state falls back
/// to `None`.
enum PeerState {
    /// Initial state; also the placeholder left behind by `PeerState::take`.
    None,
    /// A peer task is running.
    ///
    /// The cancellation token can be used to interrupt the peer. The task
    /// will end and the peer object is returned.
    Connected(CancellationToken),
    /// A peer has finished its task and wants a fresh connection.
    ///
    /// We store the peer here until a new connection is available.
    Reconnect(Peer),
    /// The server wants to replace a peer's connection.
    ///
    /// Entered from `Connected` after the server has cancelled the peer
    /// task. While waiting for the task to return the peer object we park
    /// the connection here. A connection that arrives while in this state
    /// replaces the parked one.
    Replace(Connection),
}
85
86impl Server {
87 pub(super) fn spawn(
88 conf: Arc<Config>,
89 listener: TcpListener,
90 role: Role,
91 tx: UnboundedSender<PeerMessage>,
92 rx: UnboundedReceiver<Command>,
93 sx: watch::Receiver<Slot>,
94 metrics: Arc<dyn Metrics>,
95 ) -> JoinHandle<()> {
96 let our_key = conf.keypair.public_key();
97 let parties = conf
98 .parties
99 .iter()
100 .filter(|&(k, _)| *k != our_key)
101 .map(|(k, a)| {
102 let p = Party::new(Role::Active, a.clone());
103 (*k, p)
104 })
105 .collect();
106
107 let this = Self {
108 key: our_key,
109 conf,
110 role,
111 ibound: tx,
112 obound: rx,
113 parties,
114 accept_tasks: JoinSet::new(),
115 connect_tasks: JoinMap::new(),
116 hello_tasks: JoinMap::new(),
117 peer_tasks: JoinMap::new(),
118 msgid: MsgId::new(0),
119 next_slot: sx,
120 lower_bound: Slot::MIN,
121 metrics,
122 };
123
124 spawn(this.run(listener))
125 }
126
127 async fn run(mut self, listener: TcpListener) {
128 // Connect to all peers.
129 for (k, a) in self
130 .parties
131 .iter()
132 .map(|(k, p)| (*k, p.addr.clone()))
133 .collect::<Vec<_>>()
134 {
135 self.spawn_connect(k, a)
136 }
137
138 loop {
139 select! {
140 x = listener.accept() => match x {
141 Ok((stream, addr)) => {
142 debug!(
143 name = %self.conf.name,
144 node = %self.key,
145 %addr,
146 "accepted new tcp connection"
147 );
148 self.spawn_accept(stream)
149 }
150 Err(err) => {
151 warn!(
152 name = %self.conf.name,
153 node = %self.key,
154 %err,
155 "error accepting tcp connection"
156 )
157 }
158 },
159
160 Some(h) = self.accept_tasks.join_next() => match h {
161 Ok(Ok(conn)) => {
162 if conn.key == self.key {
163 warn!(
164 name = %self.conf.name,
165 node = %self.key,
166 peer = %conn.key,
167 addr = %conn.addr,
168 "rejecting connection with the same key"
169 );
170 self.spawn_hello(conn, Hello::BackOff(Duration::MAX));
171 continue
172 }
173 let Some(party) = self.parties.get_mut(&conn.key) else {
174 info!(
175 name = %self.conf.name,
176 node = %self.key,
177 peer = %conn.key,
178 addr = %conn.addr,
179 "unknown party"
180 );
181 self.spawn_hello(conn, Hello::BackOff(self.conf.backoff_duration));
182 continue
183 };
184 if party.ip_addr_mismatch(conn.addr.ip()) {
185 warn!(
186 name = %self.conf.name,
187 node = %self.key,
188 peer = %conn.key,
189 addr = %conn.addr,
190 "party has invalid ip addr"
191 );
192 self.spawn_hello(conn, Hello::BackOff(self.conf.backoff_duration));
193 continue
194 }
195 self.spawn_hello(conn, Hello::Ok);
196 }
197 Ok(Err(err)) => {
198 warn!(name = %self.conf.name, node = %self.key, %err, "handshake failed")
199 }
200 Err(err) => {
201 if !err.is_cancelled() {
202 error!(
203 name = %self.conf.name,
204 node = %self.key,
205 %err,
206 "handshake task panic"
207 )
208 }
209 }
210 },
211
212 Some(r) = self.hello_tasks.join_next() => match r {
213 (_, Ok(Ok((our_hello, conn, their_hello)))) => {
214 if conn.key == self.key {
215 // This case has been addressed already by rejecting the peer,
216 // i.e. we told the peer to backoff forever.
217 continue
218 }
219 let Some(party) = self.parties.get_mut(&conn.key) else {
220 info!(
221 name = %self.conf.name,
222 node = %self.key,
223 peer = %conn.key,
224 addr = %conn.addr,
225 "unknown party"
226 );
227 continue
228 };
229 if !(our_hello.is_ok() && their_hello.is_ok()) {
230 warn!(
231 name = %self.conf.name,
232 node = %self.key,
233 peer = %conn.key,
234 addr = %conn.addr,
235 ours = ?our_hello,
236 theirs = ?their_hello,
237 "hello failed"
238 );
239 continue
240 }
241 match party.peer.take() {
242 PeerState::None => {
243 self.connect_tasks.abort(&conn.key);
244 let key = conn.key;
245 let peer = Peer::builder()
246 .config(self.conf.clone())
247 .budget(self.conf.peer_budget)
248 .next_slot(self.next_slot.clone())
249 .inbound(self.ibound.clone())
250 .messages(party.outbox.clone())
251 .connection(conn)
252 .metrics(self.metrics.clone())
253 .build();
254 party.peer = PeerState::Connected(peer.cancel_token());
255 self.spawn_peer(key, peer);
256 }
257 PeerState::Reconnect(mut peer) => {
258 self.connect_tasks.abort(&conn.key);
259 let key = conn.key;
260 peer.set_connection(conn);
261 party.peer = PeerState::Connected(peer.cancel_token());
262 self.spawn_peer(key, peer);
263 }
264 PeerState::Connected(cancel) => {
265 if conn.key > self.key {
266 info!(
267 name = %self.conf.name,
268 node = %self.key,
269 peer = %conn.key,
270 addr = %conn.addr,
271 "replacing connection with accepted one"
272 );
273 cancel.cancel();
274 party.peer = PeerState::Replace(conn);
275 } else {
276 party.peer = PeerState::Connected(cancel);
277 }
278 }
279 PeerState::Replace(_) => {
280 party.peer = PeerState::Replace(conn);
281 }
282 }
283 }
284 (key, Ok(Err(err))) => {
285 warn!(
286 name = %self.conf.name,
287 node = %self.key,
288 peer = %key,
289 %err,
290 "hello task error"
291 )
292 }
293 (key, Err(err)) => {
294 if !err.is_cancelled() {
295 error!(
296 name = %self.conf.name,
297 node = %self.key,
298 peer = %key,
299 %err,
300 "hello task panic"
301 )
302 }
303 }
304 },
305
306 Some(x) = self.connect_tasks.join_next() => match x {
307 (_, Ok(conn)) => {
308 let Some(party) = self.parties.get_mut(&conn.key) else {
309 debug!(
310 name = %self.conf.name,
311 node = %self.key,
312 peer = %conn.key,
313 addr = %conn.addr,
314 "party has been removed"
315 );
316 continue
317 };
318 match party.peer.take() {
319 PeerState::None => {
320 let key = conn.key;
321 let peer = Peer::builder()
322 .config(self.conf.clone())
323 .budget(self.conf.peer_budget)
324 .next_slot(self.next_slot.clone())
325 .inbound(self.ibound.clone())
326 .messages(party.outbox.clone())
327 .connection(conn)
328 .metrics(self.metrics.clone())
329 .build();
330 party.peer = PeerState::Connected(peer.cancel_token());
331 self.spawn_peer(key, peer);
332 }
333 PeerState::Reconnect(mut peer) => {
334 let key = conn.key;
335 peer.set_connection(conn);
336 party.peer = PeerState::Connected(peer.cancel_token());
337 self.spawn_peer(key, peer);
338 }
339 PeerState::Connected(cancel) => {
340 if conn.key < self.key {
341 info!(
342 name = %self.conf.name,
343 node = %self.key,
344 peer = %conn.key,
345 addr = %conn.addr,
346 "replacing connection with outgoing one"
347 );
348 cancel.cancel();
349 party.peer = PeerState::Replace(conn);
350 } else {
351 party.peer = PeerState::Connected(cancel);
352 }
353 }
354 PeerState::Replace(_) => {
355 party.peer = PeerState::Replace(conn);
356 }
357 }
358 }
359 (key, Err(err)) => {
360 if !err.is_cancelled() {
361 error!(
362 name = %self.conf.name,
363 node = %self.key,
364 peer = %key,
365 %err,
366 "connect task panic"
367 )
368 }
369 }
370 },
371
372 Some(p) = self.peer_tasks.join_next() => match p {
373 (key, Ok(mut peer)) => {
374 if self.ibound.is_closed() {
375 return
376 }
377 let Some(party) = self.parties.get_mut(peer.public_key()) else {
378 debug!(
379 name = %self.conf.name,
380 node = %self.key,
381 peer = %peer.public_key(),
382 addr = %peer.socket_addr(),
383 "party has been removed"
384 );
385 continue
386 };
387 if let PeerState::Replace(conn) = party.peer.take() {
388 let key = conn.key;
389 peer.set_connection(conn);
390 party.peer = PeerState::Connected(peer.cancel_token());
391 self.spawn_peer(key, peer);
392 } else {
393 let addr = party.addr.clone();
394 party.peer = PeerState::Reconnect(peer);
395 self.spawn_connect(key, addr);
396 }
397 }
398 (key, Err(err)) => {
399 if !err.is_cancelled() {
400 error!(
401 name = %self.conf.name,
402 node = %self.key,
403 peer = %key,
404 %err,
405 "peer task panic"
406 );
407 if self.ibound.is_closed() {
408 return
409 }
410 if let Some(party) = self.parties.get_mut(&key) {
411 let addr = party.addr.clone();
412 party.peer = PeerState::None;
413 self.spawn_connect(key, addr);
414 }
415 }
416 }
417 },
418
419 r = self.next_slot.changed() => {
420 if r.is_err() {
421 return
422 }
423 let s = *self.next_slot.borrow_and_update();
424 debug_assert!(s > self.lower_bound); // ensured by controller
425 self.lower_bound = s;
426 self.metrics.set(&self.key, "lower_bound", u64::from(s) as usize);
427 for party in self.parties.values() {
428 party.outbox.gc(s)
429 }
430 }
431
432 cmd = self.obound.recv() => {
433 self.metrics.set(&self.key, "channel_size", self.obound.len());
434 match cmd {
435 Some(Command::Peer(PeerCommand::Add(role, parties))) => {
436 for (k, a) in parties {
437 if k == self.key {
438 self.role = role;
439 continue
440 }
441 if let Some(p) = self.parties.get_mut(&k) {
442 if p.addr == a {
443 p.role = role;
444 } else {
445 info!(
446 name = %self.conf.name,
447 node = %self.key,
448 peer = %k,
449 addr = %a,
450 "updating party address"
451 );
452 p.addr = a.clone();
453 p.role = role;
454 self.connect_tasks.abort(&k);
455 if let PeerState::Connected(cancel) = &p.peer {
456 cancel.cancel()
457 } else {
458 self.spawn_connect(k, a)
459 }
460 }
461 continue
462 }
463 info!(
464 name = %self.conf.name,
465 node = %self.key,
466 peer = %k,
467 addr = %a,
468 "adding new peer"
469 );
470 self.parties.insert(k, Party::new(role, a.clone()));
471 self.spawn_connect(k, a)
472 }
473 }
474 Some(Command::Peer(PeerCommand::Remove(peers))) => {
475 for k in &peers {
476 if *k == self.key {
477 info!(
478 name = %self.conf.name,
479 node = %self.key,
480 "removing self sets role to passive"
481 );
482 self.role = Role::Passive;
483 continue
484 }
485 info!(
486 name = %self.conf.name,
487 node = %self.key,
488 peer = %k,
489 "removing peer"
490 );
491 self.parties.remove(k);
492 self.connect_tasks.abort(k);
493 self.peer_tasks.abort(k);
494 }
495 }
496 Some(Command::Peer(PeerCommand::Assign(role, peers))) => {
497 for k in &peers {
498 if *k == self.key {
499 self.role = role;
500 continue
501 }
502 if let Some(p) = self.parties.get_mut(k) {
503 info!(
504 name = %self.conf.name,
505 node = %self.key,
506 peer = %k,
507 %role,
508 "assigning role to peer"
509 );
510 p.role = role
511 } else {
512 warn!(
513 name = %self.conf.name,
514 node = %self.key,
515 peer = %k,
516 role = %role,
517 "peer to assign role to not found"
518 );
519 }
520 }
521 }
522 Some(Command::Send(cmd)) => match cmd.action {
523 SendAction::Unicast(to, m) => {
524 if cmd.slot < self.lower_bound {
525 continue
526 }
527
528 if to == self.key {
529 trace!(name = %self.conf.name, node = %self.key, "sending message");
530 if let Err(err) = self.ibound.send((self.key, m.into(), None)) {
531 warn!(
532 name = %self.conf.name,
533 node = %self.key,
534 err = %err,
535 "channel closed"
536 );
537 return
538 }
539 trace!(name = %self.conf.name, node = %self.key, "message delivered");
540 continue
541 }
542
543 let msgid = self.next_msgid();
544 let bytes = append_trailer(cmd.retry, cmd.slot, msgid, m);
545
546 if let Some(party) = self.parties.get(&to) {
547 party.outbox.enqueue(cmd.slot, msgid, (cmd.retry, bytes));
548 } else {
549 warn!(
550 name = %self.conf.name,
551 node = %self.key,
552 peer = %to,
553 "unicast target not found"
554 );
555 }
556 }
557 SendAction::Multicast(parties, m) => {
558 if cmd.slot < self.lower_bound {
559 continue
560 }
561
562 let msgid = self.next_msgid();
563 let bytes = append_trailer(cmd.retry, cmd.slot, msgid, m);
564
565 if parties.contains(&self.key) {
566 let bytes = remove_trailer(bytes.clone());
567 trace!(name = %self.conf.name, node = %self.key, "sending message");
568 if let Err(err) = self.ibound.send((self.key, bytes, None)) {
569 warn!(
570 name = %self.conf.name,
571 node = %self.key,
572 err = %err,
573 "channel closed"
574 );
575 return
576 }
577 trace!(name = %self.conf.name, node = %self.key, "message delivered");
578 }
579
580 for (to, party) in &self.parties {
581 if !parties.contains(to) {
582 continue
583 }
584 trace!(name = %self.conf.name, node = %self.key, %to, "sending message");
585 party.outbox.enqueue(cmd.slot, msgid, (cmd.retry, bytes.clone()));
586 }
587 }
588 SendAction::Broadcast(m) => {
589 if cmd.slot < self.lower_bound {
590 continue
591 }
592
593 let msgid = self.next_msgid();
594 let bytes = append_trailer(cmd.retry, cmd.slot, msgid, m);
595
596 if self.role.is_active() {
597 let bytes = remove_trailer(bytes.clone());
598 trace!(name = %self.conf.name, node = %self.key, "sending message");
599 if let Err(err) = self.ibound.send((self.key, bytes, None)) {
600 warn!(
601 name = %self.conf.name,
602 node = %self.key,
603 err = %err,
604 "channel closed"
605 );
606 return
607 }
608 trace!(name = %self.conf.name, node = %self.key, "message delivered");
609 }
610 for (key, party) in &self.parties {
611 if party.role.is_active() {
612 trace!(
613 name = %self.conf.name,
614 node = %self.key,
615 to = %key,
616 "sending message"
617 );
618 party.outbox.enqueue(cmd.slot, msgid, (cmd.retry, bytes.clone()));
619 }
620 }
621 }
622 }
623 Some(Command::Shutdown(tx)) => {
624 debug!(name = %self.conf.name, node = %self.key, "shutting down");
625 let _ = tx.send(());
626 return
627 }
628 None => return
629 }
630 }
631 }
632 }
633 }
634
635 fn spawn_connect(&mut self, key: PublicKey, addr: NetAddr) {
636 if self.key == key {
637 return;
638 }
639 debug!(
640 name = %self.conf.name,
641 node = %self.key,
642 peer = %key,
643 addr = %addr,
644 "spawning connect task"
645 );
646 let conn = Connection::connect(self.conf.clone(), key, addr);
647 self.connect_tasks.spawn(key, conn);
648 self.metrics.add(&key, "connect_attempts", 1);
649 self.metrics
650 .set(&self.key, "connect_tasks", self.connect_tasks.len());
651 }
652
653 fn spawn_accept(&mut self, stream: TcpStream) {
654 debug!(name = %self.conf.name, node = %self.key, "spawning accept task");
655 let conn = Connection::accept(self.conf.clone(), stream);
656 self.accept_tasks.spawn(conn);
657 self.metrics
658 .set(&self.key, "accept_tasks", self.accept_tasks.len());
659 }
660
661 fn spawn_hello(&mut self, mut conn: Connection, ours: Hello) {
662 debug!(
663 name = %self.conf.name,
664 node = %self.key,
665 peer = %conn.key,
666 addr = %conn.addr,
667 "spawning hello task"
668 );
669
670 self.metrics.add(&conn.key, "hellos", 1);
671
672 self.hello_tasks.abort(&conn.key);
673 self.hello_tasks.spawn(
674 conn.key,
675 until(self.conf.handshake_timeout, async move {
676 let theirs = conn.recv_hello().await?;
677 conn.send_hello(ours.clone()).await?;
678 Ok::<_, NetworkError>((ours, conn, theirs))
679 }),
680 );
681
682 self.metrics
683 .set(&self.key, "hello_tasks", self.hello_tasks.len());
684 }
685
686 fn spawn_peer(&mut self, key: PublicKey, mut peer: Peer) {
687 debug!(
688 name = %self.conf.name,
689 node = %self.key,
690 peer = %peer.public_key(),
691 addr = %peer.socket_addr(),
692 "spawning peer task"
693 );
694 let node = self.key;
695 let name = self.conf.name.clone();
696 let metrics = self.metrics.clone();
697 self.peer_tasks.spawn(key, async move {
698 let Err(err) = peer.start().await;
699 if !matches!(err, NetworkError::PeerInterrupt) {
700 warn!(
701 %name,
702 %node,
703 peer = %peer.public_key(),
704 addr = %peer.socket_addr(),
705 %err,
706 "peer failure"
707 );
708 metrics.add(peer.public_key(), "errors", 1)
709 }
710 peer
711 });
712 self.metrics
713 .set(&self.key, "peer_tasks", self.peer_tasks.len());
714 }
715
716 fn next_msgid(&mut self) -> MsgId {
717 let current = self.msgid;
718 self.msgid = MsgId::new(self.msgid.0.wrapping_add(1));
719 current
720 }
721}
722
723impl Party {
724 fn new(r: Role, a: NetAddr) -> Self {
725 Self {
726 addr: a,
727 role: r,
728 outbox: Queue::new(),
729 peer: PeerState::None,
730 }
731 }
732
733 fn ip_addr_mismatch(&self, addr: IpAddr) -> bool {
734 let NetAddr::Inet(ip, _) = &self.addr else {
735 return false;
736 };
737 *ip != addr
738 }
739}
740
741impl PeerState {
742 fn take(&mut self) -> Self {
743 mem::replace(self, Self::None)
744 }
745}
746
747fn append_trailer(pol: RetryPolicy, slot: Slot, id: MsgId, bytes: Vec<u8>) -> Bytes {
748 let t = match pol {
749 RetryPolicy::Default => Trailer::Std { slot, id },
750 RetryPolicy::NoRetry => Trailer::NoAck { slot },
751 };
752 let mut msg = BytesMut::from(Bytes::from(bytes));
753 msg.extend_from_slice(t.to_bytes().as_ref());
754 msg.freeze()
755}
756
757fn remove_trailer(mut bytes: Bytes) -> Bytes {
758 let _t = Trailer::from_bytes(&mut bytes);
759 debug_assert!(_t.is_some());
760 bytes
761}