cliquenet/net/peer/
delay.rs1use std::{collections::BTreeMap, sync::Arc, time::Duration};
2
3use bytes::Bytes;
4use tokio::time::Instant;
5
6use crate::{
7 Config,
8 msg::{MsgId, Slot},
9};
10
/// Upper bound on the size of the ready set that [`DelayQueue::check`] will
/// fill up to: a pass stops staging payloads once this many are waiting.
const READY_BATCH: usize = 32;
13
14pub struct DelayQueue {
15 conf: Arc<Config>,
16 items: BTreeMap<(Slot, MsgId), RetryItem>,
18 ready: BTreeMap<(Slot, MsgId), Bytes>,
20}
21
22struct RetryItem {
23 data: Bytes,
24 timeout: Instant,
26 retries: usize,
28}
29
30impl DelayQueue {
31 pub fn new(c: Arc<Config>) -> Self {
32 Self {
33 conf: c,
34 items: BTreeMap::new(),
35 ready: BTreeMap::new(),
36 }
37 }
38
39 pub fn len(&self) -> usize {
40 self.items.len()
41 }
42
43 pub fn is_empty(&self) -> bool {
44 self.items.is_empty()
45 }
46
47 pub fn is_ready(&self) -> bool {
48 !self.ready.is_empty()
49 }
50
51 pub fn add(&mut self, s: Slot, i: MsgId, b: Bytes) {
52 let t = timeout(&self.conf, 0);
53 let r = RetryItem {
54 data: b,
55 timeout: t,
56 retries: 0,
57 };
58 self.items.insert((s, i), r);
59 }
60
61 pub fn del(&mut self, s: Slot, i: MsgId) {
62 self.items.remove(&(s, i));
63 self.ready.remove(&(s, i));
64 }
65
66 pub fn reset(&mut self) {
67 let now = Instant::now();
68 for x in self.items.values_mut() {
69 x.retries = 0;
70 x.timeout = now;
71 }
72 }
73
74 pub fn next(&mut self) -> Option<Bytes> {
76 self.ready.pop_first().map(|(_, b)| b)
77 }
78
79 pub fn check(&mut self, now: Instant) {
85 let mut n = self.ready.len();
86 for (k, x) in &mut self.items {
87 if n >= READY_BATCH {
88 break;
89 }
90 if x.timeout > now {
91 continue;
92 }
93 x.retries = x.retries.saturating_add(1);
94 x.timeout = timeout(&self.conf, x.retries);
95 self.ready.entry(*k).or_insert(x.data.clone());
96 n += 1
97 }
98 }
99
100 pub fn gc(&mut self, s: Slot) {
101 let key = (s, MsgId(0));
102 self.items = self.items.split_off(&key);
103 self.ready = self.ready.split_off(&key)
104 }
105}
106
107fn timeout(conf: &Config, at: usize) -> Instant {
108 let d = *conf
109 .send_retry_delays
110 .get(at)
111 .unwrap_or_else(|| conf.send_retry_delays.last());
112
113 Instant::now() + Duration::from_secs(d.into())
114}