// hotshot_libp2p_networking/network/log_summary.rs
use std::{
2 sync::atomic::{AtomicBool, AtomicU64, Ordering},
3 time::Duration,
4};
5
6use tokio::time::interval;
7use tracing::info;
8
/// Period of the summary ticker; its length in seconds is also embedded in
/// every summary log message.
pub const SUMMARY_INTERVAL: Duration = Duration::from_secs(60);
10
/// Declares the event table from a list of `Variant => "label"` entries.
///
/// Expands to three items that stay in lock-step because they are generated
/// from the same repetition:
///
/// * `LogEvent` — a `#[repr(usize)]` enum with one variant per entry,
/// * `NAMES` — the labels, in entry order,
/// * `COUNTERS` — one zero-initialised `AtomicU64` per label.
///
/// A trailing comma after the last entry is accepted.
macro_rules! events {
    ($($variant:ident => $name:literal),* $(,)?) => {
        /// Kind of event that can be counted.
        ///
        /// Variants carry implicit discriminants in declaration order, so
        /// `variant as usize` is that variant's index into `NAMES` and
        /// `COUNTERS`.
        // `Debug`/`PartialEq`/`Eq` are derived so the public enum can be used in
        // `{:?}` formatting, tracing fields and equality assertions.
        #[derive(Debug, Clone, Copy, PartialEq, Eq)]
        #[repr(usize)]
        pub enum LogEvent {
            $($variant),*
        }

        /// Labels, one per `LogEvent` variant, in declaration order.
        const NAMES: &[&str] = &[$($name),*];

        /// One counter per entry of `NAMES`, all starting at zero.
        static COUNTERS: [AtomicU64; NAMES.len()] =
            [const { AtomicU64::new(0) }; NAMES.len()];
    };
}
24
// Event table: each entry pairs a `LogEvent` variant with the label printed
// for it in the summary line. Entry order fixes both the enum discriminants
// and the order in which labels appear in a summary.
events! {
    AuthFailure => "auth_failures",
    AuthHandshakeTimeout => "auth_handshake_timeouts",
    DhtClosestPeersFailure => "dht_closest_peers_failures",
    DhtDisagreementGivenUp => "dht_disagreements_given_up",
    DhtKadQueryError => "dht_kad_query_errors",
    DhtLookupFailure => "dht_lookup_failures",
    DialFailure => "dial_failures",
    DirectMessageInboundFailure => "direct_message_inbound_failures",
    DirectMessageOutboundFailure => "direct_message_outbound_failures",
    GossipPublishFailure => "gossip_publish_failures",
    GossipsubNotSupported => "gossipsub_not_supported",
    GossipsubSlowPeer => "gossipsub_slow_peer",
    IncomingConnError => "incoming_conn_errors",
    ListenerError => "listener_errors",
    NetworkSendFailure => "network_send_failures",
    VerifyFailure => "verify_failures",
}
43
44impl LogEvent {
45 pub fn record(self) {
46 COUNTERS[self as usize].fetch_add(1, Ordering::Relaxed);
47 }
48}
49
50fn drain_and_format() -> Option<String> {
51 let parts: Vec<String> = COUNTERS
52 .iter()
53 .zip(NAMES.iter())
54 .filter_map(|(counter, name)| {
55 let value = counter.swap(0, Ordering::Relaxed);
56 (value != 0).then(|| format!("{name}={value}"))
57 })
58 .collect();
59 (!parts.is_empty()).then(|| parts.join(" "))
60}
61
62fn emit_summary() {
63 if let Some(body) = drain_and_format() {
64 info!("libp2p {}s summary: {body}", SUMMARY_INTERVAL.as_secs());
65 }
66}
67
68static SPAWNED: AtomicBool = AtomicBool::new(false);
69
70pub fn spawn_summary_task() {
71 if SPAWNED
72 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
73 .is_err()
74 {
75 return;
76 }
77 tokio::spawn(async move {
78 let mut ticker = interval(SUMMARY_INTERVAL);
79 ticker.tick().await; loop {
81 ticker.tick().await;
82 emit_summary();
83 }
84 });
85}
86
#[cfg(test)]
mod tests {
    use std::sync::{Mutex, MutexGuard, OnceLock, atomic::Ordering};

    use tracing_test::traced_test;

    use super::{COUNTERS, LogEvent, drain_and_format, emit_summary};

    /// Serialises the tests in this module, since they all mutate the shared
    /// global `COUNTERS`. A poisoned lock is recovered instead of propagated.
    fn test_lock() -> MutexGuard<'static, ()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        let mutex = LOCK.get_or_init(|| Mutex::new(()));
        match mutex.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }

    /// Zeroes every counter so a test starts from a clean slate.
    fn reset() {
        COUNTERS
            .iter()
            .for_each(|counter| counter.store(0, Ordering::Relaxed));
    }

    // NOTE(review): `logs_contain` is not imported in this module; presumably
    // it is injected by `#[traced_test]` — confirm against tracing-test docs.
    #[test]
    #[traced_test]
    fn emits_only_nonzero_and_skips_when_idle() {
        let _guard = test_lock();
        reset();

        // With all counters at zero nothing should be logged.
        emit_summary();
        assert!(!logs_contain("libp2p"));

        (0..3).for_each(|_| LogEvent::DialFailure.record());
        (0..5).for_each(|_| LogEvent::AuthFailure.record());

        emit_summary();
        assert!(logs_contain("libp2p 60s summary:"));
        assert!(logs_contain("auth_failures=5"));
        assert!(logs_contain("dial_failures=3"));
        assert!(!logs_contain("verify_failures"));

        // The summary drained the counters, so nothing is left to report.
        assert!(drain_and_format().is_none());
    }

    #[test]
    fn concurrent_increments_are_not_lost() {
        const THREADS: usize = 8;
        const PER_THREAD: u64 = 1_000;

        let _guard = test_lock();
        reset();

        // Scoped threads are all joined before `scope` returns.
        std::thread::scope(|scope| {
            for _ in 0..THREADS {
                scope.spawn(|| {
                    for _ in 0..PER_THREAD {
                        LogEvent::DialFailure.record();
                    }
                });
            }
        });

        let expected_total = THREADS as u64 * PER_THREAD;
        let line = drain_and_format().expect("expected a summary");
        assert!(line.contains(&format!("dial_failures={expected_total}")));
    }
}