xref: /webrtc/interceptor/src/stats/mod.rs (revision 0f1c6b53)
1 use std::collections::HashMap;
2 use std::sync::Arc;
3 use std::time::SystemTime;
4 
5 use tokio::time::Duration;
6 
7 mod interceptor;
8 
9 pub use self::interceptor::StatsInterceptor;
10 
make_stats_interceptor(id: &str) -> Arc<StatsInterceptor>11 pub fn make_stats_interceptor(id: &str) -> Arc<StatsInterceptor> {
12     Arc::new(StatsInterceptor::new(id.to_owned()))
13 }
14 
15 /// Types related to inbound RTP streams.
16 mod inbound {
17     use std::time::SystemTime;
18 
19     use tokio::time::{Duration, Instant};
20 
21     use super::{RTCPStats, RTPStats};
22 
23     #[derive(Debug, Clone)]
24     /// Stats collected for an inbound RTP stream.
25     /// Contains both stats relating to the inbound stream and remote stats for the corresponding
26     /// outbound stream at the remote end.
27     pub(super) struct StreamStats {
28         /// Received RTP stats.
29         pub(super) rtp_stats: RTPStats,
30         /// Common RTCP stats derived from inbound and outbound RTCP packets.
31         pub(super) rtcp_stats: RTCPStats,
32 
33         /// The last time any stats where update, used for garbage collection to remove obsolete stats.
34         last_update: Instant,
35 
36         /// The number of packets sent as reported in the latest SR from the remote.
37         remote_packets_sent: u32,
38 
39         /// The number of bytes sent as reported in the latest SR from the remote.
40         remote_bytes_sent: u32,
41 
42         /// The total number of sender reports sent by the remote and received.
43         remote_reports_sent: u64,
44 
45         /// The last remote round trip time measurement in ms. [`None`] if no round trip time has
46         /// been derived yet, or if it wasn't possible to derive it.
47         remote_round_trip_time: Option<f64>,
48 
49         /// The cummulative total round trip times reported in ms.
50         remote_total_round_trip_time: f64,
51 
52         /// The total number of measurements of the remote round trip time.
53         remote_round_trip_time_measurements: u64,
54     }
55 
56     impl Default for StreamStats {
default() -> Self57         fn default() -> Self {
58             Self {
59                 rtp_stats: RTPStats::default(),
60                 rtcp_stats: RTCPStats::default(),
61                 last_update: Instant::now(),
62                 remote_packets_sent: 0,
63                 remote_bytes_sent: 0,
64                 remote_reports_sent: 0,
65                 remote_round_trip_time: None,
66                 remote_total_round_trip_time: 0.0,
67                 remote_round_trip_time_measurements: 0,
68             }
69         }
70     }
71 
72     impl StreamStats {
snapshot(&self) -> StatsSnapshot73         pub(super) fn snapshot(&self) -> StatsSnapshot {
74             self.into()
75         }
76 
mark_updated(&mut self)77         pub(super) fn mark_updated(&mut self) {
78             self.last_update = Instant::now();
79         }
80 
duration_since_last_update(&self) -> Duration81         pub(super) fn duration_since_last_update(&self) -> Duration {
82             self.last_update.elapsed()
83         }
84 
record_sender_report(&mut self, packets_sent: u32, bytes_sent: u32)85         pub(super) fn record_sender_report(&mut self, packets_sent: u32, bytes_sent: u32) {
86             self.remote_reports_sent += 1;
87             self.remote_packets_sent = packets_sent;
88             self.remote_bytes_sent = bytes_sent;
89         }
90 
record_remote_round_trip_time(&mut self, round_trip_time: Option<f64>)91         pub(super) fn record_remote_round_trip_time(&mut self, round_trip_time: Option<f64>) {
92             // Store the latest measurement, even if it's None.
93             self.remote_round_trip_time = round_trip_time;
94 
95             if let Some(rtt) = round_trip_time {
96                 // Only if we have a valid measurement do we update the totals
97                 self.remote_total_round_trip_time += rtt;
98                 self.remote_round_trip_time_measurements += 1;
99             }
100         }
101     }
102 
103     /// A point in time snapshot of the stream stats for an inbound RTP stream.
104     ///
105     /// Created by [`StreamStats::snapshot`].
106     #[derive(Debug)]
107     pub struct StatsSnapshot {
108         /// Received RTP stats.
109         rtp_stats: RTPStats,
110         /// Common RTCP stats derived from inbound and outbound RTCP packets.
111         rtcp_stats: RTCPStats,
112 
113         /// The number of packets sent as reported in the latest SR from the remote.
114         remote_packets_sent: u32,
115 
116         /// The number of bytes sent as reported in the latest SR from the remote.
117         remote_bytes_sent: u32,
118 
119         /// The total number of sender reports sent by the remote and received.
120         remote_reports_sent: u64,
121 
122         /// The last remote round trip time measurement in ms. [`None`] if no round trip time has
123         /// been derived yet, or if it wasn't possible to derive it.
124         remote_round_trip_time: Option<f64>,
125 
126         /// The cummulative total round trip times reported in ms.
127         remote_total_round_trip_time: f64,
128 
129         /// The total number of measurements of the remote round trip time.
130         remote_round_trip_time_measurements: u64,
131     }
132 
133     impl StatsSnapshot {
packets_received(&self) -> u64134         pub fn packets_received(&self) -> u64 {
135             self.rtp_stats.packets
136         }
137 
payload_bytes_received(&self) -> u64138         pub fn payload_bytes_received(&self) -> u64 {
139             self.rtp_stats.payload_bytes
140         }
141 
header_bytes_received(&self) -> u64142         pub fn header_bytes_received(&self) -> u64 {
143             self.rtp_stats.header_bytes
144         }
145 
last_packet_received_timestamp(&self) -> Option<SystemTime>146         pub fn last_packet_received_timestamp(&self) -> Option<SystemTime> {
147             self.rtp_stats.last_packet_timestamp
148         }
149 
nacks_sent(&self) -> u64150         pub fn nacks_sent(&self) -> u64 {
151             self.rtcp_stats.nack_count
152         }
153 
firs_sent(&self) -> u64154         pub fn firs_sent(&self) -> u64 {
155             self.rtcp_stats.fir_count
156         }
157 
plis_sent(&self) -> u64158         pub fn plis_sent(&self) -> u64 {
159             self.rtcp_stats.pli_count
160         }
remote_packets_sent(&self) -> u32161         pub fn remote_packets_sent(&self) -> u32 {
162             self.remote_packets_sent
163         }
164 
remote_bytes_sent(&self) -> u32165         pub fn remote_bytes_sent(&self) -> u32 {
166             self.remote_bytes_sent
167         }
168 
remote_reports_sent(&self) -> u64169         pub fn remote_reports_sent(&self) -> u64 {
170             self.remote_reports_sent
171         }
172 
remote_round_trip_time(&self) -> Option<f64>173         pub fn remote_round_trip_time(&self) -> Option<f64> {
174             self.remote_round_trip_time
175         }
176 
remote_total_round_trip_time(&self) -> f64177         pub fn remote_total_round_trip_time(&self) -> f64 {
178             self.remote_total_round_trip_time
179         }
180 
remote_round_trip_time_measurements(&self) -> u64181         pub fn remote_round_trip_time_measurements(&self) -> u64 {
182             self.remote_round_trip_time_measurements
183         }
184     }
185 
186     impl From<&StreamStats> for StatsSnapshot {
from(stream_stats: &StreamStats) -> Self187         fn from(stream_stats: &StreamStats) -> Self {
188             Self {
189                 rtp_stats: stream_stats.rtp_stats.clone(),
190                 rtcp_stats: stream_stats.rtcp_stats.clone(),
191                 remote_packets_sent: stream_stats.remote_packets_sent,
192                 remote_bytes_sent: stream_stats.remote_bytes_sent,
193                 remote_reports_sent: stream_stats.remote_reports_sent,
194                 remote_round_trip_time: stream_stats.remote_round_trip_time,
195                 remote_total_round_trip_time: stream_stats.remote_total_round_trip_time,
196                 remote_round_trip_time_measurements: stream_stats
197                     .remote_round_trip_time_measurements,
198             }
199         }
200     }
201 }
202 
203 /// Types related to outbound RTP streams.
204 mod outbound {
205     use std::time::SystemTime;
206 
207     use tokio::time::{Duration, Instant};
208 
209     use super::{RTCPStats, RTPStats};
210 
211     #[derive(Debug, Clone)]
212     /// Stats collected for an outbound RTP stream.
213     /// Contains both stats relating to the outbound stream and remote stats for the corresponding
214     /// inbound stream.
215     pub(super) struct StreamStats {
216         /// Sent RTP stats.
217         pub(super) rtp_stats: RTPStats,
218         /// Common RTCP stats derived from inbound and outbound RTCP packets.
219         pub(super) rtcp_stats: RTCPStats,
220 
221         /// The last time any stats where update, used for garbage collection to remove obsolete stats.
222         last_update: Instant,
223 
224         /// The first value of extended seq num that was sent in an SR for this SSRC. [`None`] before
225         /// the first SR is sent.
226         ///
227         /// Used to calculate packet statistic for remote stats.
228         initial_outbound_ext_seq_num: Option<u32>,
229 
230         /// The number of inbound packets received by the remote side for this stream.
231         remote_packets_received: u64,
232 
233         /// The number of lost packets reported by the remote for this tream.
234         remote_total_lost: u32,
235 
236         /// The estimated remote jitter for this stream in timestamp units.
237         remote_jitter: u32,
238 
239         /// The last remote round trip time measurement in ms. [`None`] if no round trip time has
240         /// been derived yet, or if it wasn't possible to derive it.
241         remote_round_trip_time: Option<f64>,
242 
243         /// The cummulative total round trip times reported in ms.
244         remote_total_round_trip_time: f64,
245 
246         /// The total number of measurements of the remote round trip time.
247         remote_round_trip_time_measurements: u64,
248 
249         /// The latest fraction lost value from RR.
250         remote_fraction_lost: Option<u8>,
251     }
252 
253     impl Default for StreamStats {
default() -> Self254         fn default() -> Self {
255             Self {
256                 rtp_stats: RTPStats::default(),
257                 rtcp_stats: RTCPStats::default(),
258                 last_update: Instant::now(),
259                 initial_outbound_ext_seq_num: None,
260                 remote_packets_received: 0,
261                 remote_total_lost: 0,
262                 remote_jitter: 0,
263                 remote_round_trip_time: None,
264                 remote_total_round_trip_time: 0.0,
265                 remote_round_trip_time_measurements: 0,
266                 remote_fraction_lost: None,
267             }
268         }
269     }
270 
271     impl StreamStats {
snapshot(&self) -> StatsSnapshot272         pub(super) fn snapshot(&self) -> StatsSnapshot {
273             self.into()
274         }
275 
mark_updated(&mut self)276         pub(super) fn mark_updated(&mut self) {
277             self.last_update = Instant::now();
278         }
279 
duration_since_last_update(&self) -> Duration280         pub(super) fn duration_since_last_update(&self) -> Duration {
281             self.last_update.elapsed()
282         }
283 
update_remote_inbound_packets_received( &mut self, rr_ext_seq_num: u32, rr_total_lost: u32, )284         pub(super) fn update_remote_inbound_packets_received(
285             &mut self,
286             rr_ext_seq_num: u32,
287             rr_total_lost: u32,
288         ) {
289             if let Some(initial_ext_seq_num) = self.initial_outbound_ext_seq_num {
290                 // Total number of RTP packets received for this SSRC.
291                 // At the receiving endpoint, this is calculated as defined in [RFC3550] section 6.4.1.
292                 // At the sending endpoint the packetsReceived is estimated by subtracting the
293                 // Cumulative Number of Packets Lost from the Extended Highest Sequence Number Received,
294                 // both reported in the RTCP Receiver Report, and then subtracting the
295                 // initial Extended Sequence Number that was sent to this SSRC in a RTCP Sender Report and then adding one,
296                 // to mirror what is discussed in Appendix A.3 in [RFC3550], but for the sender side.
297                 // If no RTCP Receiver Report has been received yet, then return 0.
298                 self.remote_packets_received =
299                     (rr_ext_seq_num as u64) - (rr_total_lost as u64) - (initial_ext_seq_num as u64)
300                         + 1;
301             }
302         }
303 
304         #[inline(always)]
record_sr_ext_seq_num(&mut self, seq_num: u32)305         pub(super) fn record_sr_ext_seq_num(&mut self, seq_num: u32) {
306             // Only record the initial value
307             if self.initial_outbound_ext_seq_num.is_none() {
308                 self.initial_outbound_ext_seq_num = Some(seq_num);
309             }
310         }
311 
record_remote_round_trip_time(&mut self, round_trip_time: Option<f64>)312         pub(super) fn record_remote_round_trip_time(&mut self, round_trip_time: Option<f64>) {
313             // Store the latest measurement, even if it's None.
314             self.remote_round_trip_time = round_trip_time;
315 
316             if let Some(rtt) = round_trip_time {
317                 // Only if we have a valid measurement do we update the totals
318                 self.remote_total_round_trip_time += rtt;
319                 self.remote_round_trip_time_measurements += 1;
320             }
321         }
322 
update_remote_fraction_lost(&mut self, fraction_lost: u8)323         pub(super) fn update_remote_fraction_lost(&mut self, fraction_lost: u8) {
324             self.remote_fraction_lost = Some(fraction_lost);
325         }
326 
update_remote_jitter(&mut self, jitter: u32)327         pub(super) fn update_remote_jitter(&mut self, jitter: u32) {
328             self.remote_jitter = jitter;
329         }
330 
update_remote_total_lost(&mut self, lost: u32)331         pub(super) fn update_remote_total_lost(&mut self, lost: u32) {
332             self.remote_total_lost = lost;
333         }
334     }
335 
336     /// A point in time snapshot of the stream stats for an outbound RTP stream.
337     ///
338     /// Created by [`StreamStats::snapshot`].
339     #[derive(Debug)]
340     pub struct StatsSnapshot {
341         /// Sent RTP stats.
342         rtp_stats: RTPStats,
343         /// Common RTCP stats derived from inbound and outbound RTCP packets.
344         rtcp_stats: RTCPStats,
345 
346         /// The number of inbound packets received by the remote side for this stream.
347         remote_packets_received: u64,
348 
349         /// The number of lost packets reported by the remote for this tream.
350         remote_total_lost: u32,
351 
352         /// The estimated remote jitter for this stream in timestamp units.
353         remote_jitter: u32,
354 
355         /// The most recent remote round trip time in milliseconds.
356         remote_round_trip_time: Option<f64>,
357 
358         /// The cummulative total round trip times reported in ms.
359         remote_total_round_trip_time: f64,
360 
361         /// The total number of measurements of the remote round trip time.
362         remote_round_trip_time_measurements: u64,
363 
364         /// The fraction of packets lost reported for this stream.
365         /// Calculated as defined in [RFC3550](https://www.rfc-editor.org/rfc/rfc3550) section 6.4.1 and Appendix A.3.
366         remote_fraction_lost: Option<f64>,
367     }
368 
369     impl StatsSnapshot {
packets_sent(&self) -> u64370         pub fn packets_sent(&self) -> u64 {
371             self.rtp_stats.packets
372         }
373 
payload_bytes_sent(&self) -> u64374         pub fn payload_bytes_sent(&self) -> u64 {
375             self.rtp_stats.payload_bytes
376         }
377 
header_bytes_sent(&self) -> u64378         pub fn header_bytes_sent(&self) -> u64 {
379             self.rtp_stats.header_bytes
380         }
381 
last_packet_sent_timestamp(&self) -> Option<SystemTime>382         pub fn last_packet_sent_timestamp(&self) -> Option<SystemTime> {
383             self.rtp_stats.last_packet_timestamp
384         }
385 
nacks_received(&self) -> u64386         pub fn nacks_received(&self) -> u64 {
387             self.rtcp_stats.nack_count
388         }
389 
firs_received(&self) -> u64390         pub fn firs_received(&self) -> u64 {
391             self.rtcp_stats.fir_count
392         }
393 
plis_received(&self) -> u64394         pub fn plis_received(&self) -> u64 {
395             self.rtcp_stats.pli_count
396         }
397 
398         /// Packets received on the remote side.
remote_packets_received(&self) -> u64399         pub fn remote_packets_received(&self) -> u64 {
400             self.remote_packets_received
401         }
402 
403         /// The number of lost packets reported by the remote for this tream.
remote_total_lost(&self) -> u32404         pub fn remote_total_lost(&self) -> u32 {
405             self.remote_total_lost
406         }
407 
408         /// The estimated remote jitter for this stream in timestamp units.
remote_jitter(&self) -> u32409         pub fn remote_jitter(&self) -> u32 {
410             self.remote_jitter
411         }
412 
413         /// The latest RTT in ms if enough data is available to measure it.
remote_round_trip_time(&self) -> Option<f64>414         pub fn remote_round_trip_time(&self) -> Option<f64> {
415             self.remote_round_trip_time
416         }
417 
418         /// Total RTT in ms.
remote_total_round_trip_time(&self) -> f64419         pub fn remote_total_round_trip_time(&self) -> f64 {
420             self.remote_total_round_trip_time
421         }
422 
423         /// The number of RTT measurements so far.
remote_round_trip_time_measurements(&self) -> u64424         pub fn remote_round_trip_time_measurements(&self) -> u64 {
425             self.remote_round_trip_time_measurements
426         }
427 
428         /// The latest fraction lost value from the remote or None if it hasn't been reported yet.
remote_fraction_lost(&self) -> Option<f64>429         pub fn remote_fraction_lost(&self) -> Option<f64> {
430             self.remote_fraction_lost
431         }
432     }
433 
434     impl From<&StreamStats> for StatsSnapshot {
from(stream_stats: &StreamStats) -> Self435         fn from(stream_stats: &StreamStats) -> Self {
436             Self {
437                 rtp_stats: stream_stats.rtp_stats.clone(),
438                 rtcp_stats: stream_stats.rtcp_stats.clone(),
439                 remote_packets_received: stream_stats.remote_packets_received,
440                 remote_total_lost: stream_stats.remote_total_lost,
441                 remote_jitter: stream_stats.remote_jitter,
442                 remote_round_trip_time: stream_stats.remote_round_trip_time,
443                 remote_total_round_trip_time: stream_stats.remote_total_round_trip_time,
444                 remote_round_trip_time_measurements: stream_stats
445                     .remote_round_trip_time_measurements,
446                 remote_fraction_lost: stream_stats
447                     .remote_fraction_lost
448                     .map(|fraction| (fraction as f64) / (u8::MAX as f64)),
449             }
450         }
451     }
452 }
453 
454 #[derive(Default, Debug)]
455 struct StatsContainer {
456     inbound_stats: HashMap<u32, inbound::StreamStats>,
457     outbound_stats: HashMap<u32, outbound::StreamStats>,
458 }
459 
460 impl StatsContainer {
get_or_create_inbound_stream_stats(&mut self, ssrc: u32) -> &mut inbound::StreamStats461     fn get_or_create_inbound_stream_stats(&mut self, ssrc: u32) -> &mut inbound::StreamStats {
462         self.inbound_stats.entry(ssrc).or_default()
463     }
464 
get_or_create_outbound_stream_stats(&mut self, ssrc: u32) -> &mut outbound::StreamStats465     fn get_or_create_outbound_stream_stats(&mut self, ssrc: u32) -> &mut outbound::StreamStats {
466         self.outbound_stats.entry(ssrc).or_default()
467     }
468 
get_inbound_stats(&self, ssrc: u32) -> Option<&inbound::StreamStats>469     fn get_inbound_stats(&self, ssrc: u32) -> Option<&inbound::StreamStats> {
470         self.inbound_stats.get(&ssrc)
471     }
472 
get_outbound_stats(&self, ssrc: u32) -> Option<&outbound::StreamStats>473     fn get_outbound_stats(&self, ssrc: u32) -> Option<&outbound::StreamStats> {
474         self.outbound_stats.get(&ssrc)
475     }
476 
remove_stale_entries(&mut self)477     fn remove_stale_entries(&mut self) {
478         const MAX_AGE: Duration = Duration::from_secs(60);
479 
480         self.inbound_stats
481             .retain(|_, s| s.duration_since_last_update() < MAX_AGE);
482         self.outbound_stats
483             .retain(|_, s| s.duration_since_last_update() < MAX_AGE);
484     }
485 }
486 
/// Records stats about a given RTP stream.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RTPStats {
    /// Packets sent or received.
    packets: u64,

    /// Payload bytes sent or received.
    payload_bytes: u64,

    /// Header bytes sent or received.
    header_bytes: u64,

    /// The timestamp passed to the most recent update, i.e. when the last packet was sent or
    /// received. [`None`] until the first update.
    last_packet_timestamp: Option<SystemTime>,
}

impl RTPStats {
    /// Adds the given byte and packet counts to the running totals and records `now` as the
    /// time of the last packet.
    fn update(&mut self, header_bytes: u64, payload_bytes: u64, packets: u64, now: SystemTime) {
        self.last_packet_timestamp = Some(now);
        self.packets += packets;
        self.payload_bytes += payload_bytes;
        self.header_bytes += header_bytes;
    }

    /// Total header bytes sent or received.
    pub fn header_bytes(&self) -> u64 {
        self.header_bytes
    }

    /// Total payload bytes sent or received.
    pub fn payload_bytes(&self) -> u64 {
        self.payload_bytes
    }

    /// Total packets sent or received.
    pub fn packets(&self) -> u64 {
        self.packets
    }

    /// The timestamp of the last packet, or [`None`] if nothing has been recorded yet.
    pub fn last_packet_timestamp(&self) -> Option<SystemTime> {
        self.last_packet_timestamp
    }
}
528 
/// Records counts of the RTCP feedback messages (FIR, PLI and NACK) seen for a stream.
#[derive(Debug, Default, Clone)]
pub struct RTCPStats {
    /// The number of FIRs sent or received.
    fir_count: u64,

    /// The number of PLIs sent or received.
    pli_count: u64,

    /// The number of NACKs sent or received.
    nack_count: u64,
}

impl RTCPStats {
    /// Adds the given counts to the running totals.
    ///
    /// A [`None`] argument leaves the corresponding total untouched.
    fn update(&mut self, fir_count: Option<u64>, pli_count: Option<u64>, nack_count: Option<u64>) {
        // An absent count contributes nothing, which is the same as adding zero.
        self.fir_count += fir_count.unwrap_or(0);
        self.pli_count += pli_count.unwrap_or(0);
        self.nack_count += nack_count.unwrap_or(0);
    }

    /// Total number of FIRs sent or received.
    pub fn fir_count(&self) -> u64 {
        self.fir_count
    }

    /// Total number of PLIs sent or received.
    pub fn pli_count(&self) -> u64 {
        self.pli_count
    }

    /// Total number of NACKs sent or received.
    pub fn nack_count(&self) -> u64 {
        self.nack_count
    }
}
569 
#[cfg(test)]
mod test {
    use super::*;

    /// Compiles only when `T` is both [`Send`] and [`Sync`].
    fn assert_send_sync<T: Send + Sync>() {}

    /// Header bytes, payload bytes and packet count of `stats`, in that order.
    fn rtp_totals(stats: &RTPStats) -> (u64, u64, u64) {
        (stats.header_bytes(), stats.payload_bytes(), stats.packets())
    }

    /// FIR, PLI and NACK counts of `stats`, in that order.
    fn rtcp_totals(stats: &RTCPStats) -> (u64, u64, u64) {
        (stats.fir_count(), stats.pli_count(), stats.nack_count())
    }

    #[test]
    fn test_rtp_stats() {
        let mut stats = RTPStats::default();
        assert_eq!(rtp_totals(&stats), (0, 0, 0));

        stats.update(24, 960, 1, SystemTime::now());

        assert_eq!(rtp_totals(&stats), (24, 960, 1));
    }

    #[test]
    fn test_rtcp_stats() {
        let mut stats = RTCPStats::default();
        assert_eq!(rtcp_totals(&stats), (0, 0, 0));

        stats.update(Some(1), Some(2), Some(3));

        assert_eq!(rtcp_totals(&stats), (1, 2, 3));
    }

    #[test]
    fn test_rtp_stats_send_sync() {
        assert_send_sync::<RTPStats>();
    }

    #[test]
    fn test_rtcp_stats_send_sync() {
        assert_send_sync::<RTCPStats>();
    }
}
618