1 use std::sync::atomic::{AtomicU64, Ordering}; 2 use std::sync::Arc; 3 use std::time::{Duration, SystemTime}; 4 5 mod interceptor; 6 7 pub use self::interceptor::StatsInterceptor; 8 9 pub fn make_stats_interceptor(id: &str) -> Arc<StatsInterceptor> { 10 Arc::new(StatsInterceptor::new(id.to_owned())) 11 } 12 13 #[derive(Debug, Default)] 14 /// Records stats about a given RTP stream. 15 pub struct RTPStats { 16 /// Packets sent or received 17 packets: Arc<AtomicU64>, 18 19 /// Payload bytes sent or received 20 payload_bytes: Arc<AtomicU64>, 21 22 /// Header bytes sent or received 23 header_bytes: Arc<AtomicU64>, 24 25 /// A wall clock timestamp for when the last packet was sent or recieved encoded as milliseconds since 26 /// [`SystemTime::UNIX_EPOCH`]. 27 last_packet_timestamp: Arc<AtomicU64>, 28 } 29 30 impl RTPStats { 31 pub fn update(&self, header_bytes: u64, payload_bytes: u64, packets: u64) { 32 let now = SystemTime::now(); 33 34 self.header_bytes.fetch_add(header_bytes, Ordering::SeqCst); 35 self.payload_bytes 36 .fetch_add(payload_bytes, Ordering::SeqCst); 37 self.packets.fetch_add(packets, Ordering::SeqCst); 38 39 if let Ok(duration) = now.duration_since(SystemTime::UNIX_EPOCH) { 40 let millis = duration.as_millis(); 41 // NB: We truncate 128bits to 64 bits here, but even at 64 bits we have ~500k years 42 // before this becomes a problem, then it can be someone else's problem. 43 self.last_packet_timestamp 44 .store(millis as u64, Ordering::SeqCst); 45 } else { 46 log::warn!("SystemTime::now was before SystemTime::UNIX_EPOCH"); 47 } 48 } 49 50 pub fn reader(&self) -> RTPStatsReader { 51 RTPStatsReader { 52 packets: self.packets.clone(), 53 payload_bytes: self.payload_bytes.clone(), 54 header_bytes: self.header_bytes.clone(), 55 last_packet_timestamp: self.last_packet_timestamp.clone(), 56 } 57 } 58 } 59 60 #[derive(Clone, Debug, Default)] 61 /// Reader half of RTPStats. 62 pub struct RTPStatsReader { 63 packets: Arc<AtomicU64>, 64 payload_bytes: Arc<AtomicU64>, 65 header_bytes: Arc<AtomicU64>, 66 67 last_packet_timestamp: Arc<AtomicU64>, 68 } 69 70 impl RTPStatsReader { 71 /// Get packets sent or received. 72 pub fn packets(&self) -> u64 { 73 self.packets.load(Ordering::SeqCst) 74 } 75 76 /// Get payload bytes sent or received. 77 pub fn header_bytes(&self) -> u64 { 78 self.header_bytes.load(Ordering::SeqCst) 79 } 80 81 /// Get header bytes sent or received. 82 pub fn payload_bytes(&self) -> u64 { 83 self.payload_bytes.load(Ordering::SeqCst) 84 } 85 86 pub fn last_packet_timestamp(&self) -> SystemTime { 87 let millis = self.last_packet_timestamp.load(Ordering::SeqCst); 88 89 SystemTime::UNIX_EPOCH + Duration::from_millis(millis) 90 } 91 } 92 93 #[cfg(test)] 94 mod test { 95 use super::*; 96 97 #[test] 98 fn test_rtp_stats() { 99 let stats: RTPStats = Default::default(); 100 let reader = stats.reader(); 101 assert_eq!( 102 ( 103 reader.header_bytes(), 104 reader.payload_bytes(), 105 reader.packets() 106 ), 107 (0, 0, 0), 108 ); 109 110 stats.update(24, 960, 1); 111 112 assert_eq!( 113 ( 114 reader.header_bytes(), 115 reader.payload_bytes(), 116 reader.packets() 117 ), 118 (24, 960, 1), 119 ); 120 } 121 122 #[test] 123 fn test_rtp_stats_send_sync() { 124 fn test_send_sync<T: Send + Sync>() {} 125 test_send_sync::<RTPStats>(); 126 } 127 } 128