// cliquenet/net/peer.rs

1mod delay;
2
3#[cfg(test)]
4mod tests;
5
6use std::{
7    cmp::min, collections::VecDeque, future::ready, io, mem, net::SocketAddr, num::NonZeroUsize,
8    sync::Arc, time::Duration,
9};
10
11use bon::bon;
12use bytes::{Bytes, BytesMut};
13use delay::DelayQueue;
14use snow::TransportState;
15use tokio::{
16    select,
17    sync::{OwnedSemaphorePermit, Semaphore, mpsc::UnboundedSender, watch},
18    time::{MissedTickBehavior, interval},
19};
20use tokio_util::sync::CancellationToken;
21use tracing::{info, trace, warn};
22
23use crate::{
24    Config, Metrics, PublicKey,
25    connection::Connection,
26    error::{Empty, NetworkError},
27    msg::{
28        Ack, FrameType, Header, MAX_NOISE_MESSAGE_SIZE, MAX_PAYLOAD_SIZE, RESERVED_TAG_SIZE, Slot,
29        Trailer,
30    },
31    net::{PeerMessage, RetryPolicy},
32    queue::Queue,
33    time::Countdown,
34};
35
36type NoiseBuf = Box<[u8; MAX_NOISE_MESSAGE_SIZE]>;
37type Result<T> = std::result::Result<T, NetworkError>;
38
/// A peer sends and receives messages over a connection with a remote.
///
/// Peers are initialised with a [`Connection`], which can be replaced
/// later, should it be necessary.
///
/// Messages whose [`RetryPolicy`] requests retries are expected to be
/// acknowledged, i.e. the remote needs to send back an ACK frame. Otherwise
/// such a message is resent after some time.
pub struct Peer {
    /// Network configuration.
    conf: Arc<Config>,

    /// A budget limits how many messages a peer can deliver to the application.
    budget: Budget,

    /// The current TCP+Noise connection.
    conn: Connection,

    /// Messages the application wants to be sent to the remote, each paired
    /// with its retry policy.
    msgs: Queue<(RetryPolicy, Bytes)>,

    /// Messages waiting to be retried if no ACK has been received.
    ///
    /// Its length also caps how many new messages `start` will send before
    /// ACKs arrive.
    retry: DelayQueue,

    /// Receive notifications about changes to the GC threshold.
    next_slot: watch::Receiver<Slot>,

    /// Our current GC threshold.
    ///
    /// Inbound messages tagged with a lower slot are dropped, and pending
    /// retries are garbage-collected against it.
    lower_bound: Slot,

    /// The channel over which to deliver inbound messages to the application.
    tx: UnboundedSender<PeerMessage>,

    /// A healthcheck countdown.
    ///
    /// When dropped to 0, `start` fails with a timeout and the connection
    /// should be replaced.
    countdown: Countdown,

    /// Token to interrupt processing.
    cancel: CancellationToken,

    /// The true max. message size.
    ///
    /// It accounts for the additional `Trailer` bytes, i.e. it is the
    /// configured maximum plus `Trailer::MAX_SIZE`.
    max_message_size: usize,

    /// Sink for per-peer gauges.
    metrics: Arc<dyn Metrics>,
}
86
87/// A budget limits how many messages a peer can delivery to the application.
88pub struct Budget(Arc<Semaphore>);
89
90impl Budget {
91    pub fn new(amount: NonZeroUsize) -> Self {
92        Self(Arc::new(Semaphore::new(amount.get())))
93    }
94
95    fn remaining(&self) -> usize {
96        self.0.available_permits()
97    }
98}
99
#[bon]
impl Peer {
    /// Create a new peer around an already established connection.
    ///
    /// - `config`: network configuration, also handed to the retry queue.
    /// - `budget`: number of permits in the read [`Budget`]; every delivered
    ///   inbound message carries one permit.
    /// - `messages`: queue of outbound messages with their retry policy.
    /// - `inbound`: channel over which complete inbound messages are delivered.
    /// - `next_slot`: receiver of GC threshold updates.
    /// - `connection`: the initial TCP+Noise connection.
    /// - `metrics`: sink for per-peer gauges.
    ///
    /// The GC threshold starts at `Slot::MIN`.
    #[builder]
    pub fn new(
        config: Arc<Config>,
        budget: NonZeroUsize,
        messages: Queue<(RetryPolicy, Bytes)>,
        inbound: UnboundedSender<PeerMessage>,
        next_slot: watch::Receiver<Slot>,
        connection: Connection,
        metrics: Arc<dyn Metrics>,
    ) -> Self {
        Self {
            // Leave room for the `Trailer` carried by each message.
            max_message_size: config
                .max_message_size
                .get()
                .saturating_add(Trailer::MAX_SIZE),
            conf: config.clone(),
            budget: Budget::new(budget),
            next_slot,
            lower_bound: Slot::MIN,
            tx: inbound,
            conn: connection,
            retry: DelayQueue::new(config),
            msgs: messages,
            countdown: Countdown::new(),
            cancel: CancellationToken::new(),
            metrics,
        }
    }

    /// Replace the existing connection.
    ///
    /// This also calls `reset` on the retry queue and installs a fresh
    /// cancellation token. Tokens obtained earlier via
    /// [`Peer::cancel_token`] refer to the old token and no longer
    /// interrupt this peer.
    pub fn set_connection(&mut self, c: Connection) {
        self.conn = c;
        self.retry.reset();
        self.cancel = CancellationToken::new()
    }

    /// Get a token to interrupt processing.
    ///
    /// Cancelling it makes [`Peer::start`] return
    /// [`NetworkError::PeerInterrupt`]. The token is replaced by
    /// [`Peer::set_connection`], so fetch a new one after reconnecting.
    pub fn cancel_token(&self) -> CancellationToken {
        self.cancel.clone()
    }

    /// Get the public key of the remote on the current connection.
    pub fn public_key(&self) -> &PublicKey {
        &self.conn.key
    }

    /// Get the socket address of the remote on the current connection.
    pub fn socket_addr(&self) -> &SocketAddr {
        &self.conn.addr
    }

    /// Start I/O with a connected peer.
    ///
    /// A single event loop multiplexes the following over the current
    /// connection:
    ///
    /// - outbound data frames and outbound ACKs,
    /// - inbound frames,
    /// - message retries,
    /// - GC threshold updates,
    /// - the health-check countdown.
    ///
    /// This method continues until an error occurs, after which callers may
    /// want to reconnect and resume peer operation with a new `Connection`.
    ///
    /// # Errors
    ///
    /// The loop never exits with `Ok`; it only returns an error:
    ///
    /// - `PeerInterrupt` if the cancellation token is (or becomes) cancelled.
    /// - `ChannelClosed` if the GC threshold sender or the inbound channel
    ///   is closed.
    /// - `Timeout` if the health-check countdown elapses. It is started after
    ///   a retryable message has been written and stopped whenever bytes are
    ///   read.
    /// - `BudgetClosed` if the read budget's semaphore is closed.
    /// - `TooManyPendingAcks` if more than `peer_budget` ACKs are waiting to
    ///   be sent when the socket becomes readable.
    /// - `InvalidAck`, `InvalidTrailer`, `UnknownFrameType` or
    ///   `MessageTooLarge` on protocol violations by the remote.
    /// - I/O and Noise errors, propagated as they occur.
    pub async fn start(&mut self) -> Result<Empty> {
        /// Messages are broken into frames and each frame has a header.
        ///
        /// The `ReadState` tracks what we have received from the remote.
        enum ReadState<'a> {
            /// Reading the fixed-size frame header; `off` bytes of `buf` are
            /// filled so far.
            Header {
                off: usize,
                buf: [u8; Header::SIZE],
            },
            /// Reading a frame body of `hdr.len()` bytes into `buf`; `off`
            /// bytes are filled so far.
            Frame {
                hdr: Header,
                typ: FrameType,
                off: usize,
                buf: &'a mut Vec<u8>,
            },
        }

        /// This state tracks what we have sent to the remote. We currently
        /// support either data or ACK frames. The offset and length values
        /// are relative to the write buffer (see below).
        enum WriteState {
            /// No write operation is in progress.
            Idle,
            /// An ACK frame is sent.
            Ack { off: usize, len: usize },
            /// A data frame is sent.
            Data { off: usize, len: usize },
        }

        impl WriteState {
            fn is_idle(&self) -> bool {
                matches!(self, Self::Idle)
            }

            /// Encrypt a single data frame with Noise.
            ///
            /// Writes the header followed by the ciphertext into `buf` and
            /// returns a `Data` state covering both. `is_partial` marks the
            /// frame as not being the last frame of its message.
            fn data_frame(
                data: &[u8],
                is_partial: bool,
                state: &mut TransportState,
                buf: &mut NoiseBuf,
            ) -> Result<Self> {
                let n = state.write_message(data, &mut buf[Header::SIZE..])?;
                // NOTE(review): `n as u16` assumes the ciphertext length fits
                // in 16 bits (cf. the 64KiB Noise limit) — confirm against
                // `MAX_NOISE_MESSAGE_SIZE`.
                let h = if is_partial {
                    Header::data(n as u16).partial()
                } else {
                    Header::data(n as u16)
                };
                buf[..Header::SIZE].copy_from_slice(&h.to_bytes());
                Ok(Self::Data {
                    off: 0,
                    len: n + Header::SIZE,
                })
            }

            /// Encrypt a single ACK frame with Noise.
            ///
            /// Writes the header followed by the ciphertext into `buf` and
            /// returns an `Ack` state covering both.
            fn ack_frame(a: Ack, state: &mut TransportState, buf: &mut NoiseBuf) -> Result<Self> {
                let n = state.write_message(&a.0, &mut buf[Header::SIZE..])?;
                let h = Header::ack(n as u16);
                buf[..Header::SIZE].copy_from_slice(&h.to_bytes());
                Ok(Self::Ack {
                    off: 0,
                    len: n + Header::SIZE,
                })
            }
        }

        // Early check if we already got cancelled which can happen with many
        // simultaneous connects where we need to drop connections.
        if self.cancel.is_cancelled() {
            return Err(NetworkError::PeerInterrupt);
        }

        // Write buffer (Noise packages are limited to 64KiB). It holds exactly
        // one outgoing frame at a time: header followed by ciphertext.
        let mut wbuf = NoiseBuf::new([0; _]);

        // Decryption buffer for inbound ACK frames: one `Ack` plus
        // `RESERVED_TAG_SIZE` bytes of Noise overhead.
        let mut abuf = [0; size_of::<Ack>() + RESERVED_TAG_SIZE];

        // A frame buffer for reading before it is decrypted.
        let mut fbuf = Vec::new();

        // An incoming message that is assembled from its frames.
        let mut ibound_msg = BytesMut::new();

        // An outgoing message. The integer is the offset up to which the
        // message bytes have already been encrypted into frames. Messages are
        // cut into chunks of at most `MAX_PAYLOAD_SIZE` so that every frame
        // fits into a noise package.
        let mut obound_msg: Option<(RetryPolicy, Bytes, usize)> = None;

        // Pending outbound ACKs. An entry is appended whenever we receive a
        // complete message with a `Trailer::Std`. Entries are interleaved
        // after data frames, or picked up by a dedicated `select!` branch
        // when no write is in progress (see below).
        let mut obound_acks: VecDeque<Ack> = VecDeque::new();

        // Track write and read states:
        let mut wstate = WriteState::Idle;
        let mut rstate = ReadState::Header {
            off: 0,
            buf: [0; _],
        };

        // Each read needs a permit taken from our budget in order to deliver
        // to the application. The permit is handed to the application along
        // with the message once that message is complete.
        let mut read_permit: Option<OwnedSemaphorePermit> = None;

        // Measure time.
        //
        // Ticks once per second to trigger resends of messages that have not
        // been ACKed yet and to refresh metrics.
        let mut clock = interval(Duration::from_secs(1));
        clock.set_missed_tick_behavior(MissedTickBehavior::Skip);

        // Ensure any previously fired countdown is reset.
        self.countdown.stop();

        loop {
            trace!(
                name     = %self.conf.name,
                peer     = %self.conn.key,
                addr     = %self.conn.addr,
                writing  = %!wstate.is_idle(),
                acks     = %obound_acks.len(),
                retries  = %self.retry.len(),
                can_read = %read_permit.is_some(),
                "entering event loop"
            );

            select! {
                // Wait for the next message if no write is in progress.
                //
                // We limit writing if we expect too many ACKs that the remote
                // has not sent yet. This is to prevent an attack where a
                // malicious peer never sends ACKs, which would cause our
                // delay queue to grow unbounded.
                //
                // Retryable messages are registered in the retry queue before
                // their first frame is encrypted.
                //
                // NOTE(review): this future is dropped whenever another branch
                // completes first; this assumes `Queue::dequeue` is cancel-safe
                // — confirm.
                m = self.msgs.dequeue(), if wstate.is_idle() && self.retry.len() < self.conf.peer_budget.get() => {
                    trace!(name = %self.conf.name, peer = %self.conn.key, "next outbound message");
                    let (slot, id, (policy, bytes)) = m;
                    if policy.is_retry() {
                        self.retry.add(slot, id, bytes.clone());
                    }
                    let chunk = min(bytes.len(), MAX_PAYLOAD_SIZE);
                    wstate = WriteState::data_frame(
                        &bytes[..chunk],
                        chunk < bytes.len(),
                        &mut self.conn.state,
                        &mut wbuf
                    )?;
                    obound_msg = Some((policy, bytes, chunk))
                }

                // Pick up an ACK and send it if possible.
                () = ready(()), if wstate.is_idle() && !obound_acks.is_empty() => {
                    trace!(name = %self.conf.name, peer = %self.conn.key, "next outbound ack");
                    let ack = obound_acks.pop_front().expect("obound_acks is not empty");
                    wstate = WriteState::ack_frame(ack, &mut self.conn.state, &mut wbuf)?
                }

                // If requested, interrupt all I/O processing.
                //
                // Once a peer has been interrupted, its connection needs to be
                // replaced before calling start again.
                _ = self.cancel.cancelled() => {
                    info!(name = %self.conf.name, peer = %self.conn.key, "peer interrupt");
                    return Err(NetworkError::PeerInterrupt)
                }

                // Wait for a slot change, signaling GC.
                //
                // The new slot becomes our lower bound: retries below it are
                // garbage-collected, and inbound messages below it are dropped.
                r = self.next_slot.changed() => {
                    trace!(name = %self.conf.name, peer = %self.conn.key, "gc");
                    if r.is_err() {
                        return Err(NetworkError::ChannelClosed)
                    }
                    let s = *self.next_slot.borrow_and_update();
                    debug_assert!(s > self.lower_bound);
                    self.lower_bound = s;
                    self.retry.gc(s);
                }

                // Periodic maintenance:
                //
                // - Retry messages
                // - Update metrics
                t = clock.tick() => {
                    if !self.retry.is_empty() {
                        trace!(name = %self.conf.name, peer = %self.conn.key, "retry check");
                        self.retry.check(t);
                    }
                    self.update_metrics()
                }

                // If messages should be re-sent and we can do so, send it:
                //
                // This is separate from the retry check above because the check
                // scans multiple items and here we just take the next one that
                // is ready and go with it.
                //
                // Resent messages are tagged `RetryPolicy::Default`; whether the
                // countdown is started after writing depends on `is_retry()`.
                () = ready(()), if self.retry.is_ready() && wstate.is_idle() => {
                    trace!(name = %self.conf.name, peer = %self.conn.key, "resending message");
                    let Some(bytes) = self.retry.next() else {
                        continue
                    };
                    let chunk = min(bytes.len(), MAX_PAYLOAD_SIZE);
                    wstate = WriteState::data_frame(
                        &bytes[..chunk],
                        chunk < bytes.len(),
                        &mut self.conn.state,
                        &mut wbuf
                    )?;
                    obound_msg = Some((RetryPolicy::Default, bytes, chunk))
                }

                // Continue an ongoing write operation.
                //
                // Once a frame has been written completely, the next frame is
                // prepared as follows:
                //
                // - after a data frame: a pending ACK takes precedence, then
                //   the next chunk of the current message;
                // - after an ACK frame: only the next chunk of the current
                //   message.
                //
                // When no chunk of the current message is left, the message is
                // done. If it is retryable, the health-check countdown is
                // started.
                r = self.conn.stream.writable(), if !wstate.is_idle() => {
                    trace!(name = %self.conf.name, peer = %self.conn.key, "continue writing");
                    if let Err(e) = r {
                        return Err(e.into())
                    }
                    match &mut wstate {
                        WriteState::Ack { off, len } => {
                            match self.conn.stream.try_write(&wbuf[*off..*len]) {
                                Ok(n) => {
                                    *off += n;
                                    if *off < *len {
                                        continue
                                    }
                                    if let Some((policy, bytes, chunk)) = &mut obound_msg {
                                        if *chunk < bytes.len() {
                                            let end = min(*chunk + MAX_PAYLOAD_SIZE, bytes.len());
                                            wstate = WriteState::data_frame(
                                                &bytes[*chunk..end],
                                                end < bytes.len(),
                                                &mut self.conn.state,
                                                &mut wbuf
                                            )?;
                                            *chunk = end;
                                        } else {
                                            if policy.is_retry() {
                                                self.countdown.start(self.conf.receive_timeout)
                                            }
                                            obound_msg = None;
                                            wstate = WriteState::Idle;
                                        }
                                    } else {
                                        wstate = WriteState::Idle;
                                    }
                                }
                                Err(e) => if e.kind() != io::ErrorKind::WouldBlock {
                                    return Err(e.into())
                                }
                            }
                        }
                        WriteState::Data { off, len } => {
                            match self.conn.stream.try_write(&wbuf[*off..*len]) {
                                Ok(n) => {
                                    *off += n;
                                    if *off < *len {
                                        continue
                                    }
                                    if let Some(ack) = obound_acks.pop_front() {
                                        wstate = WriteState::ack_frame(ack, &mut self.conn.state, &mut wbuf)?
                                    } else if let Some((policy, bytes, chunk)) = &mut obound_msg {
                                        if *chunk < bytes.len() {
                                            let end = min(*chunk + MAX_PAYLOAD_SIZE, bytes.len());
                                            wstate = WriteState::data_frame(
                                                &bytes[*chunk..end],
                                                end < bytes.len(),
                                                &mut self.conn.state,
                                                &mut wbuf
                                            )?;
                                            *chunk = end;
                                        } else {
                                            if policy.is_retry() {
                                                self.countdown.start(self.conf.receive_timeout)
                                            }
                                            obound_msg = None;
                                            wstate = WriteState::Idle;
                                        }
                                    } else {
                                        wstate = WriteState::Idle;
                                    }
                                }
                                Err(e) => if e.kind() != io::ErrorKind::WouldBlock {
                                    return Err(e.into())
                                }
                            }
                        },
                        WriteState::Idle => { /* unreachable: this branch only runs when `!wstate.is_idle()` */ }
                    }
                }

                // Wait for the healthcheck countdown to finish.
                //
                // The countdown is started after a retryable message has been
                // written completely, and stopped whenever any bytes are read.
                () = &mut self.countdown => {
                    trace!(name = %self.conf.name, peer = %self.conn.key, "read timeout");
                    return Err(NetworkError::Timeout)
                }

                // Await the next read permit.
                //
                // If our budget is used up, we need to wait for capacity to become
                // available before we can continue to read from the socket (and
                // eventually deliver the message to the application).
                p = self.budget.0.clone().acquire_owned(), if read_permit.is_none() => {
                    trace!(name = %self.conf.name, peer = %self.conn.key, "next read permit");
                    read_permit = Some(p.map_err(|_| NetworkError::BudgetClosed)?);
                }

                // Continue reading from the socket if possible.
                //
                // Reading requires a permit from our budget (see above).
                //
                // Before reading we also bound the number of ACKs we still
                // have to send. If the remote does not or cannot read what we
                // write but keeps sending us data, ACKs would accumulate
                // without bound. We therefore give up with
                // `TooManyPendingAcks` once more than `peer_budget` are
                // pending. Normally the queue stays short, because writing
                // interleaves ACKs in between frames.
                r = self.conn.stream.readable(), if read_permit.is_some() => {
                    trace!(name = %self.conf.name, peer = %self.conn.key, "continue reading");
                    if let Err(e) = r {
                        return Err(e.into())
                    }
                    if obound_acks.len() > self.conf.peer_budget.get() {
                        return Err(NetworkError::TooManyPendingAcks(self.conn.key))
                    }
                    match &mut rstate {
                        ReadState::Header { off, buf } => {
                            match self.conn.stream.try_read(&mut buf[*off..]) {
                                Ok(0) => {
                                    let e = io::ErrorKind::UnexpectedEof.into();
                                    return Err(NetworkError::Io(e))
                                }
                                Ok(n) => {
                                    // Any inbound traffic counts as a sign of life.
                                    self.countdown.stop();
                                    *off += n;
                                    if *off < buf.len() {
                                        continue
                                    }
                                    // The header is a big-endian `u32`.
                                    let hdr = Header::unvalidated(u32::from_be_bytes(*buf));
                                    // ACK frames are never split and must fit into `abuf`.
                                    let typ = match hdr.frame_type() {
                                        Ok(FrameType::Data) => FrameType::Data,
                                        Ok(FrameType::Ack) => {
                                            if hdr.is_partial() {
                                                warn!(
                                                    name = %self.conf.name,
                                                    node = %self.conf.keypair.public_key(),
                                                    peer = %self.conn.key,
                                                    addr = %self.conn.addr,
                                                    "ACK header marked as partial"
                                                );
                                                return Err(NetworkError::InvalidAck)
                                            }
                                            if hdr.len() as usize > abuf.len() {
                                                warn!(
                                                    name = %self.conf.name,
                                                    node = %self.conf.keypair.public_key(),
                                                    peer = %self.conn.key,
                                                    addr = %self.conn.addr,
                                                    len  = %hdr.len(),
                                                    "ACK header length too large"
                                                );
                                                return Err(NetworkError::InvalidAck)
                                            }
                                            FrameType::Ack
                                        }
                                        Err(typ) => {
                                            return Err(NetworkError::UnknownFrameType(typ))
                                        }
                                    };
                                    // NOTE(review): a header with length 0 leaves `fbuf` empty; the
                                    // next `try_read` on an empty slice returns `Ok(0)` and is
                                    // reported as `UnexpectedEof` rather than a protocol error.
                                    fbuf.resize(hdr.len().into(), 0);
                                    rstate = ReadState::Frame { hdr, typ, off: 0, buf: &mut fbuf }
                                }
                                Err(e) => if e.kind() != io::ErrorKind::WouldBlock {
                                    return Err(e.into())
                                }
                            }
                        }
                        ReadState::Frame { hdr, typ, off, buf } => {
                            match self.conn.stream.try_read(&mut buf[*off..]) {
                                Ok(0) => {
                                    let e = io::ErrorKind::UnexpectedEof.into();
                                    return Err(NetworkError::Io(e))
                                }
                                Ok(n) => {
                                    self.countdown.stop();
                                    *off += n;
                                    if *off < buf.len() {
                                        continue
                                    }
                                    match typ {
                                        FrameType::Data => {
                                            // Decrypt into the tail of `ibound_msg`. The ciphertext
                                            // length is an upper bound for the plaintext, so grow
                                            // first and truncate to the actual plaintext length after.
                                            let n = buf.len();
                                            let i = ibound_msg.len();
                                            ibound_msg.resize(i + n, 0);

                                            let n = self.conn.state.read_message(buf, &mut ibound_msg[i..])?;
                                            ibound_msg.truncate(i + n);

                                            if ibound_msg.len() > self.max_message_size {
                                                return Err(NetworkError::MessageTooLarge)
                                            }

                                            if !hdr.is_partial() { // message complete
                                                let mut msg = mem::take(&mut ibound_msg).freeze();
                                                let Some(t) = Trailer::from_bytes(&mut msg) else {
                                                    warn!(
                                                        name = %self.conf.name,
                                                        node = %self.conf.keypair.public_key(),
                                                        peer = %self.conn.key,
                                                        addr = %self.conn.addr,
                                                        "invalid trailer"
                                                    );
                                                    return Err(NetworkError::InvalidTrailer);
                                                };
                                                // Only `Trailer::Std` messages are ACKed. Messages
                                                // with an unknown trailer carry no slot and are
                                                // always delivered.
                                                let slot = match t {
                                                    Trailer::Std { slot, id } => {
                                                        obound_acks.push_back(Ack::from((slot, id)));
                                                        Some(slot)
                                                    }
                                                    Trailer::NoAck { slot } => Some(slot),
                                                    Trailer::Unknown => None
                                                };
                                                // Messages below the GC threshold are dropped. An
                                                // ACK for a `Std` message has already been queued
                                                // above, so the sender is still acknowledged. The
                                                // read permit is kept for the next message.
                                                if let Some(s) = slot && s < self.lower_bound {
                                                    rstate = ReadState::Header { off: 0, buf: [0; _] };
                                                    continue
                                                }
                                                // Hand the permit to the application together with the message.
                                                let p = read_permit.take();
                                                debug_assert!(p.is_some());
                                                if self.tx.send((self.conn.key, msg, p)).is_err() {
                                                    return Err(NetworkError::ChannelClosed)
                                                }
                                                trace!(
                                                    name = %self.conf.name,
                                                    node = %self.conf.keypair.public_key(),
                                                    peer = %self.conn.key,
                                                    addr = %self.conn.addr,
                                                    "message delivered"
                                                );
                                            }
                                            rstate = ReadState::Header { off: 0, buf: [0; _] };
                                        }
                                        FrameType::Ack => {
                                            // A received ACK removes the matching entry from the
                                            // retry queue.
                                            let n = self.conn.state.read_message(buf, &mut abuf)?;
                                            let Ok(a) = Ack::try_from(&abuf[..n]) else {
                                                return Err(NetworkError::InvalidAck)
                                            };
                                            let (s, i) = a.into();
                                            self.retry.del(s, i);
                                            rstate = ReadState::Header { off: 0, buf: [0; _] };
                                        }
                                    }
                                }
                                Err(e) => if e.kind() != io::ErrorKind::WouldBlock {
                                    return Err(e.into())
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    /// Publish per-peer gauges for the current remote:
    ///
    /// - `outbound_messages`: length of the outbound message queue;
    /// - `retrying_messages`: number of messages awaiting an ACK or a retry;
    /// - `remaining_budget`: permits left in the read budget.
    fn update_metrics(&self) {
        self.metrics
            .set(&self.conn.key, "outbound_messages", self.msgs.len());
        self.metrics
            .set(&self.conn.key, "retrying_messages", self.retry.len());
        self.metrics
            .set(&self.conn.key, "remaining_budget", self.budget.remaining());
    }
}
621            .set(&self.conn.key, "retrying_messages", self.retry.len());
622        self.metrics
623            .set(&self.conn.key, "remaining_budget", self.budget.remaining());
624    }
625}