1use std::{collections::BTreeMap, sync::Arc};
2
3use parking_lot::Mutex;
4use tokio::sync::Notify;
5
6use crate::msg::{MsgId, Slot};
7
8#[derive(Debug)]
9pub struct Queue<T>(Arc<Inner<T>>);
10
11#[derive(Debug)]
12struct Inner<T> {
13 sig: Notify,
14 map: Mutex<BTreeMap<(Slot, MsgId), T>>,
15}
16
17impl<T> Default for Queue<T> {
18 fn default() -> Self {
19 Self::new()
20 }
21}
22
23impl<T> Clone for Queue<T> {
24 fn clone(&self) -> Self {
25 Self(self.0.clone())
26 }
27}
28
29impl<T> Queue<T> {
30 pub fn new() -> Self {
31 Self(Arc::new(Inner {
32 sig: Notify::new(),
33 map: Mutex::new(BTreeMap::new()),
34 }))
35 }
36
37 pub fn enqueue(&self, s: Slot, i: MsgId, val: T) {
38 self.0.map.lock().insert((s, i), val);
39 self.0.sig.notify_waiters();
40 }
41
42 pub fn try_dequeue(&self) -> Option<(Slot, MsgId, T)> {
43 let mut map = self.0.map.lock();
44 let ((s, i), v) = map.pop_first()?;
45 Some((s, i, v))
46 }
47
48 pub async fn dequeue(&self) -> (Slot, MsgId, T) {
49 loop {
50 let future = self.0.sig.notified();
51 if let Some(v) = self.try_dequeue() {
52 return v;
53 }
54 future.await;
55 }
56 }
57
58 pub fn gc(&self, s: Slot) {
59 let mut map = self.0.map.lock();
60 *map = map.split_off(&(s, MsgId(0)))
61 }
62
63 pub fn len(&self) -> usize {
64 self.0.map.lock().len()
65 }
66}