Skip to main content

hotshot_libp2p_networking/network/
log_summary.rs

1use std::{
2    sync::atomic::{AtomicBool, AtomicU64, Ordering},
3    time::Duration,
4};
5
6use tokio::time::interval;
7use tracing::info;
8
/// Length of one reporting window, in whole seconds.
const SUMMARY_INTERVAL_SECS: u64 = 60;

/// Period between two consecutive summary log lines.
///
/// `spawn_summary_task` ticks at this rate and `emit_summary` prints the
/// value, in seconds, as part of its log message.
pub const SUMMARY_INTERVAL: Duration = Duration::from_secs(SUMMARY_INTERVAL_SECS);
10
/// Generates the event enum together with its label and counter tables.
///
/// Each `Variant => "label"` pair contributes one `LogEvent` variant, one
/// entry in `NAMES`, and one zero-initialised slot in `COUNTERS`. All three
/// are emitted in the order the pairs are written, so a variant's `usize`
/// discriminant is also its index into both tables.
///
/// A trailing comma after the last pair is accepted.
macro_rules! events {
    ($($event:ident => $label:literal),* $(,)?) => {
        /// A countable event kind; one variant per `Variant => "label"` pair.
        // `#[repr(usize)]` with implicit discriminants (0, 1, 2, ...) is what
        // makes `variant as usize` a valid index into `NAMES` and `COUNTERS`.
        // `Debug`/`PartialEq`/`Eq` let callers log and compare event kinds.
        #[derive(Debug, Clone, Copy, PartialEq, Eq)]
        #[repr(usize)]
        pub enum LogEvent {
            $($event),*
        }

        /// Label printed for each event, indexed by discriminant.
        const NAMES: &[&str] = &[$($label),*];

        /// Occurrence count for each event, indexed by discriminant.
        // Sized from `NAMES` so the two tables can never differ in length.
        static COUNTERS: [AtomicU64; NAMES.len()] =
            [const { AtomicU64::new(0) }; NAMES.len()];
    };
}
24
25events! {
26    AuthFailure => "auth_failures",
27    AuthHandshakeTimeout => "auth_handshake_timeouts",
28    DhtClosestPeersFailure => "dht_closest_peers_failures",
29    DhtDisagreementGivenUp => "dht_disagreements_given_up",
30    DhtKadQueryError => "dht_kad_query_errors",
31    DhtLookupFailure => "dht_lookup_failures",
32    DialFailure => "dial_failures",
33    DirectMessageInboundFailure => "direct_message_inbound_failures",
34    DirectMessageOutboundFailure => "direct_message_outbound_failures",
35    GossipPublishFailure => "gossip_publish_failures",
36    GossipsubNotSupported => "gossipsub_not_supported",
37    GossipsubSlowPeer => "gossipsub_slow_peer",
38    IncomingConnError => "incoming_conn_errors",
39    ListenerError => "listener_errors",
40    NetworkSendFailure => "network_send_failures",
41    VerifyFailure => "verify_failures",
42}
43
44impl LogEvent {
45    pub fn record(self) {
46        COUNTERS[self as usize].fetch_add(1, Ordering::Relaxed);
47    }
48}
49
50fn drain_and_format() -> Option<String> {
51    let parts: Vec<String> = COUNTERS
52        .iter()
53        .zip(NAMES.iter())
54        .filter_map(|(counter, name)| {
55            let value = counter.swap(0, Ordering::Relaxed);
56            (value != 0).then(|| format!("{name}={value}"))
57        })
58        .collect();
59    (!parts.is_empty()).then(|| parts.join(" "))
60}
61
62fn emit_summary() {
63    if let Some(body) = drain_and_format() {
64        info!("libp2p {}s summary: {body}", SUMMARY_INTERVAL.as_secs());
65    }
66}
67
68static SPAWNED: AtomicBool = AtomicBool::new(false);
69
70pub fn spawn_summary_task() {
71    if SPAWNED
72        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
73        .is_err()
74    {
75        return;
76    }
77    tokio::spawn(async move {
78        let mut ticker = interval(SUMMARY_INTERVAL);
79        ticker.tick().await; // skip immediate first tick
80        loop {
81            ticker.tick().await;
82            emit_summary();
83        }
84    });
85}
86
#[cfg(test)]
mod tests {
    use std::sync::{Mutex, MutexGuard, atomic::Ordering};

    use tracing_test::traced_test;

    use super::{COUNTERS, LogEvent, drain_and_format, emit_summary};

    /// Serialises the tests in this module, which all mutate the shared
    /// `COUNTERS` table.
    ///
    /// A poisoned lock is recovered rather than propagated, so one failing
    /// test does not cascade into the other.
    fn test_lock() -> MutexGuard<'static, ()> {
        static LOCK: Mutex<()> = Mutex::new(());
        match LOCK.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }

    /// Zeroes every counter so a test starts from a clean slate.
    fn reset() {
        COUNTERS
            .iter()
            .for_each(|counter| counter.store(0, Ordering::Relaxed));
    }

    #[test]
    #[traced_test]
    fn emits_only_nonzero_and_skips_when_idle() {
        let _guard = test_lock();
        reset();

        // Nothing recorded yet: no summary line may be logged.
        emit_summary();
        assert!(!logs_contain("libp2p"));

        (0..3).for_each(|_| LogEvent::DialFailure.record());
        (0..5).for_each(|_| LogEvent::AuthFailure.record());

        // Only the two events recorded above may show up.
        emit_summary();
        assert!(logs_contain("libp2p 60s summary:"));
        assert!(logs_contain("auth_failures=5"));
        assert!(logs_contain("dial_failures=3"));
        assert!(!logs_contain("verify_failures"));

        // Emitting the summary drained the counters.
        assert!(drain_and_format().is_none());
    }

    #[test]
    fn concurrent_increments_are_not_lost() {
        const THREADS: usize = 8;
        const PER_THREAD: u64 = 1_000;

        let _guard = test_lock();
        reset();

        // Scoped threads are all joined before `scope` returns.
        std::thread::scope(|scope| {
            for _ in 0..THREADS {
                scope.spawn(|| (0..PER_THREAD).for_each(|_| LogEvent::DialFailure.record()));
            }
        });

        let expected = format!("dial_failures={}", THREADS as u64 * PER_THREAD);
        let line = drain_and_format().expect("expected a summary");
        assert!(line.contains(&expected));
    }
}