1 use std::collections::HashMap; 2 use std::sync::Arc; 3 use std::time::SystemTime; 4 5 use tokio::time::Duration; 6 7 mod interceptor; 8 9 pub use self::interceptor::StatsInterceptor; 10 11 pub fn make_stats_interceptor(id: &str) -> Arc<StatsInterceptor> { 12 Arc::new(StatsInterceptor::new(id.to_owned())) 13 } 14 15 /// Types related to inbound RTP streams. 16 mod inbound { 17 use std::time::SystemTime; 18 19 use tokio::time::{Duration, Instant}; 20 21 use super::{RTCPStats, RTPStats}; 22 23 #[derive(Debug, Clone)] 24 /// Stats collected for an inbound RTP stream. 25 /// Contains both stats relating to the inbound stream and remote stats for the corresponding 26 /// outbound stream at the remote end. 27 pub(super) struct StreamStats { 28 /// Received RTP stats. 29 pub(super) rtp_stats: RTPStats, 30 /// Common RTCP stats derived from inbound and outbound RTCP packets. 31 pub(super) rtcp_stats: RTCPStats, 32 33 /// The last time any stats where update, used for garbage collection to remove obsolete stats. 34 last_update: Instant, 35 36 /// The number of packets sent as reported in the latest SR from the remote. 37 remote_packets_sent: u32, 38 39 /// The number of bytes sent as reported in the latest SR from the remote. 40 remote_bytes_sent: u32, 41 42 /// The total number of sender reports sent by the remote and received. 43 remote_reports_sent: u64, 44 45 /// The last remote round trip time measurement in ms. [`None`] if no round trip time has 46 /// been derived yet, or if it wasn't possible to derive it. 47 remote_round_trip_time: Option<f64>, 48 49 /// The cummulative total round trip times reported in ms. 50 remote_total_round_trip_time: f64, 51 52 /// The total number of measurements of the remote round trip time. 53 remote_round_trip_time_measurements: u64, 54 } 55 56 impl Default for StreamStats { 57 fn default() -> Self { 58 Self { 59 rtp_stats: RTPStats::default(), 60 rtcp_stats: RTCPStats::default(), 61 last_update: Instant::now(), 62 remote_packets_sent: 0, 63 remote_bytes_sent: 0, 64 remote_reports_sent: 0, 65 remote_round_trip_time: None, 66 remote_total_round_trip_time: 0.0, 67 remote_round_trip_time_measurements: 0, 68 } 69 } 70 } 71 72 impl StreamStats { 73 pub(super) fn snapshot(&self) -> StatsSnapshot { 74 self.into() 75 } 76 77 pub(super) fn mark_updated(&mut self) { 78 self.last_update = Instant::now(); 79 } 80 81 pub(super) fn duration_since_last_update(&self) -> Duration { 82 self.last_update.elapsed() 83 } 84 85 pub(super) fn record_sender_report(&mut self, packets_sent: u32, bytes_sent: u32) { 86 self.remote_reports_sent += 1; 87 self.remote_packets_sent = packets_sent; 88 self.remote_bytes_sent = bytes_sent; 89 } 90 91 pub(super) fn record_remote_round_trip_time(&mut self, round_trip_time: Option<f64>) { 92 // Store the latest measurement, even if it's None. 93 self.remote_round_trip_time = round_trip_time; 94 95 if let Some(rtt) = round_trip_time { 96 // Only if we have a valid measurement do we update the totals 97 self.remote_total_round_trip_time += rtt; 98 self.remote_round_trip_time_measurements += 1; 99 } 100 } 101 } 102 103 /// A point in time snapshot of the stream stats for an inbound RTP stream. 104 /// 105 /// Created by [`StreamStats::snapshot`]. 106 #[derive(Debug)] 107 pub struct StatsSnapshot { 108 /// Received RTP stats. 109 rtp_stats: RTPStats, 110 /// Common RTCP stats derived from inbound and outbound RTCP packets. 111 rtcp_stats: RTCPStats, 112 113 /// The number of packets sent as reported in the latest SR from the remote. 114 remote_packets_sent: u32, 115 116 /// The number of bytes sent as reported in the latest SR from the remote. 117 remote_bytes_sent: u32, 118 119 /// The total number of sender reports sent by the remote and received. 120 remote_reports_sent: u64, 121 122 /// The last remote round trip time measurement in ms. [`None`] if no round trip time has 123 /// been derived yet, or if it wasn't possible to derive it. 124 remote_round_trip_time: Option<f64>, 125 126 /// The cummulative total round trip times reported in ms. 127 remote_total_round_trip_time: f64, 128 129 /// The total number of measurements of the remote round trip time. 130 remote_round_trip_time_measurements: u64, 131 } 132 133 impl StatsSnapshot { 134 pub fn packets_received(&self) -> u64 { 135 self.rtp_stats.packets 136 } 137 138 pub fn payload_bytes_received(&self) -> u64 { 139 self.rtp_stats.payload_bytes 140 } 141 142 pub fn header_bytes_received(&self) -> u64 { 143 self.rtp_stats.header_bytes 144 } 145 146 pub fn last_packet_received_timestamp(&self) -> Option<SystemTime> { 147 self.rtp_stats.last_packet_timestamp 148 } 149 150 pub fn nacks_sent(&self) -> u64 { 151 self.rtcp_stats.nack_count 152 } 153 154 pub fn firs_sent(&self) -> u64 { 155 self.rtcp_stats.fir_count 156 } 157 158 pub fn plis_sent(&self) -> u64 { 159 self.rtcp_stats.pli_count 160 } 161 pub fn remote_packets_sent(&self) -> u32 { 162 self.remote_packets_sent 163 } 164 165 pub fn remote_bytes_sent(&self) -> u32 { 166 self.remote_bytes_sent 167 } 168 169 pub fn remote_reports_sent(&self) -> u64 { 170 self.remote_reports_sent 171 } 172 173 pub fn remote_round_trip_time(&self) -> Option<f64> { 174 self.remote_round_trip_time 175 } 176 177 pub fn remote_total_round_trip_time(&self) -> f64 { 178 self.remote_total_round_trip_time 179 } 180 181 pub fn remote_round_trip_time_measurements(&self) -> u64 { 182 self.remote_round_trip_time_measurements 183 } 184 } 185 186 impl From<&StreamStats> for StatsSnapshot { 187 fn from(stream_stats: &StreamStats) -> Self { 188 Self { 189 rtp_stats: stream_stats.rtp_stats.clone(), 190 rtcp_stats: stream_stats.rtcp_stats.clone(), 191 remote_packets_sent: stream_stats.remote_packets_sent, 192 remote_bytes_sent: stream_stats.remote_bytes_sent, 193 remote_reports_sent: stream_stats.remote_reports_sent, 194 remote_round_trip_time: stream_stats.remote_round_trip_time, 195 remote_total_round_trip_time: stream_stats.remote_total_round_trip_time, 196 remote_round_trip_time_measurements: stream_stats 197 .remote_round_trip_time_measurements, 198 } 199 } 200 } 201 } 202 203 /// Types related to outbound RTP streams. 204 mod outbound { 205 use std::time::SystemTime; 206 207 use tokio::time::{Duration, Instant}; 208 209 use super::{RTCPStats, RTPStats}; 210 211 #[derive(Debug, Clone)] 212 /// Stats collected for an outbound RTP stream. 213 /// Contains both stats relating to the outbound stream and remote stats for the corresponding 214 /// inbound stream. 215 pub(super) struct StreamStats { 216 /// Sent RTP stats. 217 pub(super) rtp_stats: RTPStats, 218 /// Common RTCP stats derived from inbound and outbound RTCP packets. 219 pub(super) rtcp_stats: RTCPStats, 220 221 /// The last time any stats where update, used for garbage collection to remove obsolete stats. 222 last_update: Instant, 223 224 /// The first value of extended seq num that was sent in an SR for this SSRC. [`None`] before 225 /// the first SR is sent. 226 /// 227 /// Used to calculate packet statistic for remote stats. 228 initial_outbound_ext_seq_num: Option<u32>, 229 230 /// The number of inbound packets received by the remote side for this stream. 231 remote_packets_received: u64, 232 233 /// The number of lost packets reported by the remote for this tream. 234 remote_total_lost: u32, 235 236 /// The estimated remote jitter for this stream in timestamp units. 237 remote_jitter: u32, 238 239 /// The last remote round trip time measurement in ms. [`None`] if no round trip time has 240 /// been derived yet, or if it wasn't possible to derive it. 241 remote_round_trip_time: Option<f64>, 242 243 /// The cummulative total round trip times reported in ms. 244 remote_total_round_trip_time: f64, 245 246 /// The total number of measurements of the remote round trip time. 247 remote_round_trip_time_measurements: u64, 248 249 /// The latest fraction lost value from RR. 250 remote_fraction_lost: Option<u8>, 251 } 252 253 impl Default for StreamStats { 254 fn default() -> Self { 255 Self { 256 rtp_stats: RTPStats::default(), 257 rtcp_stats: RTCPStats::default(), 258 last_update: Instant::now(), 259 initial_outbound_ext_seq_num: None, 260 remote_packets_received: 0, 261 remote_total_lost: 0, 262 remote_jitter: 0, 263 remote_round_trip_time: None, 264 remote_total_round_trip_time: 0.0, 265 remote_round_trip_time_measurements: 0, 266 remote_fraction_lost: None, 267 } 268 } 269 } 270 271 impl StreamStats { 272 pub(super) fn snapshot(&self) -> StatsSnapshot { 273 self.into() 274 } 275 276 pub(super) fn mark_updated(&mut self) { 277 self.last_update = Instant::now(); 278 } 279 280 pub(super) fn duration_since_last_update(&self) -> Duration { 281 self.last_update.elapsed() 282 } 283 284 pub(super) fn update_remote_inbound_packets_received( 285 &mut self, 286 rr_ext_seq_num: u32, 287 rr_total_lost: u32, 288 ) { 289 if let Some(initial_ext_seq_num) = self.initial_outbound_ext_seq_num { 290 // Total number of RTP packets received for this SSRC. 291 // At the receiving endpoint, this is calculated as defined in [RFC3550] section 6.4.1. 292 // At the sending endpoint the packetsReceived is estimated by subtracting the 293 // Cumulative Number of Packets Lost from the Extended Highest Sequence Number Received, 294 // both reported in the RTCP Receiver Report, and then subtracting the 295 // initial Extended Sequence Number that was sent to this SSRC in a RTCP Sender Report and then adding one, 296 // to mirror what is discussed in Appendix A.3 in [RFC3550], but for the sender side. 297 // If no RTCP Receiver Report has been received yet, then return 0. 298 self.remote_packets_received = 299 (rr_ext_seq_num as u64) - (rr_total_lost as u64) - (initial_ext_seq_num as u64) 300 + 1; 301 } 302 } 303 304 #[inline(always)] 305 pub(super) fn record_sr_ext_seq_num(&mut self, seq_num: u32) { 306 // Only record the initial value 307 if self.initial_outbound_ext_seq_num.is_none() { 308 self.initial_outbound_ext_seq_num = Some(seq_num); 309 } 310 } 311 312 pub(super) fn record_remote_round_trip_time(&mut self, round_trip_time: Option<f64>) { 313 // Store the latest measurement, even if it's None. 314 self.remote_round_trip_time = round_trip_time; 315 316 if let Some(rtt) = round_trip_time { 317 // Only if we have a valid measurement do we update the totals 318 self.remote_total_round_trip_time += rtt; 319 self.remote_round_trip_time_measurements += 1; 320 } 321 } 322 323 pub(super) fn update_remote_fraction_lost(&mut self, fraction_lost: u8) { 324 self.remote_fraction_lost = Some(fraction_lost); 325 } 326 327 pub(super) fn update_remote_jitter(&mut self, jitter: u32) { 328 self.remote_jitter = jitter; 329 } 330 331 pub(super) fn update_remote_total_lost(&mut self, lost: u32) { 332 self.remote_total_lost = lost; 333 } 334 } 335 336 /// A point in time snapshot of the stream stats for an outbound RTP stream. 337 /// 338 /// Created by [`StreamStats::snapshot`]. 339 #[derive(Debug)] 340 pub struct StatsSnapshot { 341 /// Sent RTP stats. 342 rtp_stats: RTPStats, 343 /// Common RTCP stats derived from inbound and outbound RTCP packets. 344 rtcp_stats: RTCPStats, 345 346 /// The number of inbound packets received by the remote side for this stream. 347 remote_packets_received: u64, 348 349 /// The number of lost packets reported by the remote for this tream. 350 remote_total_lost: u32, 351 352 /// The estimated remote jitter for this stream in timestamp units. 353 remote_jitter: u32, 354 355 /// The most recent remote round trip time in milliseconds. 356 remote_round_trip_time: Option<f64>, 357 358 /// The cummulative total round trip times reported in ms. 359 remote_total_round_trip_time: f64, 360 361 /// The total number of measurements of the remote round trip time. 362 remote_round_trip_time_measurements: u64, 363 364 /// The fraction of packets lost reported for this stream. 365 /// Calculated as defined in [RFC3550](https://www.rfc-editor.org/rfc/rfc3550) section 6.4.1 and Appendix A.3. 366 remote_fraction_lost: Option<f64>, 367 } 368 369 impl StatsSnapshot { 370 pub fn packets_sent(&self) -> u64 { 371 self.rtp_stats.packets 372 } 373 374 pub fn payload_bytes_sent(&self) -> u64 { 375 self.rtp_stats.payload_bytes 376 } 377 378 pub fn header_bytes_sent(&self) -> u64 { 379 self.rtp_stats.header_bytes 380 } 381 382 pub fn last_packet_sent_timestamp(&self) -> Option<SystemTime> { 383 self.rtp_stats.last_packet_timestamp 384 } 385 386 pub fn nacks_received(&self) -> u64 { 387 self.rtcp_stats.nack_count 388 } 389 390 pub fn firs_received(&self) -> u64 { 391 self.rtcp_stats.fir_count 392 } 393 394 pub fn plis_received(&self) -> u64 { 395 self.rtcp_stats.pli_count 396 } 397 398 /// Packets received on the remote side. 399 pub fn remote_packets_received(&self) -> u64 { 400 self.remote_packets_received 401 } 402 403 /// The number of lost packets reported by the remote for this tream. 404 pub fn remote_total_lost(&self) -> u32 { 405 self.remote_total_lost 406 } 407 408 /// The estimated remote jitter for this stream in timestamp units. 409 pub fn remote_jitter(&self) -> u32 { 410 self.remote_jitter 411 } 412 413 /// The latest RTT in ms if enough data is available to measure it. 414 pub fn remote_round_trip_time(&self) -> Option<f64> { 415 self.remote_round_trip_time 416 } 417 418 /// Total RTT in ms. 419 pub fn remote_total_round_trip_time(&self) -> f64 { 420 self.remote_total_round_trip_time 421 } 422 423 /// The number of RTT measurements so far. 424 pub fn remote_round_trip_time_measurements(&self) -> u64 { 425 self.remote_round_trip_time_measurements 426 } 427 428 /// The latest fraction lost value from the remote or None if it hasn't been reported yet. 429 pub fn remote_fraction_lost(&self) -> Option<f64> { 430 self.remote_fraction_lost 431 } 432 } 433 434 impl From<&StreamStats> for StatsSnapshot { 435 fn from(stream_stats: &StreamStats) -> Self { 436 Self { 437 rtp_stats: stream_stats.rtp_stats.clone(), 438 rtcp_stats: stream_stats.rtcp_stats.clone(), 439 remote_packets_received: stream_stats.remote_packets_received, 440 remote_total_lost: stream_stats.remote_total_lost, 441 remote_jitter: stream_stats.remote_jitter, 442 remote_round_trip_time: stream_stats.remote_round_trip_time, 443 remote_total_round_trip_time: stream_stats.remote_total_round_trip_time, 444 remote_round_trip_time_measurements: stream_stats 445 .remote_round_trip_time_measurements, 446 remote_fraction_lost: stream_stats 447 .remote_fraction_lost 448 .map(|fraction| (fraction as f64) / (u8::MAX as f64)), 449 } 450 } 451 } 452 } 453 454 #[derive(Default, Debug)] 455 struct StatsContainer { 456 inbound_stats: HashMap<u32, inbound::StreamStats>, 457 outbound_stats: HashMap<u32, outbound::StreamStats>, 458 } 459 460 impl StatsContainer { 461 fn get_or_create_inbound_stream_stats(&mut self, ssrc: u32) -> &mut inbound::StreamStats { 462 self.inbound_stats.entry(ssrc).or_default() 463 } 464 465 fn get_or_create_outbound_stream_stats(&mut self, ssrc: u32) -> &mut outbound::StreamStats { 466 self.outbound_stats.entry(ssrc).or_default() 467 } 468 469 fn get_inbound_stats(&self, ssrc: u32) -> Option<&inbound::StreamStats> { 470 self.inbound_stats.get(&ssrc) 471 } 472 473 fn get_outbound_stats(&self, ssrc: u32) -> Option<&outbound::StreamStats> { 474 self.outbound_stats.get(&ssrc) 475 } 476 477 fn remove_stale_entries(&mut self) { 478 const MAX_AGE: Duration = Duration::from_secs(60); 479 480 self.inbound_stats 481 .retain(|_, s| s.duration_since_last_update() < MAX_AGE); 482 self.outbound_stats 483 .retain(|_, s| s.duration_since_last_update() < MAX_AGE); 484 } 485 } 486 487 #[derive(Debug, Default, Clone, PartialEq, Eq)] 488 /// Records stats about a given RTP stream. 489 pub struct RTPStats { 490 /// Packets sent or received 491 packets: u64, 492 493 /// Payload bytes sent or received 494 payload_bytes: u64, 495 496 /// Header bytes sent or received 497 header_bytes: u64, 498 499 /// A wall clock timestamp for when the last packet was sent or recieved encoded as milliseconds since 500 /// [`SystemTime::UNIX_EPOCH`]. 501 last_packet_timestamp: Option<SystemTime>, 502 } 503 504 impl RTPStats { 505 fn update(&mut self, header_bytes: u64, payload_bytes: u64, packets: u64, now: SystemTime) { 506 self.header_bytes += header_bytes; 507 self.payload_bytes += payload_bytes; 508 self.packets += packets; 509 self.last_packet_timestamp = Some(now); 510 } 511 512 pub fn header_bytes(&self) -> u64 { 513 self.header_bytes 514 } 515 516 pub fn payload_bytes(&self) -> u64 { 517 self.payload_bytes 518 } 519 520 pub fn packets(&self) -> u64 { 521 self.packets 522 } 523 524 pub fn last_packet_timestamp(&self) -> Option<SystemTime> { 525 self.last_packet_timestamp 526 } 527 } 528 529 #[derive(Debug, Default, Clone)] 530 pub struct RTCPStats { 531 /// The number of FIRs sent or recevied 532 fir_count: u64, 533 534 /// The number of PLIs sent or recevied 535 pli_count: u64, 536 537 /// The number of NACKs sent or recevied 538 nack_count: u64, 539 } 540 541 impl RTCPStats { 542 #[allow(clippy::too_many_arguments)] 543 fn update(&mut self, fir_count: Option<u64>, pli_count: Option<u64>, nack_count: Option<u64>) { 544 if let Some(fir_count) = fir_count { 545 self.fir_count += fir_count; 546 } 547 548 if let Some(pli_count) = pli_count { 549 self.pli_count += pli_count; 550 } 551 552 if let Some(nack_count) = nack_count { 553 self.nack_count += nack_count; 554 } 555 } 556 557 pub fn fir_count(&self) -> u64 { 558 self.fir_count 559 } 560 561 pub fn pli_count(&self) -> u64 { 562 self.pli_count 563 } 564 565 pub fn nack_count(&self) -> u64 { 566 self.nack_count 567 } 568 } 569 570 #[cfg(test)] 571 mod test { 572 use super::*; 573 574 #[test] 575 fn test_rtp_stats() { 576 let mut stats: RTPStats = Default::default(); 577 assert_eq!( 578 (stats.header_bytes(), stats.payload_bytes(), stats.packets()), 579 (0, 0, 0), 580 ); 581 582 stats.update(24, 960, 1, SystemTime::now()); 583 584 assert_eq!( 585 (stats.header_bytes(), stats.payload_bytes(), stats.packets()), 586 (24, 960, 1), 587 ); 588 } 589 590 #[test] 591 fn test_rtcp_stats() { 592 let mut stats: RTCPStats = Default::default(); 593 assert_eq!( 594 (stats.fir_count(), stats.pli_count(), stats.nack_count()), 595 (0, 0, 0), 596 ); 597 598 stats.update(Some(1), Some(2), Some(3)); 599 600 assert_eq!( 601 (stats.fir_count(), stats.pli_count(), stats.nack_count()), 602 (1, 2, 3), 603 ); 604 } 605 606 #[test] 607 fn test_rtp_stats_send_sync() { 608 fn test_send_sync<T: Send + Sync>() {} 609 test_send_sync::<RTPStats>(); 610 } 611 612 #[test] 613 fn test_rtcp_stats_send_sync() { 614 fn test_send_sync<T: Send + Sync>() {} 615 test_send_sync::<RTCPStats>(); 616 } 617 } 618