Skip to main content

cliquenet/net/peer/
delay.rs

1use std::{collections::BTreeMap, sync::Arc, time::Duration};
2
3use bytes::Bytes;
4use tokio::time::Instant;
5
6use crate::{
7    Config,
8    msg::{MsgId, Slot},
9};
10
11/// Number of due items to have available at once.
12const READY_BATCH: usize = 32;
13
14pub struct DelayQueue {
15    conf: Arc<Config>,
16    /// All items that need to be resend at some point.
17    items: BTreeMap<(Slot, MsgId), RetryItem>,
18    /// Items that are due and should be sent asap.
19    ready: BTreeMap<(Slot, MsgId), Bytes>,
20}
21
22struct RetryItem {
23    data: Bytes,
24    /// Next due time.
25    timeout: Instant,
26    /// The number of times an item has been resent already.
27    retries: usize,
28}
29
30impl DelayQueue {
31    pub fn new(c: Arc<Config>) -> Self {
32        Self {
33            conf: c,
34            items: BTreeMap::new(),
35            ready: BTreeMap::new(),
36        }
37    }
38
39    pub fn len(&self) -> usize {
40        self.items.len()
41    }
42
43    pub fn is_empty(&self) -> bool {
44        self.items.is_empty()
45    }
46
47    pub fn is_ready(&self) -> bool {
48        !self.ready.is_empty()
49    }
50
51    pub fn add(&mut self, s: Slot, i: MsgId, b: Bytes) {
52        let t = timeout(&self.conf, 0);
53        let r = RetryItem {
54            data: b,
55            timeout: t,
56            retries: 0,
57        };
58        self.items.insert((s, i), r);
59    }
60
61    pub fn del(&mut self, s: Slot, i: MsgId) {
62        self.items.remove(&(s, i));
63        self.ready.remove(&(s, i));
64    }
65
66    pub fn reset(&mut self) {
67        let now = Instant::now();
68        for x in self.items.values_mut() {
69            x.retries = 0;
70            x.timeout = now;
71        }
72    }
73
74    /// Get any due item to resend.
75    pub fn next(&mut self) -> Option<Bytes> {
76        self.ready.pop_first().map(|(_, b)| b)
77    }
78
79    /// Scan all items to find the next ones that are due.
80    ///
81    /// Scanning stops when a `READY_BATCH` is full. Since scanning happens
82    /// less frequently (about once per second) and is O(n) in the worst case,
83    /// we make several items that are due available for quick access.
84    pub fn check(&mut self, now: Instant) {
85        let mut n = self.ready.len();
86        for (k, x) in &mut self.items {
87            if n >= READY_BATCH {
88                break;
89            }
90            if x.timeout > now {
91                continue;
92            }
93            x.retries = x.retries.saturating_add(1);
94            x.timeout = timeout(&self.conf, x.retries);
95            self.ready.entry(*k).or_insert(x.data.clone());
96            n += 1
97        }
98    }
99
100    pub fn gc(&mut self, s: Slot) {
101        let key = (s, MsgId(0));
102        self.items = self.items.split_off(&key);
103        self.ready = self.ready.split_off(&key)
104    }
105}
106
107fn timeout(conf: &Config, at: usize) -> Instant {
108    let d = *conf
109        .send_retry_delays
110        .get(at)
111        .unwrap_or_else(|| conf.send_retry_delays.last());
112
113    Instant::now() + Duration::from_secs(d.into())
114}