cliquenet/net/peer.rs
1mod delay;
2
3#[cfg(test)]
4mod tests;
5
6use std::{
7 cmp::min, collections::VecDeque, future::ready, io, mem, net::SocketAddr, num::NonZeroUsize,
8 sync::Arc, time::Duration,
9};
10
11use bon::bon;
12use bytes::{Bytes, BytesMut};
13use delay::DelayQueue;
14use snow::TransportState;
15use tokio::{
16 select,
17 sync::{OwnedSemaphorePermit, Semaphore, mpsc::UnboundedSender, watch},
18 time::{MissedTickBehavior, interval},
19};
20use tokio_util::sync::CancellationToken;
21use tracing::{info, trace, warn};
22
23use crate::{
24 Config, Metrics, PublicKey,
25 connection::Connection,
26 error::{Empty, NetworkError},
27 msg::{
28 Ack, FrameType, Header, MAX_NOISE_MESSAGE_SIZE, MAX_PAYLOAD_SIZE, RESERVED_TAG_SIZE, Slot,
29 Trailer,
30 },
31 net::{PeerMessage, RetryPolicy},
32 queue::Queue,
33 time::Countdown,
34};
35
36type NoiseBuf = Box<[u8; MAX_NOISE_MESSAGE_SIZE]>;
37type Result<T> = std::result::Result<T, NetworkError>;
38
/// A peer sends and receives messages over a connection with a remote.
///
/// Peers are initialised with a [`Connection`], which can be replaced
/// later, should it be necessary.
///
/// Messages sent are expected to be acknowledged, i.e. the remote needs to
/// send back an ACK frame. Otherwise the message is resent after some time.
pub struct Peer {
    /// Network configuration.
    conf: Arc<Config>,

    /// A budget limits how many messages a peer can deliver to the application.
    budget: Budget,

    /// The current TCP+Noise connection.
    conn: Connection,

    /// Messages the application wants to be sent to the remote.
    msgs: Queue<(RetryPolicy, Bytes)>,

    /// Messages waiting to be retried if no ACK has been received.
    retry: DelayQueue,

    /// Receive notifications about changes to the GC threshold.
    next_slot: watch::Receiver<Slot>,

    /// Our current GC threshold.
    ///
    /// Complete inbound messages whose trailer carries a slot below this
    /// value are not delivered to the application.
    lower_bound: Slot,

    /// The channel over which to deliver inbound messages to the application.
    tx: UnboundedSender<PeerMessage>,

    /// A healthcheck countdown.
    ///
    /// When dropped to 0 the connection should be replaced.
    countdown: Countdown,

    /// Token to interrupt processing.
    cancel: CancellationToken,

    /// The true max. message size.
    ///
    /// It accounts for the additional `Trailer` bytes.
    max_message_size: usize,

    /// Sink for per-peer metric values, keyed by the remote's public key.
    metrics: Arc<dyn Metrics>,
}
86
87/// A budget limits how many messages a peer can delivery to the application.
88pub struct Budget(Arc<Semaphore>);
89
90impl Budget {
91 pub fn new(amount: NonZeroUsize) -> Self {
92 Self(Arc::new(Semaphore::new(amount.get())))
93 }
94
95 fn remaining(&self) -> usize {
96 self.0.available_permits()
97 }
98}
99
100#[bon]
101impl Peer {
102 #[builder]
103 pub fn new(
104 config: Arc<Config>,
105 budget: NonZeroUsize,
106 messages: Queue<(RetryPolicy, Bytes)>,
107 inbound: UnboundedSender<PeerMessage>,
108 next_slot: watch::Receiver<Slot>,
109 connection: Connection,
110 metrics: Arc<dyn Metrics>,
111 ) -> Self {
112 Self {
113 max_message_size: config
114 .max_message_size
115 .get()
116 .saturating_add(Trailer::MAX_SIZE),
117 conf: config.clone(),
118 budget: Budget::new(budget),
119 next_slot,
120 lower_bound: Slot::MIN,
121 tx: inbound,
122 conn: connection,
123 retry: DelayQueue::new(config),
124 msgs: messages,
125 countdown: Countdown::new(),
126 cancel: CancellationToken::new(),
127 metrics,
128 }
129 }
130
131 /// Replace the existing connection.
132 pub fn set_connection(&mut self, c: Connection) {
133 self.conn = c;
134 self.retry.reset();
135 self.cancel = CancellationToken::new()
136 }
137
138 /// Get a token to interrupt processing.
139 pub fn cancel_token(&self) -> CancellationToken {
140 self.cancel.clone()
141 }
142
143 /// Get the peer's public key.
144 pub fn public_key(&self) -> &PublicKey {
145 &self.conn.key
146 }
147
148 /// Get the peer's socket address.
149 pub fn socket_addr(&self) -> &SocketAddr {
150 &self.conn.addr
151 }
152
153 /// Start I/O with a connected peer.
154 ///
155 /// This method continues until an error occurs, after which callers may
156 /// want to reconnect and resume peer operation with a new `Connection`.
157 pub async fn start(&mut self) -> Result<Empty> {
158 /// Messages are broken into frames and each frame has a header.
159 ///
160 /// The `ReadState` tracks what we have received from the remote.
161 enum ReadState<'a> {
162 Header {
163 off: usize,
164 buf: [u8; Header::SIZE],
165 },
166 Frame {
167 hdr: Header,
168 typ: FrameType,
169 off: usize,
170 buf: &'a mut Vec<u8>,
171 },
172 }
173
174 /// This state tracks what we have sent to the remote. We currently
175 /// support either data or ACK frames. The offset and length values
176 /// are relative to the write buffer (see below).
177 enum WriteState {
178 /// No write operation is in progress.
179 Idle,
180 /// An ACK frame is sent.
181 Ack { off: usize, len: usize },
182 /// A data frame is sent.
183 Data { off: usize, len: usize },
184 }
185
186 impl WriteState {
187 fn is_idle(&self) -> bool {
188 matches!(self, Self::Idle)
189 }
190
191 /// Encrypt a single data frame with Noise.
192 fn data_frame(
193 data: &[u8],
194 is_partial: bool,
195 state: &mut TransportState,
196 buf: &mut NoiseBuf,
197 ) -> Result<Self> {
198 let n = state.write_message(data, &mut buf[Header::SIZE..])?;
199 let h = if is_partial {
200 Header::data(n as u16).partial()
201 } else {
202 Header::data(n as u16)
203 };
204 buf[..Header::SIZE].copy_from_slice(&h.to_bytes());
205 Ok(Self::Data {
206 off: 0,
207 len: n + Header::SIZE,
208 })
209 }
210
211 /// Encrypt a single ACK frame with Noise.
212 fn ack_frame(a: Ack, state: &mut TransportState, buf: &mut NoiseBuf) -> Result<Self> {
213 let n = state.write_message(&a.0, &mut buf[Header::SIZE..])?;
214 let h = Header::ack(n as u16);
215 buf[..Header::SIZE].copy_from_slice(&h.to_bytes());
216 Ok(Self::Ack {
217 off: 0,
218 len: n + Header::SIZE,
219 })
220 }
221 }
222
223 // Early check if we already got cancelled which can happen with many
224 // simultaneous connects where we need to drop connections.
225 if self.cancel.is_cancelled() {
226 return Err(NetworkError::PeerInterrupt);
227 }
228
229 // Write buffer (Noise packages are limited to 64KiB).
230 let mut wbuf = NoiseBuf::new([0; _]);
231
232 // Ack buffer.
233 let mut abuf = [0; size_of::<Ack>() + RESERVED_TAG_SIZE];
234
235 // A frame buffer for reading before it is decrypted.
236 let mut fbuf = Vec::new();
237
238 // An incoming message that is assembled from its frames.
239 let mut ibound_msg = BytesMut::new();
240
241 // An outgoing message. The integer tracks the chunk size as we need
242 // to break the message into frames that fit into a noise package.
243 let mut obound_msg: Option<(RetryPolicy, Bytes, usize)> = None;
244
245 // Pending outbound ACK messages. This is appended when we received a
246 // message and picked up and interleaved when sending frames or else
247 // at the start of the loop (see below).
248 let mut obound_acks: VecDeque<Ack> = VecDeque::new();
249
250 // Track write and read states:
251 let mut wstate = WriteState::Idle;
252 let mut rstate = ReadState::Header {
253 off: 0,
254 buf: [0; _],
255 };
256
257 // Each read needs a permit taken from our budget in order to deliver
258 // to the application.
259 let mut read_permit: Option<OwnedSemaphorePermit> = None;
260
261 // Measure time.
262 //
263 // Used to trigger resends of messages that have not been ACKed yet.
264 let mut clock = interval(Duration::from_secs(1));
265 clock.set_missed_tick_behavior(MissedTickBehavior::Skip);
266
267 // Ensure any previously fired countdown is reset.
268 self.countdown.stop();
269
270 loop {
271 trace!(
272 name = %self.conf.name,
273 peer = %self.conn.key,
274 addr = %self.conn.addr,
275 writing = %!wstate.is_idle(),
276 acks = %obound_acks.len(),
277 retries = %self.retry.len(),
278 can_read = %read_permit.is_some(),
279 "entering event loop"
280 );
281
282 select! {
283 // Wait for the next message if no write is in progress.
284 //
285 // We limit writing if we expect too many ACKs that the remote
286 // has not sent yet. This is to prevent an attack were a
287 // malicious peer never sends ACKs, which would cause our
288 // delay queue to grow unbounded.
289 m = self.msgs.dequeue(), if wstate.is_idle() && self.retry.len() < self.conf.peer_budget.get() => {
290 trace!(name = %self.conf.name, peer = %self.conn.key, "next outbound message");
291 let (slot, id, (policy, bytes)) = m;
292 if policy.is_retry() {
293 self.retry.add(slot, id, bytes.clone());
294 }
295 let chunk = min(bytes.len(), MAX_PAYLOAD_SIZE);
296 wstate = WriteState::data_frame(
297 &bytes[..chunk],
298 chunk < bytes.len(),
299 &mut self.conn.state,
300 &mut wbuf
301 )?;
302 obound_msg = Some((policy, bytes, chunk))
303 }
304
305 // Pick up an ACK and send it if possible.
306 () = ready(()), if wstate.is_idle() && !obound_acks.is_empty() => {
307 trace!(name = %self.conf.name, peer = %self.conn.key, "next outbound ack");
308 let ack = obound_acks.pop_front().expect("obound_acks is not empty");
309 wstate = WriteState::ack_frame(ack, &mut self.conn.state, &mut wbuf)?
310 }
311
312 // If requested, interrupt all I/O processing.
313 //
314 // Once a peer has been interrupted, its connection needs to be
315 // replaced before calling start again.
316 _ = self.cancel.cancelled() => {
317 info!(name = %self.conf.name, peer = %self.conn.key, "peer interrupt");
318 return Err(NetworkError::PeerInterrupt)
319 }
320
321 // Wait for a slot change, signaling GC.
322 r = self.next_slot.changed() => {
323 trace!(name = %self.conf.name, peer = %self.conn.key, "gc");
324 if r.is_err() {
325 return Err(NetworkError::ChannelClosed)
326 }
327 let s = *self.next_slot.borrow_and_update();
328 debug_assert!(s > self.lower_bound);
329 self.lower_bound = s;
330 self.retry.gc(s);
331 }
332
333 // Periodic maintenance:
334 //
335 // - Retry messages
336 // - Update metrics
337 t = clock.tick() => {
338 if !self.retry.is_empty() {
339 trace!(name = %self.conf.name, peer = %self.conn.key, "retry check");
340 self.retry.check(t);
341 }
342 self.update_metrics()
343 }
344
345 // If messages should be re-sent and we can do so, send it:
346 //
347 // This is separate from the retry check above because the check
348 // scans multiple items and here we just take the next one that
349 // is ready and go with it.
350 () = ready(()), if self.retry.is_ready() && wstate.is_idle() => {
351 trace!(name = %self.conf.name, peer = %self.conn.key, "resending message");
352 let Some(bytes) = self.retry.next() else {
353 continue
354 };
355 let chunk = min(bytes.len(), MAX_PAYLOAD_SIZE);
356 wstate = WriteState::data_frame(
357 &bytes[..chunk],
358 chunk < bytes.len(),
359 &mut self.conn.state,
360 &mut wbuf
361 )?;
362 obound_msg = Some((RetryPolicy::Default, bytes, chunk))
363 }
364
365 // Continue an ongoing write operation.
366 r = self.conn.stream.writable(), if !wstate.is_idle() => {
367 trace!(name = %self.conf.name, peer = %self.conn.key, "continue writing");
368 if let Err(e) = r {
369 return Err(e.into())
370 }
371 match &mut wstate {
372 WriteState::Ack { off, len } => {
373 match self.conn.stream.try_write(&wbuf[*off..*len]) {
374 Ok(n) => {
375 *off += n;
376 if *off < *len {
377 continue
378 }
379 if let Some((policy, bytes, chunk)) = &mut obound_msg {
380 if *chunk < bytes.len() {
381 let end = min(*chunk + MAX_PAYLOAD_SIZE, bytes.len());
382 wstate = WriteState::data_frame(
383 &bytes[*chunk..end],
384 end < bytes.len(),
385 &mut self.conn.state,
386 &mut wbuf
387 )?;
388 *chunk = end;
389 } else {
390 if policy.is_retry() {
391 self.countdown.start(self.conf.receive_timeout)
392 }
393 obound_msg = None;
394 wstate = WriteState::Idle;
395 }
396 } else {
397 wstate = WriteState::Idle;
398 }
399 }
400 Err(e) => if e.kind() != io::ErrorKind::WouldBlock {
401 return Err(e.into())
402 }
403 }
404 }
405 WriteState::Data { off, len } => {
406 match self.conn.stream.try_write(&wbuf[*off..*len]) {
407 Ok(n) => {
408 *off += n;
409 if *off < *len {
410 continue
411 }
412 if let Some(ack) = obound_acks.pop_front() {
413 wstate = WriteState::ack_frame(ack, &mut self.conn.state, &mut wbuf)?
414 } else if let Some((policy, bytes, chunk)) = &mut obound_msg {
415 if *chunk < bytes.len() {
416 let end = min(*chunk + MAX_PAYLOAD_SIZE, bytes.len());
417 wstate = WriteState::data_frame(
418 &bytes[*chunk..end],
419 end < bytes.len(),
420 &mut self.conn.state,
421 &mut wbuf
422 )?;
423 *chunk = end;
424 } else {
425 if policy.is_retry() {
426 self.countdown.start(self.conf.receive_timeout)
427 }
428 obound_msg = None;
429 wstate = WriteState::Idle;
430 }
431 } else {
432 wstate = WriteState::Idle;
433 }
434 }
435 Err(e) => if e.kind() != io::ErrorKind::WouldBlock {
436 return Err(e.into())
437 }
438 }
439 },
440 WriteState::Idle => { /* unreachable!() */ }
441 }
442 }
443
444 // Wait for the healthcheck countdown to finish.
445 //
446 // The countdown is started after writing a message and reset
447 // when we received a frame.
448 () = &mut self.countdown => {
449 trace!(name = %self.conf.name, peer = %self.conn.key, "read timeout");
450 return Err(NetworkError::Timeout)
451 }
452
453 // Await the next read permit.
454 //
455 // If our budget is used up, we need to wait for capacity to become
456 // available before we can continue to read from the socket (and
457 // eventually deliver the message to the application).
458 p = self.budget.0.clone().acquire_owned(), if read_permit.is_none() => {
459 trace!(name = %self.conf.name, peer = %self.conn.key, "next read permit");
460 read_permit = Some(p.map_err(|_| NetworkError::BudgetClosed)?);
461 }
462
463 // Continue reading from the socket if possible.
464 //
465 // NB that we require the ACKs that we have appended before to
466 // picked up. This should be very fast as writing interleaves
467 // ACKs in between frames. We do this to exercise backpressure
468 // because if the remote does not or can not read what we write
469 // but keeps sending us data we would accumulate ACKs without
470 // bound.
471 r = self.conn.stream.readable(), if read_permit.is_some() => {
472 trace!(name = %self.conf.name, peer = %self.conn.key, "continue reading");
473 if let Err(e) = r {
474 return Err(e.into())
475 }
476 if obound_acks.len() > self.conf.peer_budget.get() {
477 return Err(NetworkError::TooManyPendingAcks(self.conn.key))
478 }
479 match &mut rstate {
480 ReadState::Header { off, buf } => {
481 match self.conn.stream.try_read(&mut buf[*off..]) {
482 Ok(0) => {
483 let e = io::ErrorKind::UnexpectedEof.into();
484 return Err(NetworkError::Io(e))
485 }
486 Ok(n) => {
487 self.countdown.stop();
488 *off += n;
489 if *off < buf.len() {
490 continue
491 }
492 let hdr = Header::unvalidated(u32::from_be_bytes(*buf));
493 let typ = match hdr.frame_type() {
494 Ok(FrameType::Data) => FrameType::Data,
495 Ok(FrameType::Ack) => {
496 if hdr.is_partial() {
497 warn!(
498 name = %self.conf.name,
499 node = %self.conf.keypair.public_key(),
500 peer = %self.conn.key,
501 addr = %self.conn.addr,
502 "ACK header marked as partial"
503 );
504 return Err(NetworkError::InvalidAck)
505 }
506 if hdr.len() as usize > abuf.len() {
507 warn!(
508 name = %self.conf.name,
509 node = %self.conf.keypair.public_key(),
510 peer = %self.conn.key,
511 addr = %self.conn.addr,
512 len = %hdr.len(),
513 "ACK header length too large"
514 );
515 return Err(NetworkError::InvalidAck)
516 }
517 FrameType::Ack
518 }
519 Err(typ) => {
520 return Err(NetworkError::UnknownFrameType(typ))
521 }
522 };
523 fbuf.resize(hdr.len().into(), 0);
524 rstate = ReadState::Frame { hdr, typ, off: 0, buf: &mut fbuf }
525 }
526 Err(e) => if e.kind() != io::ErrorKind::WouldBlock {
527 return Err(e.into())
528 }
529 }
530 }
531 ReadState::Frame { hdr, typ, off, buf } => {
532 match self.conn.stream.try_read(&mut buf[*off..]) {
533 Ok(0) => {
534 let e = io::ErrorKind::UnexpectedEof.into();
535 return Err(NetworkError::Io(e))
536 }
537 Ok(n) => {
538 self.countdown.stop();
539 *off += n;
540 if *off < buf.len() {
541 continue
542 }
543 match typ {
544 FrameType::Data => {
545 let n = buf.len();
546 let i = ibound_msg.len();
547 ibound_msg.resize(i + n, 0);
548
549 let n = self.conn.state.read_message(buf, &mut ibound_msg[i..])?;
550 ibound_msg.truncate(i + n);
551
552 if ibound_msg.len() > self.max_message_size {
553 return Err(NetworkError::MessageTooLarge)
554 }
555
556 if !hdr.is_partial() { // message complete
557 let mut msg = mem::take(&mut ibound_msg).freeze();
558 let Some(t) = Trailer::from_bytes(&mut msg) else {
559 warn!(
560 name = %self.conf.name,
561 node = %self.conf.keypair.public_key(),
562 peer = %self.conn.key,
563 addr = %self.conn.addr,
564 "invalid trailer"
565 );
566 return Err(NetworkError::InvalidTrailer);
567 };
568 let slot = match t {
569 Trailer::Std { slot, id } => {
570 obound_acks.push_back(Ack::from((slot, id)));
571 Some(slot)
572 }
573 Trailer::NoAck { slot } => Some(slot),
574 Trailer::Unknown => None
575 };
576 if let Some(s) = slot && s < self.lower_bound {
577 rstate = ReadState::Header { off: 0, buf: [0; _] };
578 continue
579 }
580 let p = read_permit.take();
581 debug_assert!(p.is_some());
582 if self.tx.send((self.conn.key, msg, p)).is_err() {
583 return Err(NetworkError::ChannelClosed)
584 }
585 trace!(
586 name = %self.conf.name,
587 node = %self.conf.keypair.public_key(),
588 peer = %self.conn.key,
589 addr = %self.conn.addr,
590 "message delivered"
591 );
592 }
593 rstate = ReadState::Header { off: 0, buf: [0; _] };
594 }
595 FrameType::Ack => {
596 let n = self.conn.state.read_message(buf, &mut abuf)?;
597 let Ok(a) = Ack::try_from(&abuf[..n]) else {
598 return Err(NetworkError::InvalidAck)
599 };
600 let (s, i) = a.into();
601 self.retry.del(s, i);
602 rstate = ReadState::Header { off: 0, buf: [0; _] };
603 }
604 }
605 }
606 Err(e) => if e.kind() != io::ErrorKind::WouldBlock {
607 return Err(e.into())
608 }
609 }
610 }
611 }
612 }
613 }
614 }
615 }
616
617 fn update_metrics(&self) {
618 self.metrics
619 .set(&self.conn.key, "outbound_messages", self.msgs.len());
620 self.metrics
621 .set(&self.conn.key, "retrying_messages", self.retry.len());
622 self.metrics
623 .set(&self.conn.key, "remaining_budget", self.budget.remaining());
624 }
625}