xref: /webrtc/interceptor/src/stats/mod.rs (revision 04f0bd9e)
1 use std::sync::atomic::{AtomicU64, Ordering};
2 use std::sync::Arc;
3 use std::time::{Duration, SystemTime};
4 
5 mod interceptor;
6 
7 pub use self::interceptor::StatsInterceptor;
8 
9 pub fn make_stats_interceptor(id: &str) -> Arc<StatsInterceptor> {
10     Arc::new(StatsInterceptor::new(id.to_owned()))
11 }
12 
13 #[derive(Debug, Default)]
14 /// Records stats about a given RTP stream.
15 pub struct RTPStats {
16     /// Packets sent or received
17     packets: Arc<AtomicU64>,
18 
19     /// Payload bytes sent or received
20     payload_bytes: Arc<AtomicU64>,
21 
22     /// Header bytes sent or received
23     header_bytes: Arc<AtomicU64>,
24 
25     /// A wall clock timestamp for when the last packet was sent or recieved encoded as milliseconds since
26     /// [`SystemTime::UNIX_EPOCH`].
27     last_packet_timestamp: Arc<AtomicU64>,
28 }
29 
30 impl RTPStats {
31     pub fn update(&self, header_bytes: u64, payload_bytes: u64, packets: u64) {
32         let now = SystemTime::now();
33 
34         self.header_bytes.fetch_add(header_bytes, Ordering::SeqCst);
35         self.payload_bytes
36             .fetch_add(payload_bytes, Ordering::SeqCst);
37         self.packets.fetch_add(packets, Ordering::SeqCst);
38 
39         if let Ok(duration) = now.duration_since(SystemTime::UNIX_EPOCH) {
40             let millis = duration.as_millis();
41             // NB: We truncate 128bits to 64 bits here, but even at 64 bits we have ~500k years
42             // before this becomes a problem, then it can be someone else's problem.
43             self.last_packet_timestamp
44                 .store(millis as u64, Ordering::SeqCst);
45         } else {
46             log::warn!("SystemTime::now was before SystemTime::UNIX_EPOCH");
47         }
48     }
49 
50     pub fn reader(&self) -> RTPStatsReader {
51         RTPStatsReader {
52             packets: self.packets.clone(),
53             payload_bytes: self.payload_bytes.clone(),
54             header_bytes: self.header_bytes.clone(),
55             last_packet_timestamp: self.last_packet_timestamp.clone(),
56         }
57     }
58 }
59 
/// Reader half of [`RTPStats`].
///
/// A reader holds shared handles to atomic counters. One obtained from
/// [`RTPStats::reader`] observes the values written by that [`RTPStats`]; cloning a reader
/// yields another handle to the same counters. A reader built through [`Default`] owns
/// fresh, zeroed counters.
#[derive(Clone, Debug, Default)]
pub struct RTPStatsReader {
    /// Packets sent or received.
    packets: Arc<AtomicU64>,
    /// Payload bytes sent or received.
    payload_bytes: Arc<AtomicU64>,
    /// Header bytes sent or received.
    header_bytes: Arc<AtomicU64>,

    /// Wall clock time of the last packet, encoded as milliseconds since
    /// [`SystemTime::UNIX_EPOCH`].
    last_packet_timestamp: Arc<AtomicU64>,
}

impl RTPStatsReader {
    /// Get packets sent or received.
    pub fn packets(&self) -> u64 {
        self.packets.load(Ordering::SeqCst)
    }

    /// Get header bytes sent or received.
    pub fn header_bytes(&self) -> u64 {
        self.header_bytes.load(Ordering::SeqCst)
    }

    /// Get payload bytes sent or received.
    pub fn payload_bytes(&self) -> u64 {
        self.payload_bytes.load(Ordering::SeqCst)
    }

    /// Get the wall clock time at which the last packet was sent or received.
    ///
    /// The value is rebuilt from a millisecond count, so it carries no sub-millisecond
    /// precision. While the stored count is still zero (nothing has been recorded yet)
    /// this returns [`SystemTime::UNIX_EPOCH`] itself.
    pub fn last_packet_timestamp(&self) -> SystemTime {
        let millis = self.last_packet_timestamp.load(Ordering::SeqCst);

        SystemTime::UNIX_EPOCH + Duration::from_millis(millis)
    }
}
92 
#[cfg(test)]
mod test {
    use super::*;

    /// Counters start at zero and a single `update` is visible through a reader that was
    /// created before the update happened.
    #[test]
    fn test_rtp_stats() {
        let stats = RTPStats::default();
        let reader = stats.reader();

        // Reads all three counters in one go: (header bytes, payload bytes, packets).
        let snapshot = || {
            (
                reader.header_bytes(),
                reader.payload_bytes(),
                reader.packets(),
            )
        };

        assert_eq!(snapshot(), (0, 0, 0));

        stats.update(24, 960, 1);

        assert_eq!(snapshot(), (24, 960, 1));
    }

    /// Compile-time check that `RTPStats` can be shared and sent across threads.
    #[test]
    fn test_rtp_stats_send_sync() {
        fn assert_send_sync<T>()
        where
            T: Send + Sync,
        {
        }

        assert_send_sync::<RTPStats>();
    }
}
128