1 use std::collections::HashMap;
2 use std::sync::Arc;
3 use std::time::SystemTime;
4
5 use tokio::time::Duration;
6
7 mod interceptor;
8
9 pub use self::interceptor::StatsInterceptor;
10
make_stats_interceptor(id: &str) -> Arc<StatsInterceptor>11 pub fn make_stats_interceptor(id: &str) -> Arc<StatsInterceptor> {
12 Arc::new(StatsInterceptor::new(id.to_owned()))
13 }
14
15 /// Types related to inbound RTP streams.
16 mod inbound {
17 use std::time::SystemTime;
18
19 use tokio::time::{Duration, Instant};
20
21 use super::{RTCPStats, RTPStats};
22
23 #[derive(Debug, Clone)]
24 /// Stats collected for an inbound RTP stream.
25 /// Contains both stats relating to the inbound stream and remote stats for the corresponding
26 /// outbound stream at the remote end.
27 pub(super) struct StreamStats {
28 /// Received RTP stats.
29 pub(super) rtp_stats: RTPStats,
30 /// Common RTCP stats derived from inbound and outbound RTCP packets.
31 pub(super) rtcp_stats: RTCPStats,
32
33 /// The last time any stats where update, used for garbage collection to remove obsolete stats.
34 last_update: Instant,
35
36 /// The number of packets sent as reported in the latest SR from the remote.
37 remote_packets_sent: u32,
38
39 /// The number of bytes sent as reported in the latest SR from the remote.
40 remote_bytes_sent: u32,
41
42 /// The total number of sender reports sent by the remote and received.
43 remote_reports_sent: u64,
44
45 /// The last remote round trip time measurement in ms. [`None`] if no round trip time has
46 /// been derived yet, or if it wasn't possible to derive it.
47 remote_round_trip_time: Option<f64>,
48
49 /// The cummulative total round trip times reported in ms.
50 remote_total_round_trip_time: f64,
51
52 /// The total number of measurements of the remote round trip time.
53 remote_round_trip_time_measurements: u64,
54 }
55
56 impl Default for StreamStats {
default() -> Self57 fn default() -> Self {
58 Self {
59 rtp_stats: RTPStats::default(),
60 rtcp_stats: RTCPStats::default(),
61 last_update: Instant::now(),
62 remote_packets_sent: 0,
63 remote_bytes_sent: 0,
64 remote_reports_sent: 0,
65 remote_round_trip_time: None,
66 remote_total_round_trip_time: 0.0,
67 remote_round_trip_time_measurements: 0,
68 }
69 }
70 }
71
72 impl StreamStats {
snapshot(&self) -> StatsSnapshot73 pub(super) fn snapshot(&self) -> StatsSnapshot {
74 self.into()
75 }
76
mark_updated(&mut self)77 pub(super) fn mark_updated(&mut self) {
78 self.last_update = Instant::now();
79 }
80
duration_since_last_update(&self) -> Duration81 pub(super) fn duration_since_last_update(&self) -> Duration {
82 self.last_update.elapsed()
83 }
84
record_sender_report(&mut self, packets_sent: u32, bytes_sent: u32)85 pub(super) fn record_sender_report(&mut self, packets_sent: u32, bytes_sent: u32) {
86 self.remote_reports_sent += 1;
87 self.remote_packets_sent = packets_sent;
88 self.remote_bytes_sent = bytes_sent;
89 }
90
record_remote_round_trip_time(&mut self, round_trip_time: Option<f64>)91 pub(super) fn record_remote_round_trip_time(&mut self, round_trip_time: Option<f64>) {
92 // Store the latest measurement, even if it's None.
93 self.remote_round_trip_time = round_trip_time;
94
95 if let Some(rtt) = round_trip_time {
96 // Only if we have a valid measurement do we update the totals
97 self.remote_total_round_trip_time += rtt;
98 self.remote_round_trip_time_measurements += 1;
99 }
100 }
101 }
102
103 /// A point in time snapshot of the stream stats for an inbound RTP stream.
104 ///
105 /// Created by [`StreamStats::snapshot`].
106 #[derive(Debug)]
107 pub struct StatsSnapshot {
108 /// Received RTP stats.
109 rtp_stats: RTPStats,
110 /// Common RTCP stats derived from inbound and outbound RTCP packets.
111 rtcp_stats: RTCPStats,
112
113 /// The number of packets sent as reported in the latest SR from the remote.
114 remote_packets_sent: u32,
115
116 /// The number of bytes sent as reported in the latest SR from the remote.
117 remote_bytes_sent: u32,
118
119 /// The total number of sender reports sent by the remote and received.
120 remote_reports_sent: u64,
121
122 /// The last remote round trip time measurement in ms. [`None`] if no round trip time has
123 /// been derived yet, or if it wasn't possible to derive it.
124 remote_round_trip_time: Option<f64>,
125
126 /// The cummulative total round trip times reported in ms.
127 remote_total_round_trip_time: f64,
128
129 /// The total number of measurements of the remote round trip time.
130 remote_round_trip_time_measurements: u64,
131 }
132
133 impl StatsSnapshot {
packets_received(&self) -> u64134 pub fn packets_received(&self) -> u64 {
135 self.rtp_stats.packets
136 }
137
payload_bytes_received(&self) -> u64138 pub fn payload_bytes_received(&self) -> u64 {
139 self.rtp_stats.payload_bytes
140 }
141
header_bytes_received(&self) -> u64142 pub fn header_bytes_received(&self) -> u64 {
143 self.rtp_stats.header_bytes
144 }
145
last_packet_received_timestamp(&self) -> Option<SystemTime>146 pub fn last_packet_received_timestamp(&self) -> Option<SystemTime> {
147 self.rtp_stats.last_packet_timestamp
148 }
149
nacks_sent(&self) -> u64150 pub fn nacks_sent(&self) -> u64 {
151 self.rtcp_stats.nack_count
152 }
153
firs_sent(&self) -> u64154 pub fn firs_sent(&self) -> u64 {
155 self.rtcp_stats.fir_count
156 }
157
plis_sent(&self) -> u64158 pub fn plis_sent(&self) -> u64 {
159 self.rtcp_stats.pli_count
160 }
remote_packets_sent(&self) -> u32161 pub fn remote_packets_sent(&self) -> u32 {
162 self.remote_packets_sent
163 }
164
remote_bytes_sent(&self) -> u32165 pub fn remote_bytes_sent(&self) -> u32 {
166 self.remote_bytes_sent
167 }
168
remote_reports_sent(&self) -> u64169 pub fn remote_reports_sent(&self) -> u64 {
170 self.remote_reports_sent
171 }
172
remote_round_trip_time(&self) -> Option<f64>173 pub fn remote_round_trip_time(&self) -> Option<f64> {
174 self.remote_round_trip_time
175 }
176
remote_total_round_trip_time(&self) -> f64177 pub fn remote_total_round_trip_time(&self) -> f64 {
178 self.remote_total_round_trip_time
179 }
180
remote_round_trip_time_measurements(&self) -> u64181 pub fn remote_round_trip_time_measurements(&self) -> u64 {
182 self.remote_round_trip_time_measurements
183 }
184 }
185
186 impl From<&StreamStats> for StatsSnapshot {
from(stream_stats: &StreamStats) -> Self187 fn from(stream_stats: &StreamStats) -> Self {
188 Self {
189 rtp_stats: stream_stats.rtp_stats.clone(),
190 rtcp_stats: stream_stats.rtcp_stats.clone(),
191 remote_packets_sent: stream_stats.remote_packets_sent,
192 remote_bytes_sent: stream_stats.remote_bytes_sent,
193 remote_reports_sent: stream_stats.remote_reports_sent,
194 remote_round_trip_time: stream_stats.remote_round_trip_time,
195 remote_total_round_trip_time: stream_stats.remote_total_round_trip_time,
196 remote_round_trip_time_measurements: stream_stats
197 .remote_round_trip_time_measurements,
198 }
199 }
200 }
201 }
202
203 /// Types related to outbound RTP streams.
204 mod outbound {
205 use std::time::SystemTime;
206
207 use tokio::time::{Duration, Instant};
208
209 use super::{RTCPStats, RTPStats};
210
211 #[derive(Debug, Clone)]
212 /// Stats collected for an outbound RTP stream.
213 /// Contains both stats relating to the outbound stream and remote stats for the corresponding
214 /// inbound stream.
215 pub(super) struct StreamStats {
216 /// Sent RTP stats.
217 pub(super) rtp_stats: RTPStats,
218 /// Common RTCP stats derived from inbound and outbound RTCP packets.
219 pub(super) rtcp_stats: RTCPStats,
220
221 /// The last time any stats where update, used for garbage collection to remove obsolete stats.
222 last_update: Instant,
223
224 /// The first value of extended seq num that was sent in an SR for this SSRC. [`None`] before
225 /// the first SR is sent.
226 ///
227 /// Used to calculate packet statistic for remote stats.
228 initial_outbound_ext_seq_num: Option<u32>,
229
230 /// The number of inbound packets received by the remote side for this stream.
231 remote_packets_received: u64,
232
233 /// The number of lost packets reported by the remote for this tream.
234 remote_total_lost: u32,
235
236 /// The estimated remote jitter for this stream in timestamp units.
237 remote_jitter: u32,
238
239 /// The last remote round trip time measurement in ms. [`None`] if no round trip time has
240 /// been derived yet, or if it wasn't possible to derive it.
241 remote_round_trip_time: Option<f64>,
242
243 /// The cummulative total round trip times reported in ms.
244 remote_total_round_trip_time: f64,
245
246 /// The total number of measurements of the remote round trip time.
247 remote_round_trip_time_measurements: u64,
248
249 /// The latest fraction lost value from RR.
250 remote_fraction_lost: Option<u8>,
251 }
252
253 impl Default for StreamStats {
default() -> Self254 fn default() -> Self {
255 Self {
256 rtp_stats: RTPStats::default(),
257 rtcp_stats: RTCPStats::default(),
258 last_update: Instant::now(),
259 initial_outbound_ext_seq_num: None,
260 remote_packets_received: 0,
261 remote_total_lost: 0,
262 remote_jitter: 0,
263 remote_round_trip_time: None,
264 remote_total_round_trip_time: 0.0,
265 remote_round_trip_time_measurements: 0,
266 remote_fraction_lost: None,
267 }
268 }
269 }
270
271 impl StreamStats {
snapshot(&self) -> StatsSnapshot272 pub(super) fn snapshot(&self) -> StatsSnapshot {
273 self.into()
274 }
275
mark_updated(&mut self)276 pub(super) fn mark_updated(&mut self) {
277 self.last_update = Instant::now();
278 }
279
duration_since_last_update(&self) -> Duration280 pub(super) fn duration_since_last_update(&self) -> Duration {
281 self.last_update.elapsed()
282 }
283
update_remote_inbound_packets_received( &mut self, rr_ext_seq_num: u32, rr_total_lost: u32, )284 pub(super) fn update_remote_inbound_packets_received(
285 &mut self,
286 rr_ext_seq_num: u32,
287 rr_total_lost: u32,
288 ) {
289 if let Some(initial_ext_seq_num) = self.initial_outbound_ext_seq_num {
290 // Total number of RTP packets received for this SSRC.
291 // At the receiving endpoint, this is calculated as defined in [RFC3550] section 6.4.1.
292 // At the sending endpoint the packetsReceived is estimated by subtracting the
293 // Cumulative Number of Packets Lost from the Extended Highest Sequence Number Received,
294 // both reported in the RTCP Receiver Report, and then subtracting the
295 // initial Extended Sequence Number that was sent to this SSRC in a RTCP Sender Report and then adding one,
296 // to mirror what is discussed in Appendix A.3 in [RFC3550], but for the sender side.
297 // If no RTCP Receiver Report has been received yet, then return 0.
298 self.remote_packets_received =
299 (rr_ext_seq_num as u64) - (rr_total_lost as u64) - (initial_ext_seq_num as u64)
300 + 1;
301 }
302 }
303
304 #[inline(always)]
record_sr_ext_seq_num(&mut self, seq_num: u32)305 pub(super) fn record_sr_ext_seq_num(&mut self, seq_num: u32) {
306 // Only record the initial value
307 if self.initial_outbound_ext_seq_num.is_none() {
308 self.initial_outbound_ext_seq_num = Some(seq_num);
309 }
310 }
311
record_remote_round_trip_time(&mut self, round_trip_time: Option<f64>)312 pub(super) fn record_remote_round_trip_time(&mut self, round_trip_time: Option<f64>) {
313 // Store the latest measurement, even if it's None.
314 self.remote_round_trip_time = round_trip_time;
315
316 if let Some(rtt) = round_trip_time {
317 // Only if we have a valid measurement do we update the totals
318 self.remote_total_round_trip_time += rtt;
319 self.remote_round_trip_time_measurements += 1;
320 }
321 }
322
update_remote_fraction_lost(&mut self, fraction_lost: u8)323 pub(super) fn update_remote_fraction_lost(&mut self, fraction_lost: u8) {
324 self.remote_fraction_lost = Some(fraction_lost);
325 }
326
update_remote_jitter(&mut self, jitter: u32)327 pub(super) fn update_remote_jitter(&mut self, jitter: u32) {
328 self.remote_jitter = jitter;
329 }
330
update_remote_total_lost(&mut self, lost: u32)331 pub(super) fn update_remote_total_lost(&mut self, lost: u32) {
332 self.remote_total_lost = lost;
333 }
334 }
335
336 /// A point in time snapshot of the stream stats for an outbound RTP stream.
337 ///
338 /// Created by [`StreamStats::snapshot`].
339 #[derive(Debug)]
340 pub struct StatsSnapshot {
341 /// Sent RTP stats.
342 rtp_stats: RTPStats,
343 /// Common RTCP stats derived from inbound and outbound RTCP packets.
344 rtcp_stats: RTCPStats,
345
346 /// The number of inbound packets received by the remote side for this stream.
347 remote_packets_received: u64,
348
349 /// The number of lost packets reported by the remote for this tream.
350 remote_total_lost: u32,
351
352 /// The estimated remote jitter for this stream in timestamp units.
353 remote_jitter: u32,
354
355 /// The most recent remote round trip time in milliseconds.
356 remote_round_trip_time: Option<f64>,
357
358 /// The cummulative total round trip times reported in ms.
359 remote_total_round_trip_time: f64,
360
361 /// The total number of measurements of the remote round trip time.
362 remote_round_trip_time_measurements: u64,
363
364 /// The fraction of packets lost reported for this stream.
365 /// Calculated as defined in [RFC3550](https://www.rfc-editor.org/rfc/rfc3550) section 6.4.1 and Appendix A.3.
366 remote_fraction_lost: Option<f64>,
367 }
368
369 impl StatsSnapshot {
packets_sent(&self) -> u64370 pub fn packets_sent(&self) -> u64 {
371 self.rtp_stats.packets
372 }
373
payload_bytes_sent(&self) -> u64374 pub fn payload_bytes_sent(&self) -> u64 {
375 self.rtp_stats.payload_bytes
376 }
377
header_bytes_sent(&self) -> u64378 pub fn header_bytes_sent(&self) -> u64 {
379 self.rtp_stats.header_bytes
380 }
381
last_packet_sent_timestamp(&self) -> Option<SystemTime>382 pub fn last_packet_sent_timestamp(&self) -> Option<SystemTime> {
383 self.rtp_stats.last_packet_timestamp
384 }
385
nacks_received(&self) -> u64386 pub fn nacks_received(&self) -> u64 {
387 self.rtcp_stats.nack_count
388 }
389
firs_received(&self) -> u64390 pub fn firs_received(&self) -> u64 {
391 self.rtcp_stats.fir_count
392 }
393
plis_received(&self) -> u64394 pub fn plis_received(&self) -> u64 {
395 self.rtcp_stats.pli_count
396 }
397
398 /// Packets received on the remote side.
remote_packets_received(&self) -> u64399 pub fn remote_packets_received(&self) -> u64 {
400 self.remote_packets_received
401 }
402
403 /// The number of lost packets reported by the remote for this tream.
remote_total_lost(&self) -> u32404 pub fn remote_total_lost(&self) -> u32 {
405 self.remote_total_lost
406 }
407
408 /// The estimated remote jitter for this stream in timestamp units.
remote_jitter(&self) -> u32409 pub fn remote_jitter(&self) -> u32 {
410 self.remote_jitter
411 }
412
413 /// The latest RTT in ms if enough data is available to measure it.
remote_round_trip_time(&self) -> Option<f64>414 pub fn remote_round_trip_time(&self) -> Option<f64> {
415 self.remote_round_trip_time
416 }
417
418 /// Total RTT in ms.
remote_total_round_trip_time(&self) -> f64419 pub fn remote_total_round_trip_time(&self) -> f64 {
420 self.remote_total_round_trip_time
421 }
422
423 /// The number of RTT measurements so far.
remote_round_trip_time_measurements(&self) -> u64424 pub fn remote_round_trip_time_measurements(&self) -> u64 {
425 self.remote_round_trip_time_measurements
426 }
427
428 /// The latest fraction lost value from the remote or None if it hasn't been reported yet.
remote_fraction_lost(&self) -> Option<f64>429 pub fn remote_fraction_lost(&self) -> Option<f64> {
430 self.remote_fraction_lost
431 }
432 }
433
434 impl From<&StreamStats> for StatsSnapshot {
from(stream_stats: &StreamStats) -> Self435 fn from(stream_stats: &StreamStats) -> Self {
436 Self {
437 rtp_stats: stream_stats.rtp_stats.clone(),
438 rtcp_stats: stream_stats.rtcp_stats.clone(),
439 remote_packets_received: stream_stats.remote_packets_received,
440 remote_total_lost: stream_stats.remote_total_lost,
441 remote_jitter: stream_stats.remote_jitter,
442 remote_round_trip_time: stream_stats.remote_round_trip_time,
443 remote_total_round_trip_time: stream_stats.remote_total_round_trip_time,
444 remote_round_trip_time_measurements: stream_stats
445 .remote_round_trip_time_measurements,
446 remote_fraction_lost: stream_stats
447 .remote_fraction_lost
448 .map(|fraction| (fraction as f64) / (u8::MAX as f64)),
449 }
450 }
451 }
452 }
453
454 #[derive(Default, Debug)]
455 struct StatsContainer {
456 inbound_stats: HashMap<u32, inbound::StreamStats>,
457 outbound_stats: HashMap<u32, outbound::StreamStats>,
458 }
459
460 impl StatsContainer {
get_or_create_inbound_stream_stats(&mut self, ssrc: u32) -> &mut inbound::StreamStats461 fn get_or_create_inbound_stream_stats(&mut self, ssrc: u32) -> &mut inbound::StreamStats {
462 self.inbound_stats.entry(ssrc).or_default()
463 }
464
get_or_create_outbound_stream_stats(&mut self, ssrc: u32) -> &mut outbound::StreamStats465 fn get_or_create_outbound_stream_stats(&mut self, ssrc: u32) -> &mut outbound::StreamStats {
466 self.outbound_stats.entry(ssrc).or_default()
467 }
468
get_inbound_stats(&self, ssrc: u32) -> Option<&inbound::StreamStats>469 fn get_inbound_stats(&self, ssrc: u32) -> Option<&inbound::StreamStats> {
470 self.inbound_stats.get(&ssrc)
471 }
472
get_outbound_stats(&self, ssrc: u32) -> Option<&outbound::StreamStats>473 fn get_outbound_stats(&self, ssrc: u32) -> Option<&outbound::StreamStats> {
474 self.outbound_stats.get(&ssrc)
475 }
476
remove_stale_entries(&mut self)477 fn remove_stale_entries(&mut self) {
478 const MAX_AGE: Duration = Duration::from_secs(60);
479
480 self.inbound_stats
481 .retain(|_, s| s.duration_since_last_update() < MAX_AGE);
482 self.outbound_stats
483 .retain(|_, s| s.duration_since_last_update() < MAX_AGE);
484 }
485 }
486
/// Records stats about a given RTP stream.
///
/// The same type is used for both directions, so every counter means "sent" for an outbound
/// stream and "received" for an inbound one.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RTPStats {
    /// Packets sent or received.
    packets: u64,

    /// Payload bytes sent or received.
    payload_bytes: u64,

    /// Header bytes sent or received.
    header_bytes: u64,

    /// A wall clock timestamp for when the last packet was sent or received. [`None`] until the
    /// first update.
    last_packet_timestamp: Option<SystemTime>,
}

impl RTPStats {
    /// Adds the given header bytes, payload bytes and packet count to the running totals and
    /// records `now` as the timestamp of the last packet.
    fn update(&mut self, header_bytes: u64, payload_bytes: u64, packets: u64, now: SystemTime) {
        self.header_bytes += header_bytes;
        self.payload_bytes += payload_bytes;
        self.packets += packets;

        let latest = Some(now);
        self.last_packet_timestamp = latest;
    }

    /// Total header bytes recorded so far.
    pub fn header_bytes(&self) -> u64 {
        self.header_bytes
    }

    /// Total payload bytes recorded so far.
    pub fn payload_bytes(&self) -> u64 {
        self.payload_bytes
    }

    /// Total number of packets recorded so far.
    pub fn packets(&self) -> u64 {
        self.packets
    }

    /// The timestamp passed to the most recent update, or [`None`] if there has been none.
    pub fn last_packet_timestamp(&self) -> Option<SystemTime> {
        self.last_packet_timestamp
    }
}
528
/// Counters for the RTCP feedback messages (FIR, PLI and NACK) of a stream.
///
/// The same type is used for both directions, so every counter means "sent" or "received"
/// depending on the stream it belongs to.
#[derive(Debug, Default, Clone)]
pub struct RTCPStats {
    /// The number of FIRs sent or received.
    fir_count: u64,

    /// The number of PLIs sent or received.
    pli_count: u64,

    /// The number of NACKs sent or received.
    nack_count: u64,
}

impl RTCPStats {
    /// Adds the supplied increments to the matching counters.
    ///
    /// A [`None`] argument leaves its counter unchanged.
    #[allow(clippy::too_many_arguments)]
    fn update(&mut self, fir_count: Option<u64>, pli_count: Option<u64>, nack_count: Option<u64>) {
        // A missing increment contributes zero, which leaves the counter as it was.
        self.fir_count += fir_count.unwrap_or(0);
        self.pli_count += pli_count.unwrap_or(0);
        self.nack_count += nack_count.unwrap_or(0);
    }

    /// Total number of FIRs recorded so far.
    pub fn fir_count(&self) -> u64 {
        self.fir_count
    }

    /// Total number of PLIs recorded so far.
    pub fn pli_count(&self) -> u64 {
        self.pli_count
    }

    /// Total number of NACKs recorded so far.
    pub fn nack_count(&self) -> u64 {
        self.nack_count
    }
}
569
#[cfg(test)]
mod test {
    use super::*;

    /// Compiles only when `T` is both `Send` and `Sync`; calling it is the whole assertion.
    fn assert_send_sync<T: Send + Sync>() {}

    /// A fresh `RTPStats` starts at zero and `update` adds to every counter.
    #[test]
    fn test_rtp_stats() {
        let mut stats = RTPStats::default();
        assert_eq!(stats.header_bytes(), 0);
        assert_eq!(stats.payload_bytes(), 0);
        assert_eq!(stats.packets(), 0);

        stats.update(24, 960, 1, SystemTime::now());

        assert_eq!(stats.header_bytes(), 24);
        assert_eq!(stats.payload_bytes(), 960);
        assert_eq!(stats.packets(), 1);
    }

    /// A fresh `RTCPStats` starts at zero and `update` adds to every counter.
    #[test]
    fn test_rtcp_stats() {
        let mut stats = RTCPStats::default();
        assert_eq!(stats.fir_count(), 0);
        assert_eq!(stats.pli_count(), 0);
        assert_eq!(stats.nack_count(), 0);

        stats.update(Some(1), Some(2), Some(3));

        assert_eq!(stats.fir_count(), 1);
        assert_eq!(stats.pli_count(), 2);
        assert_eq!(stats.nack_count(), 3);
    }

    #[test]
    fn test_rtp_stats_send_sync() {
        assert_send_sync::<RTPStats>();
    }

    #[test]
    fn test_rtcp_stats_send_sync() {
        assert_send_sync::<RTCPStats>();
    }
}
618