// cliquenet/queue.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use parking_lot::Mutex;
4use tokio::sync::Notify;
5
6use crate::msg::{MsgId, Slot};
7
8#[derive(Debug)]
9pub struct Queue<T>(Arc<Inner<T>>);
10
11#[derive(Debug)]
12struct Inner<T> {
13    sig: Notify,
14    map: Mutex<BTreeMap<(Slot, MsgId), T>>,
15}
16
17impl<T> Default for Queue<T> {
18    fn default() -> Self {
19        Self::new()
20    }
21}
22
23impl<T> Clone for Queue<T> {
24    fn clone(&self) -> Self {
25        Self(self.0.clone())
26    }
27}
28
29impl<T> Queue<T> {
30    pub fn new() -> Self {
31        Self(Arc::new(Inner {
32            sig: Notify::new(),
33            map: Mutex::new(BTreeMap::new()),
34        }))
35    }
36
37    pub fn enqueue(&self, s: Slot, i: MsgId, val: T) {
38        self.0.map.lock().insert((s, i), val);
39        self.0.sig.notify_waiters();
40    }
41
42    pub fn try_dequeue(&self) -> Option<(Slot, MsgId, T)> {
43        let mut map = self.0.map.lock();
44        let ((s, i), v) = map.pop_first()?;
45        Some((s, i, v))
46    }
47
48    pub async fn dequeue(&self) -> (Slot, MsgId, T) {
49        loop {
50            let future = self.0.sig.notified();
51            if let Some(v) = self.try_dequeue() {
52                return v;
53            }
54            future.await;
55        }
56    }
57
58    pub fn gc(&self, s: Slot) {
59        let mut map = self.0.map.lock();
60        *map = map.split_off(&(s, MsgId(0)))
61    }
62
63    pub fn len(&self) -> usize {
64        self.0.map.lock().len()
65    }
66}