// cliquenet/net/server.rs

1use std::{collections::HashMap, mem, net::IpAddr, sync::Arc, time::Duration};
2
3use bytes::{Bytes, BytesMut};
4use tokio::{
5    net::{TcpListener, TcpStream},
6    select, spawn,
7    sync::{
8        mpsc::{UnboundedReceiver, UnboundedSender},
9        watch,
10    },
11    task::{JoinHandle, JoinSet},
12};
13use tokio_util::{sync::CancellationToken, task::JoinMap};
14use tracing::{debug, error, info, trace, warn};
15
16use crate::{
17    Config, Metrics, PublicKey, Role,
18    addr::NetAddr,
19    connection::Connection,
20    error::NetworkError,
21    msg::{MsgId, Slot, Trailer, hello::Hello},
22    net::{Command, PeerCommand, PeerMessage, RetryPolicy, SendAction, peer::Peer},
23    queue::Queue,
24    util::until,
25};
26
/// The network server: owns the per-party state and drives all accept,
/// hello, connect and peer tasks from a single event loop (`Server::run`).
pub struct Server {
    /// Our own public key (derived from `conf.keypair`).
    key: PublicKey,
    /// Shared network configuration.
    conf: Arc<Config>,
    /// Our own role; changed by peer commands that name our own key.
    role: Role,
    /// Id handed out to the next outgoing message (wraps around on overflow).
    msgid: MsgId,
    /// Lowest slot still accepted; send commands for older slots are dropped.
    lower_bound: Slot,
    /// All other parties, keyed by their public key (never contains `key`).
    parties: HashMap<PublicKey, Party>,
    /// Channel on which received messages, and messages we send to
    /// ourselves, are delivered. Peers are given clones of this sender.
    ibound: UnboundedSender<PeerMessage>,
    /// Channel of incoming commands (sends, peer updates, shutdown).
    obound: UnboundedReceiver<Command>,
    /// Announces new slots; used to raise `lower_bound` and gc the outboxes.
    next_slot: watch::Receiver<Slot>,
    /// Accept tasks for inbound TCP connections.
    accept_tasks: JoinSet<Result<Connection, NetworkError>>,
    /// Hello exchanges, keyed by the remote key; each yields
    /// `(our hello, connection, their hello)`.
    hello_tasks: JoinMap<PublicKey, Result<(Hello, Connection, Hello), NetworkError>>,
    /// Outgoing connect attempts, keyed by the remote key.
    connect_tasks: JoinMap<PublicKey, Connection>,
    /// Running peers, keyed by the remote key; each task returns its `Peer`
    /// object when it ends.
    peer_tasks: JoinMap<PublicKey, Peer>,
    /// Metrics sink.
    metrics: Arc<dyn Metrics>,
}
43
/// Server-side state kept for one remote party.
struct Party {
    /// The party's role; broadcasts are only queued for active parties.
    role: Role,
    /// Address used to dial the party. For `NetAddr::Inet` it is also the IP
    /// that inbound connections claiming this party's key must come from.
    addr: NetAddr,
    /// Outgoing messages (retry policy and encoded bytes). A clone of this
    /// queue is handed to the party's peer; it is garbage-collected by slot.
    outbox: Queue<(RetryPolicy, Bytes)>,
    /// Connection state of the party's peer.
    peer: PeerState,
}
50
51/// The states of a peer.
52///
53/// When a party is created there exists no peer yet. After a connection has
54/// been accepted or a connect attempt succeeded, a peer task is created and
55/// the state transitions to `Connected`. The cancellation token is used if
56/// the peer task should stop in order to replace the peer's connection. When
57/// cancelled the next connection is stored in `Replace` and once the peer
58/// task finishes and returns the peer, its connection is set and a new peer
59/// task (with the same peer object) is spawned and the state transitions to
60/// `Connected` again.
61///
62/// The `Reconnect` state is entered if the peer itself errors and its task
63/// finishes. We keep the peer here until we have a new connection to resume.
64/// Once a connect or accept task finishes we set the connection and spawn
65/// a new peer task again.
66enum PeerState {
67    /// Initial state.
68    None,
69    /// A peer task is running.
70    ///
71    /// The cancellation token can be used to interrupt the peer. The task
72    /// will end and the peer object is returned.
73    Connected(CancellationToken),
74    /// A peer has errored and wants a fresh connection.
75    ///
76    /// We store the peer here until a new connection is available.
77    Reconnect(Peer),
78    /// The server wants to replace a peer's connection.
79    ///
80    /// Only entered from `Connected` after the server has cancelled the
81    /// peer task. While waiting for the task to return the peer object
82    /// we park the connection here.
83    Replace(Connection),
84}
85
impl Server {
    /// Creates the server state and spawns its event loop (`run`) as a tokio
    /// task.
    ///
    /// Every entry of `conf.parties` except our own key becomes a `Party`
    /// with role `Role::Active`, a new outbox and no peer. `role` is our own
    /// role. `listener` supplies inbound TCP connections. `tx` is where
    /// received (and self-addressed) messages are delivered. `rx` supplies
    /// commands. `sx` announces new slots.
    ///
    /// Returns the join handle of the spawned event loop.
    pub(super) fn spawn(
        conf: Arc<Config>,
        listener: TcpListener,
        role: Role,
        tx: UnboundedSender<PeerMessage>,
        rx: UnboundedReceiver<Command>,
        sx: watch::Receiver<Slot>,
        metrics: Arc<dyn Metrics>,
    ) -> JoinHandle<()> {
        let our_key = conf.keypair.public_key();
        // We never keep a `Party` entry for ourselves.
        let parties = conf
            .parties
            .iter()
            .filter(|&(k, _)| *k != our_key)
            .map(|(k, a)| {
                let p = Party::new(Role::Active, a.clone());
                (*k, p)
            })
            .collect();

        let this = Self {
            key: our_key,
            conf,
            role,
            ibound: tx,
            obound: rx,
            parties,
            accept_tasks: JoinSet::new(),
            connect_tasks: JoinMap::new(),
            hello_tasks: JoinMap::new(),
            peer_tasks: JoinMap::new(),
            msgid: MsgId::new(0),
            next_slot: sx,
            lower_bound: Slot::MIN,
            metrics,
        };

        spawn(this.run(listener))
    }

    /// The server's event loop.
    ///
    /// Spawns a connect task for every known party. It then multiplexes
    /// inbound TCP connections, completed accept, hello, connect and peer
    /// tasks, slot updates and commands.
    ///
    /// Returns when any of the following happens:
    /// - a `Command::Shutdown` is received, or the command channel closes;
    /// - the `next_slot` watch channel closes;
    /// - the inbound message channel (`ibound`) is found to be closed.
    async fn run(mut self, listener: TcpListener) {
        // Connect to all peers. The (key, address) pairs are collected up
        // front because `spawn_connect` borrows `self` mutably.
        for (k, a) in self
            .parties
            .iter()
            .map(|(k, p)| (*k, p.addr.clone()))
            .collect::<Vec<_>>()
        {
            self.spawn_connect(k, a)
        }

        loop {
            select! {
                // New inbound TCP connection: hand it to an accept task.
                // Accept errors are only logged; we keep listening.
                x = listener.accept() => match x {
                    Ok((stream, addr)) => {
                        debug!(
                            name = %self.conf.name,
                            node = %self.key,
                            %addr,
                            "accepted new tcp connection"
                        );
                        self.spawn_accept(stream)
                    }
                    Err(err) => {
                        warn!(
                            name = %self.conf.name,
                            node = %self.key,
                            %err,
                            "error accepting tcp connection"
                        )
                    }
                },

                // An accept task finished. Choose the hello we answer with:
                // - a back-off for our own key, for unknown parties, and for
                //   parties connecting from an unexpected IP;
                // - `Ok` otherwise.
                Some(h) = self.accept_tasks.join_next() => match h {
                    Ok(Ok(conn)) => {
                        if conn.key == self.key {
                            warn!(
                                name = %self.conf.name,
                                node = %self.key,
                                peer = %conn.key,
                                addr = %conn.addr,
                                "rejecting connection with the same key"
                            );
                            // A remote using our own key is told to back off forever.
                            self.spawn_hello(conn, Hello::BackOff(Duration::MAX));
                            continue
                        }
                        let Some(party) = self.parties.get_mut(&conn.key) else {
                            info!(
                                name = %self.conf.name,
                                node = %self.key,
                                peer = %conn.key,
                                addr = %conn.addr,
                                "unknown party"
                            );
                            self.spawn_hello(conn, Hello::BackOff(self.conf.backoff_duration));
                            continue
                        };
                        if party.ip_addr_mismatch(conn.addr.ip()) {
                            warn!(
                                name = %self.conf.name,
                                node = %self.key,
                                peer = %conn.key,
                                addr = %conn.addr,
                                "party has invalid ip addr"
                            );
                            self.spawn_hello(conn, Hello::BackOff(self.conf.backoff_duration));
                            continue
                        }
                        self.spawn_hello(conn, Hello::Ok);
                    }
                    Ok(Err(err)) => {
                        warn!(name = %self.conf.name, node = %self.key, %err, "handshake failed")
                    }
                    Err(err) => {
                        // Cancellation is expected and not reported.
                        if !err.is_cancelled() {
                            error!(
                                name = %self.conf.name,
                                node = %self.key,
                                %err,
                                "handshake task panic"
                            )
                        }
                    }
                },

                // A hello exchange finished. The connection is only used if
                // both sides answered `Ok`. It is then installed according to
                // the party's current `PeerState`.
                Some(r) = self.hello_tasks.join_next() => match r {
                    (_, Ok(Ok((our_hello, conn, their_hello)))) => {
                        if conn.key == self.key {
                            // This case has been addressed already by rejecting the peer,
                            // i.e. we told the peer to backoff forever.
                            continue
                        }
                        // The party may have been removed while the hello was running.
                        let Some(party) = self.parties.get_mut(&conn.key) else {
                            info!(
                                name = %self.conf.name,
                                node = %self.key,
                                peer = %conn.key,
                                addr = %conn.addr,
                                "unknown party"
                            );
                            continue
                        };
                        if !(our_hello.is_ok() && their_hello.is_ok()) {
                            warn!(
                                name   = %self.conf.name,
                                node   = %self.key,
                                peer   = %conn.key,
                                addr   = %conn.addr,
                                ours   = ?our_hello,
                                theirs = ?their_hello,
                                "hello failed"
                            );
                            continue
                        }
                        match party.peer.take() {
                            // First connection for this party: stop dialling it
                            // and create its peer.
                            PeerState::None => {
                                self.connect_tasks.abort(&conn.key);
                                let key = conn.key;
                                let peer = Peer::builder()
                                    .config(self.conf.clone())
                                    .budget(self.conf.peer_budget)
                                    .next_slot(self.next_slot.clone())
                                    .inbound(self.ibound.clone())
                                    .messages(party.outbox.clone())
                                    .connection(conn)
                                    .metrics(self.metrics.clone())
                                    .build();
                                party.peer = PeerState::Connected(peer.cancel_token());
                                self.spawn_peer(key, peer);
                            }
                            // Resume the waiting peer on the new connection and
                            // stop dialling.
                            PeerState::Reconnect(mut peer) => {
                                self.connect_tasks.abort(&conn.key);
                                let key = conn.key;
                                peer.set_connection(conn);
                                party.peer = PeerState::Connected(peer.cancel_token());
                                self.spawn_peer(key, peer);
                            }
                            // Tie-break: switch to this accepted connection only if
                            // the remote key is larger. The connect arm applies the
                            // mirror rule, so both ends prefer the connection
                            // initiated by the larger key. Otherwise the new
                            // connection is dropped.
                            PeerState::Connected(cancel) => {
                                if conn.key > self.key {
                                    info!(
                                        name = %self.conf.name,
                                        node = %self.key,
                                        peer = %conn.key,
                                        addr = %conn.addr,
                                        "replacing connection with accepted one"
                                    );
                                    cancel.cancel();
                                    party.peer = PeerState::Replace(conn);
                                } else {
                                    party.peer = PeerState::Connected(cancel);
                                }
                            }
                            // A replacement is already pending; the newer
                            // connection supersedes the parked one.
                            PeerState::Replace(_) => {
                                party.peer = PeerState::Replace(conn);
                            }
                        }
                    }
                    (key, Ok(Err(err))) => {
                        warn!(
                            name = %self.conf.name,
                            node = %self.key,
                            peer = %key,
                            %err,
                            "hello task error"
                        )
                    }
                    (key, Err(err)) => {
                        // Cancellation (e.g. a newer hello for the same key) is expected.
                        if !err.is_cancelled() {
                            error!(
                                name = %self.conf.name,
                                node = %self.key,
                                peer = %key,
                                %err,
                                "hello task panic"
                            )
                        }
                    }
                },

                // An outgoing connect attempt produced a connection. Connect
                // tasks yield a `Connection` directly; only panics and
                // cancellation surface as `Err` here.
                Some(x) = self.connect_tasks.join_next() => match x {
                    (_, Ok(conn)) => {
                        let Some(party) = self.parties.get_mut(&conn.key) else {
                            debug!(
                                name = %self.conf.name,
                                node = %self.key,
                                peer = %conn.key,
                                addr = %conn.addr,
                                "party has been removed"
                            );
                            continue
                        };
                        match party.peer.take() {
                            // First connection for this party: create its peer.
                            PeerState::None => {
                                let key = conn.key;
                                let peer = Peer::builder()
                                    .config(self.conf.clone())
                                    .budget(self.conf.peer_budget)
                                    .next_slot(self.next_slot.clone())
                                    .inbound(self.ibound.clone())
                                    .messages(party.outbox.clone())
                                    .connection(conn)
                                    .metrics(self.metrics.clone())
                                    .build();
                                party.peer = PeerState::Connected(peer.cancel_token());
                                self.spawn_peer(key, peer);
                            }
                            // Resume the waiting peer on the new connection.
                            PeerState::Reconnect(mut peer) => {
                                let key = conn.key;
                                peer.set_connection(conn);
                                party.peer = PeerState::Connected(peer.cancel_token());
                                self.spawn_peer(key, peer);
                            }
                            // Tie-break: switch to our outgoing connection only if
                            // our key is larger (mirror of the rule in the hello
                            // arm). Otherwise the new connection is dropped.
                            PeerState::Connected(cancel) => {
                                if conn.key < self.key {
                                    info!(
                                        name = %self.conf.name,
                                        node = %self.key,
                                        peer = %conn.key,
                                        addr = %conn.addr,
                                        "replacing connection with outgoing one"
                                    );
                                    cancel.cancel();
                                    party.peer = PeerState::Replace(conn);
                                } else {
                                    party.peer = PeerState::Connected(cancel);
                                }
                            }
                            // A replacement is already pending; the newer
                            // connection supersedes the parked one.
                            PeerState::Replace(_) => {
                                party.peer = PeerState::Replace(conn);
                            }
                        }
                    }
                    (key, Err(err)) => {
                        // Cancellation (abort on success elsewhere, party removal,
                        // address update) is expected.
                        if !err.is_cancelled() {
                            error!(
                                name = %self.conf.name,
                                node = %self.key,
                                peer = %key,
                                %err,
                                "connect task panic"
                            )
                        }
                    }
                },

                // A peer task ended, after an error or after its cancellation
                // token fired, and handed back its peer object.
                Some(p) = self.peer_tasks.join_next() => match p {
                    (key, Ok(mut peer)) => {
                        // Nobody is listening for inbound messages any more: stop.
                        if self.ibound.is_closed() {
                            return
                        }
                        let Some(party) = self.parties.get_mut(peer.public_key()) else {
                            debug!(
                                name = %self.conf.name,
                                node = %self.key,
                                peer = %peer.public_key(),
                                addr = %peer.socket_addr(),
                                "party has been removed"
                            );
                            continue
                        };
                        // If a replacement connection is parked, resume the same
                        // peer on it. Otherwise keep the peer in `Reconnect` and
                        // dial the party's current (possibly updated) address.
                        if let PeerState::Replace(conn) = party.peer.take() {
                            let key = conn.key;
                            peer.set_connection(conn);
                            party.peer = PeerState::Connected(peer.cancel_token());
                            self.spawn_peer(key, peer);
                        } else {
                            let addr = party.addr.clone();
                            party.peer = PeerState::Reconnect(peer);
                            self.spawn_connect(key, addr);
                        }
                    }
                    (key, Err(err)) => {
                        // Cancellation (party removal) is expected and ignored.
                        if !err.is_cancelled() {
                            error!(
                                name = %self.conf.name,
                                node = %self.key,
                                peer = %key,
                                %err,
                                "peer task panic"
                            );
                            if self.ibound.is_closed() {
                                return
                            }
                            // The peer object was lost with the panic: start over
                            // from `None` with a fresh connect. The party's outbox
                            // is left untouched.
                            if let Some(party) = self.parties.get_mut(&key) {
                                let addr = party.addr.clone();
                                party.peer = PeerState::None;
                                self.spawn_connect(key, addr);
                            }
                        }
                    }
                },

                // A new slot was announced. Raise the lower bound and
                // garbage-collect every outbox with it (`Queue::gc`
                // presumably drops entries of older slots; confirm in `queue`).
                r = self.next_slot.changed() => {
                    if r.is_err() {
                        return
                    }
                    let s = *self.next_slot.borrow_and_update();
                    debug_assert!(s > self.lower_bound); // ensured by controller
                    self.lower_bound = s;
                    self.metrics.set(&self.key, "lower_bound", u64::from(s) as usize);
                    for party in self.parties.values() {
                        party.outbox.gc(s)
                    }
                }

                // A command from the owner of this server.
                cmd = self.obound.recv() => {
                    self.metrics.set(&self.key, "channel_size", self.obound.len());
                    match cmd {
                        Some(Command::Peer(PeerCommand::Add(role, parties))) => {
                            for (k, a) in parties {
                                // Adding ourselves only updates our own role.
                                if k == self.key {
                                    self.role = role;
                                    continue
                                }
                                if let Some(p) = self.parties.get_mut(&k) {
                                    if p.addr == a {
                                        p.role = role;
                                    } else {
                                        info!(
                                            name = %self.conf.name,
                                            node = %self.key,
                                            peer = %k,
                                            addr = %a,
                                            "updating party address"
                                        );
                                        p.addr = a.clone();
                                        p.role = role;
                                        // Drop any attempt to the old address. A running
                                        // peer is interrupted; when its task returns it is
                                        // re-dialled at the new `p.addr`. Otherwise we
                                        // dial the new address right away.
                                        self.connect_tasks.abort(&k);
                                        if let PeerState::Connected(cancel) = &p.peer {
                                            cancel.cancel()
                                        } else {
                                            self.spawn_connect(k, a)
                                        }
                                    }
                                    continue
                                }
                                info!(
                                    name = %self.conf.name,
                                    node = %self.key,
                                    peer = %k,
                                    addr = %a,
                                    "adding new peer"
                                );
                                self.parties.insert(k, Party::new(role, a.clone()));
                                self.spawn_connect(k, a)
                            }
                        }
                        Some(Command::Peer(PeerCommand::Remove(peers))) => {
                            for k in &peers {
                                // Removing ourselves does not stop the server; we only
                                // become passive.
                                if *k == self.key {
                                    info!(
                                        name = %self.conf.name,
                                        node = %self.key,
                                        "removing self sets role to passive"
                                    );
                                    self.role = Role::Passive;
                                    continue
                                }
                                info!(
                                    name = %self.conf.name,
                                    node = %self.key,
                                    peer = %k,
                                    "removing peer"
                                );
                                // Dropping the party discards its outbox and peer state.
                                // A pending hello task is not aborted; when it finishes
                                // the party is unknown and the connection is dropped.
                                self.parties.remove(k);
                                self.connect_tasks.abort(k);
                                self.peer_tasks.abort(k);
                            }
                        }
                        Some(Command::Peer(PeerCommand::Assign(role, peers))) => {
                            for k in &peers {
                                if *k == self.key {
                                    self.role = role;
                                    continue
                                }
                                if let Some(p) = self.parties.get_mut(k) {
                                    info!(
                                        name = %self.conf.name,
                                        node = %self.key,
                                        peer = %k,
                                        %role,
                                        "assigning role to peer"
                                    );
                                    p.role = role
                                } else {
                                    warn!(
                                        name = %self.conf.name,
                                        node = %self.key,
                                        peer = %k,
                                        role = %role,
                                        "peer to assign role to not found"
                                    );
                                }
                            }
                        }
                        Some(Command::Send(cmd)) => match cmd.action {
                            SendAction::Unicast(to, m) => {
                                // Messages for slots below the lower bound are dropped.
                                if cmd.slot < self.lower_bound {
                                    continue
                                }

                                // Messages to ourselves are delivered directly, without
                                // a trailer and without going through an outbox.
                                if to == self.key {
                                    trace!(name = %self.conf.name, node = %self.key, "sending message");
                                    if let Err(err) = self.ibound.send((self.key, m.into(), None)) {
                                        warn!(
                                            name = %self.conf.name,
                                            node = %self.key,
                                            err  = %err,
                                            "channel closed"
                                        );
                                        return
                                    }
                                    trace!(name = %self.conf.name, node = %self.key, "message delivered");
                                    continue
                                }

                                let msgid = self.next_msgid();
                                let bytes = append_trailer(cmd.retry, cmd.slot, msgid, m);

                                if let Some(party) = self.parties.get(&to) {
                                    party.outbox.enqueue(cmd.slot, msgid, (cmd.retry, bytes));
                                } else {
                                    warn!(
                                        name = %self.conf.name,
                                        node = %self.key,
                                        peer = %to,
                                        "unicast target not found"
                                    );
                                }
                            }
                            SendAction::Multicast(parties, m) => {
                                if cmd.slot < self.lower_bound {
                                    continue
                                }

                                // One message id is shared by all recipients.
                                let msgid = self.next_msgid();
                                let bytes = append_trailer(cmd.retry, cmd.slot, msgid, m);

                                // Our own copy is delivered directly with the trailer
                                // stripped again.
                                if parties.contains(&self.key) {
                                    let bytes = remove_trailer(bytes.clone());
                                    trace!(name = %self.conf.name, node = %self.key, "sending message");
                                    if let Err(err) = self.ibound.send((self.key, bytes, None)) {
                                        warn!(
                                            name = %self.conf.name,
                                            node = %self.key,
                                            err  = %err,
                                            "channel closed"
                                        );
                                        return
                                    }
                                    trace!(name = %self.conf.name, node = %self.key, "message delivered");
                                }

                                // Unknown keys in `parties` are silently skipped.
                                for (to, party) in &self.parties {
                                    if !parties.contains(to) {
                                        continue
                                    }
                                    trace!(name = %self.conf.name, node = %self.key, %to, "sending message");
                                    party.outbox.enqueue(cmd.slot, msgid, (cmd.retry, bytes.clone()));
                                }
                            }
                            SendAction::Broadcast(m) => {
                                if cmd.slot < self.lower_bound {
                                    continue
                                }

                                let msgid = self.next_msgid();
                                let bytes = append_trailer(cmd.retry, cmd.slot, msgid, m);

                                // We only receive our own broadcast while we are active.
                                if self.role.is_active() {
                                    let bytes = remove_trailer(bytes.clone());
                                    trace!(name = %self.conf.name, node = %self.key, "sending message");
                                    if let Err(err) = self.ibound.send((self.key, bytes, None)) {
                                        warn!(
                                            name = %self.conf.name,
                                            node = %self.key,
                                            err  = %err,
                                            "channel closed"
                                        );
                                        return
                                    }
                                    trace!(name = %self.conf.name, node = %self.key, "message delivered");
                                }
                                // Remote parties only receive broadcasts while active.
                                for (key, party) in &self.parties {
                                    if party.role.is_active() {
                                        trace!(
                                            name  = %self.conf.name,
                                            node  = %self.key,
                                            to    = %key,
                                            "sending message"
                                        );
                                        party.outbox.enqueue(cmd.slot, msgid, (cmd.retry, bytes.clone()));
                                    }
                                }
                            }
                        }
                        Some(Command::Shutdown(tx)) => {
                            debug!(name = %self.conf.name, node = %self.key, "shutting down");
                            // The requester may already be gone; ignore a failed ack.
                            let _ = tx.send(());
                            return
                        }
                        None => return
                    }
                }
            }
        }
    }

    /// Starts a background task that dials `addr` for party `key`.
    ///
    /// Does nothing for our own key. Counts the attempt in the
    /// `connect_attempts` metric of `key`.
    fn spawn_connect(&mut self, key: PublicKey, addr: NetAddr) {
        if self.key == key {
            return;
        }
        debug!(
            name = %self.conf.name,
            node = %self.key,
            peer = %key,
            addr = %addr,
            "spawning connect task"
        );
        let conn = Connection::connect(self.conf.clone(), key, addr);
        self.connect_tasks.spawn(key, conn);
        self.metrics.add(&key, "connect_attempts", 1);
        self.metrics
            .set(&self.key, "connect_tasks", self.connect_tasks.len());
    }

    /// Starts a background task running `Connection::accept` on an inbound
    /// TCP stream.
    fn spawn_accept(&mut self, stream: TcpStream) {
        debug!(name = %self.conf.name, node = %self.key, "spawning accept task");
        let conn = Connection::accept(self.conf.clone(), stream);
        self.accept_tasks.spawn(conn);
        self.metrics
            .set(&self.key, "accept_tasks", self.accept_tasks.len());
    }

    /// Starts the hello exchange on an accepted connection, answering with
    /// `ours`.
    ///
    /// Any hello still pending for the same remote key is aborted first. The
    /// task reads the remote's hello before sending ours. The whole exchange
    /// is wrapped in `until(conf.handshake_timeout, ..)`, presumably failing
    /// with a `NetworkError` on timeout. On success it yields
    /// `(ours, conn, theirs)` so that `run` can check both answers.
    fn spawn_hello(&mut self, mut conn: Connection, ours: Hello) {
        debug!(
            name = %self.conf.name,
            node = %self.key,
            peer = %conn.key,
            addr = %conn.addr,
            "spawning hello task"
        );

        self.metrics.add(&conn.key, "hellos", 1);

        self.hello_tasks.abort(&conn.key);
        self.hello_tasks.spawn(
            conn.key,
            until(self.conf.handshake_timeout, async move {
                let theirs = conn.recv_hello().await?;
                conn.send_hello(ours.clone()).await?;
                Ok::<_, NetworkError>((ours, conn, theirs))
            }),
        );

        self.metrics
            .set(&self.key, "hello_tasks", self.hello_tasks.len());
    }

    /// Runs `peer` in a background task keyed by `key`.
    ///
    /// `Peer::start` only ever completes with an error. Any error other than
    /// `NetworkError::PeerInterrupt` is logged and counted in the peer's
    /// `errors` metric. The task returns the peer object so that `run` can
    /// reuse it with a new connection.
    fn spawn_peer(&mut self, key: PublicKey, mut peer: Peer) {
        debug!(
            name = %self.conf.name,
            node = %self.key,
            peer = %peer.public_key(),
            addr = %peer.socket_addr(),
            "spawning peer task"
        );
        let node = self.key;
        let name = self.conf.name.clone();
        let metrics = self.metrics.clone();
        self.peer_tasks.spawn(key, async move {
            let Err(err) = peer.start().await;
            if !matches!(err, NetworkError::PeerInterrupt) {
                warn!(
                    %name,
                    %node,
                    peer = %peer.public_key(),
                    addr = %peer.socket_addr(),
                    %err,
                    "peer failure"
                );
                metrics.add(peer.public_key(), "errors", 1)
            }
            peer
        });
        self.metrics
            .set(&self.key, "peer_tasks", self.peer_tasks.len());
    }

    /// Returns the current message id and advances the counter, wrapping
    /// around on overflow.
    fn next_msgid(&mut self) -> MsgId {
        let current = self.msgid;
        self.msgid = MsgId::new(self.msgid.0.wrapping_add(1));
        current
    }
}
722
723impl Party {
724    fn new(r: Role, a: NetAddr) -> Self {
725        Self {
726            addr: a,
727            role: r,
728            outbox: Queue::new(),
729            peer: PeerState::None,
730        }
731    }
732
733    fn ip_addr_mismatch(&self, addr: IpAddr) -> bool {
734        let NetAddr::Inet(ip, _) = &self.addr else {
735            return false;
736        };
737        *ip != addr
738    }
739}
740
741impl PeerState {
742    fn take(&mut self) -> Self {
743        mem::replace(self, Self::None)
744    }
745}
746
747fn append_trailer(pol: RetryPolicy, slot: Slot, id: MsgId, bytes: Vec<u8>) -> Bytes {
748    let t = match pol {
749        RetryPolicy::Default => Trailer::Std { slot, id },
750        RetryPolicy::NoRetry => Trailer::NoAck { slot },
751    };
752    let mut msg = BytesMut::from(Bytes::from(bytes));
753    msg.extend_from_slice(t.to_bytes().as_ref());
754    msg.freeze()
755}
756
757fn remove_trailer(mut bytes: Bytes) -> Bytes {
758    let _t = Trailer::from_bytes(&mut bytes);
759    debug_assert!(_t.is_some());
760    bytes
761}