xref: /webrtc/interceptor/src/stats/interceptor.rs (revision 1f428f4d)
1 use std::collections::HashMap;
2 use std::fmt;
3 use std::sync::Arc;
4 use std::time::SystemTime;
5 
6 use super::{inbound, outbound, StatsContainer};
7 use async_trait::async_trait;
8 use rtcp::extended_report::{DLRRReportBlock, ExtendedReport};
9 use rtcp::payload_feedbacks::full_intra_request::FullIntraRequest;
10 use rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication;
11 use rtcp::receiver_report::ReceiverReport;
12 use rtcp::sender_report::SenderReport;
13 use rtcp::transport_feedbacks::transport_layer_nack::TransportLayerNack;
14 use rtp::extension::abs_send_time_extension::unix2ntp;
15 use tokio::sync::{mpsc, oneshot};
16 use tokio::time::Duration;
17 
18 use util::sync::Mutex;
19 use util::{MarshalSize, Unmarshal};
20 
21 use crate::error::Result;
22 use crate::stream_info::StreamInfo;
23 use crate::{Attributes, Interceptor, RTCPReader, RTCPWriter, RTPReader, RTPWriter};
24 
25 #[derive(Debug)]
26 enum Message {
27     StatUpdate {
28         ssrc: u32,
29         update: StatsUpdate,
30     },
31     RequestInboundSnapshot {
32         ssrcs: Vec<u32>,
33         chan: oneshot::Sender<Vec<Option<inbound::StatsSnapshot>>>,
34     },
35     RequestOutboundSnapshot {
36         ssrcs: Vec<u32>,
37         chan: oneshot::Sender<Vec<Option<outbound::StatsSnapshot>>>,
38     },
39 }
40 
41 #[derive(Debug)]
42 enum StatsUpdate {
43     /// Stats collected on the receiving end(inbound) of an RTP stream.
44     InboundRTP {
45         packets: u64,
46         header_bytes: u64,
47         payload_bytes: u64,
48         last_packet_timestamp: SystemTime,
49     },
50     /// Stats collected on the sending end(outbound) of an RTP stream.
51     OutboundRTP {
52         packets: u64,
53         header_bytes: u64,
54         payload_bytes: u64,
55         last_packet_timestamp: SystemTime,
56     },
57     /// Stats collected from received RTCP packets.
58     InboundRTCP {
59         fir_count: Option<u64>,
60         pli_count: Option<u64>,
61         nack_count: Option<u64>,
62     },
63     /// Stats collected from sent RTCP packets.
64     OutboundRTCP {
65         fir_count: Option<u64>,
66         pli_count: Option<u64>,
67         nack_count: Option<u64>,
68     },
69     /// An extended sequence number sent in an SR.
70     OutboundSRExtSeqNum { seq_num: u32 },
71     /// Stats collected from received Receiver Reports i.e. where we have an outbound RTP stream.
72     InboundRecieverReport {
73         ext_seq_num: u32,
74         total_lost: u32,
75         jitter: u32,
76         rtt_ms: Option<f64>,
77         fraction_lost: u8,
78     },
79     /// Stats collected from recieved Sender Reports i.e. where we have an inbound RTP stream.
80     InboundSenderRerport {
81         packets_and_bytes_sent: Option<(u32, u32)>,
82         rtt_ms: Option<f64>,
83     },
84 }
85 
86 pub struct StatsInterceptor {
87     // Wrapped RTP streams
88     recv_streams: Mutex<HashMap<u32, Arc<RTPReadRecorder>>>,
89     send_streams: Mutex<HashMap<u32, Arc<RTPWriteRecorder>>>,
90 
91     tx: mpsc::Sender<Message>,
92 
93     id: String,
94     now_gen: Arc<dyn Fn() -> SystemTime + Send + Sync>,
95 }
96 
97 impl StatsInterceptor {
new(id: String) -> Self98     pub fn new(id: String) -> Self {
99         let (tx, rx) = mpsc::channel(100);
100 
101         tokio::spawn(run_stats_reducer(rx));
102 
103         Self {
104             id,
105             recv_streams: Default::default(),
106             send_streams: Default::default(),
107             tx,
108             now_gen: Arc::new(SystemTime::now),
109         }
110     }
111 
with_time_gen<F>(id: String, now_gen: F) -> Self where F: Fn() -> SystemTime + Send + Sync + 'static,112     fn with_time_gen<F>(id: String, now_gen: F) -> Self
113     where
114         F: Fn() -> SystemTime + Send + Sync + 'static,
115     {
116         let (tx, rx) = mpsc::channel(100);
117         tokio::spawn(run_stats_reducer(rx));
118 
119         Self {
120             id,
121             recv_streams: Default::default(),
122             send_streams: Default::default(),
123             tx,
124             now_gen: Arc::new(now_gen),
125         }
126     }
127 
fetch_inbound_stats( &self, ssrcs: Vec<u32>, ) -> Vec<Option<inbound::StatsSnapshot>>128     pub async fn fetch_inbound_stats(
129         &self,
130         ssrcs: Vec<u32>,
131     ) -> Vec<Option<inbound::StatsSnapshot>> {
132         let (tx, rx) = oneshot::channel();
133 
134         if let Err(e) = self
135             .tx
136             .send(Message::RequestInboundSnapshot { ssrcs, chan: tx })
137             .await
138         {
139             log::debug!(
140                 "Failed to fetch inbound RTP stream stats from stats task with error: {}",
141                 e
142             );
143 
144             return vec![];
145         }
146 
147         rx.await.unwrap_or_default()
148     }
149 
fetch_outbound_stats( &self, ssrcs: Vec<u32>, ) -> Vec<Option<outbound::StatsSnapshot>>150     pub async fn fetch_outbound_stats(
151         &self,
152         ssrcs: Vec<u32>,
153     ) -> Vec<Option<outbound::StatsSnapshot>> {
154         let (tx, rx) = oneshot::channel();
155 
156         if let Err(e) = self
157             .tx
158             .send(Message::RequestOutboundSnapshot { ssrcs, chan: tx })
159             .await
160         {
161             log::debug!(
162                 "Failed to fetch outbound RTP stream stats from stats task with error: {}",
163                 e
164             );
165 
166             return vec![];
167         }
168 
169         rx.await.unwrap_or_default()
170     }
171 }
172 
run_stats_reducer(mut rx: mpsc::Receiver<Message>)173 async fn run_stats_reducer(mut rx: mpsc::Receiver<Message>) {
174     let mut ssrc_stats: StatsContainer = Default::default();
175     let mut cleanup_ticker = tokio::time::interval(Duration::from_secs(10));
176 
177     loop {
178         tokio::select! {
179             maybe_msg = rx.recv() => {
180                 let msg = match maybe_msg {
181                     Some(m) => m,
182                     None => break,
183                 };
184 
185                 match msg {
186                     Message::StatUpdate { ssrc, update } => {
187                         handle_stats_update(&mut ssrc_stats, ssrc, update);
188                     }
189                     Message::RequestInboundSnapshot { ssrcs, chan} => {
190                         let result = ssrcs
191                             .into_iter()
192                             .map(|ssrc| ssrc_stats.get_inbound_stats(ssrc).map(inbound::StreamStats::snapshot))
193                             .collect();
194 
195                         let _ = chan.send(result);
196                     }
197                     Message::RequestOutboundSnapshot { ssrcs, chan} => {
198                         let result = ssrcs
199                             .into_iter()
200                             .map(|ssrc| ssrc_stats.get_outbound_stats(ssrc).map(outbound::StreamStats::snapshot))
201                             .collect();
202 
203                         let _ = chan.send(result);
204 
205                     }
206                 }
207 
208             }
209             _ = cleanup_ticker.tick() => {
210                 ssrc_stats.remove_stale_entries();
211             }
212         }
213     }
214 }
215 
handle_stats_update(ssrc_stats: &mut StatsContainer, ssrc: u32, update: StatsUpdate)216 fn handle_stats_update(ssrc_stats: &mut StatsContainer, ssrc: u32, update: StatsUpdate) {
217     match update {
218         StatsUpdate::InboundRTP {
219             packets,
220             header_bytes,
221             payload_bytes,
222             last_packet_timestamp,
223         } => {
224             let stats = ssrc_stats.get_or_create_inbound_stream_stats(ssrc);
225 
226             stats
227                 .rtp_stats
228                 .update(header_bytes, payload_bytes, packets, last_packet_timestamp);
229             stats.mark_updated();
230         }
231         StatsUpdate::OutboundRTP {
232             packets,
233             header_bytes,
234             payload_bytes,
235             last_packet_timestamp,
236         } => {
237             let stats = ssrc_stats.get_or_create_outbound_stream_stats(ssrc);
238             stats
239                 .rtp_stats
240                 .update(header_bytes, payload_bytes, packets, last_packet_timestamp);
241             stats.mark_updated();
242         }
243         StatsUpdate::InboundRTCP {
244             fir_count,
245             pli_count,
246             nack_count,
247         } => {
248             let stats = ssrc_stats.get_or_create_outbound_stream_stats(ssrc);
249             stats.rtcp_stats.update(fir_count, pli_count, nack_count);
250             stats.mark_updated();
251         }
252         StatsUpdate::OutboundRTCP {
253             fir_count,
254             pli_count,
255             nack_count,
256         } => {
257             let stats = ssrc_stats.get_or_create_inbound_stream_stats(ssrc);
258             stats.rtcp_stats.update(fir_count, pli_count, nack_count);
259             stats.mark_updated();
260         }
261         StatsUpdate::OutboundSRExtSeqNum { seq_num } => {
262             let stats = ssrc_stats.get_or_create_outbound_stream_stats(ssrc);
263             stats.record_sr_ext_seq_num(seq_num);
264             stats.mark_updated();
265         }
266         StatsUpdate::InboundRecieverReport {
267             ext_seq_num,
268             total_lost,
269             jitter,
270             rtt_ms,
271             fraction_lost,
272         } => {
273             let stats = ssrc_stats.get_or_create_outbound_stream_stats(ssrc);
274             stats.record_remote_round_trip_time(rtt_ms);
275             stats.update_remote_fraction_lost(fraction_lost);
276             stats.update_remote_total_lost(total_lost);
277             stats.update_remote_inbound_packets_received(ext_seq_num, total_lost);
278             stats.update_remote_jitter(jitter);
279 
280             stats.mark_updated();
281         }
282         StatsUpdate::InboundSenderRerport {
283             rtt_ms,
284             packets_and_bytes_sent,
285         } => {
286             // This is a sender report we received, as such it concerns an RTP stream that's
287             // outbound at the remote.
288             let stats = ssrc_stats.get_or_create_inbound_stream_stats(ssrc);
289 
290             if let Some((packets_sent, bytes_sent)) = packets_and_bytes_sent {
291                 stats.record_sender_report(packets_sent, bytes_sent);
292             }
293             stats.record_remote_round_trip_time(rtt_ms);
294 
295             stats.mark_updated();
296         }
297     }
298 }
299 
300 #[async_trait]
301 impl Interceptor for StatsInterceptor {
302     /// bind_remote_stream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
303     /// will be called once per rtp packet.
bind_remote_stream( &self, info: &StreamInfo, reader: Arc<dyn RTPReader + Send + Sync>, ) -> Arc<dyn RTPReader + Send + Sync>304     async fn bind_remote_stream(
305         &self,
306         info: &StreamInfo,
307         reader: Arc<dyn RTPReader + Send + Sync>,
308     ) -> Arc<dyn RTPReader + Send + Sync> {
309         let mut lock = self.recv_streams.lock();
310 
311         let e = lock
312             .entry(info.ssrc)
313             .or_insert_with(|| Arc::new(RTPReadRecorder::new(reader, self.tx.clone())));
314 
315         e.clone()
316     }
317 
318     /// unbind_remote_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
unbind_remote_stream(&self, info: &StreamInfo)319     async fn unbind_remote_stream(&self, info: &StreamInfo) {
320         let mut lock = self.recv_streams.lock();
321 
322         lock.remove(&info.ssrc);
323     }
324 
325     /// bind_local_stream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
326     /// will be called once per rtp packet.
bind_local_stream( &self, info: &StreamInfo, writer: Arc<dyn RTPWriter + Send + Sync>, ) -> Arc<dyn RTPWriter + Send + Sync>327     async fn bind_local_stream(
328         &self,
329         info: &StreamInfo,
330         writer: Arc<dyn RTPWriter + Send + Sync>,
331     ) -> Arc<dyn RTPWriter + Send + Sync> {
332         let mut lock = self.send_streams.lock();
333 
334         let e = lock
335             .entry(info.ssrc)
336             .or_insert_with(|| Arc::new(RTPWriteRecorder::new(writer, self.tx.clone())));
337 
338         e.clone()
339     }
340 
341     /// unbind_local_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
unbind_local_stream(&self, info: &StreamInfo)342     async fn unbind_local_stream(&self, info: &StreamInfo) {
343         let mut lock = self.send_streams.lock();
344 
345         lock.remove(&info.ssrc);
346     }
347 
close(&self) -> Result<()>348     async fn close(&self) -> Result<()> {
349         Ok(())
350     }
351 
352     /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
353     /// will be called once per packet batch.
bind_rtcp_writer( &self, writer: Arc<dyn RTCPWriter + Send + Sync>, ) -> Arc<dyn RTCPWriter + Send + Sync>354     async fn bind_rtcp_writer(
355         &self,
356         writer: Arc<dyn RTCPWriter + Send + Sync>,
357     ) -> Arc<dyn RTCPWriter + Send + Sync> {
358         let now = self.now_gen.clone();
359 
360         Arc::new(RTCPWriteInterceptor {
361             rtcp_writer: writer,
362             tx: self.tx.clone(),
363             now_gen: move || now(),
364         })
365     }
366 
367     /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
368     /// change in the future. The returned method will be called once per packet batch.
bind_rtcp_reader( &self, reader: Arc<dyn RTCPReader + Send + Sync>, ) -> Arc<dyn RTCPReader + Send + Sync>369     async fn bind_rtcp_reader(
370         &self,
371         reader: Arc<dyn RTCPReader + Send + Sync>,
372     ) -> Arc<dyn RTCPReader + Send + Sync> {
373         let now = self.now_gen.clone();
374 
375         Arc::new(RTCPReadInterceptor {
376             rtcp_reader: reader,
377             tx: self.tx.clone(),
378             now_gen: move || now(),
379         })
380     }
381 }
382 
383 pub struct RTCPReadInterceptor<F> {
384     rtcp_reader: Arc<dyn RTCPReader + Send + Sync>,
385     tx: mpsc::Sender<Message>,
386     now_gen: F,
387 }
388 
389 #[async_trait]
390 impl<F> RTCPReader for RTCPReadInterceptor<F>
391 where
392     F: Fn() -> SystemTime + Send + Sync,
393 {
394     /// read a batch of rtcp packets
read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)>395     async fn read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)> {
396         let (n, attributes) = self.rtcp_reader.read(buf, attributes).await?;
397 
398         let mut b = &buf[..n];
399         let pkts = rtcp::packet::unmarshal(&mut b)?;
400         // Middle 32 bits
401         let now = (unix2ntp((self.now_gen)()) >> 16) as u32;
402 
403         #[derive(Default, Debug)]
404         struct GenericRTCP {
405             fir_count: Option<u64>,
406             pli_count: Option<u64>,
407             nack_count: Option<u64>,
408         }
409 
410         #[derive(Default, Debug)]
411         struct ReceiverReportEntry {
412             /// Extended sequence number value from Receiver Report, used to calculate remote
413             /// stats.
414             ext_seq_num: u32,
415             /// Total loss value from Receiver Report, used to calculate remote
416             /// stats.
417             total_lost: u32,
418             /// Jitter from Receiver Report.
419             jitter: u32,
420             /// Round Trip Time calculated from Receiver Report.
421             rtt_ms: Option<f64>,
422             /// Fraction of packets lost.
423             fraction_lost: u8,
424         }
425 
426         #[derive(Default, Debug)]
427         struct SenderReportEntry {
428             /// NTP timestamp(from Sender Report).
429             sr_ntp_time: Option<u64>,
430             /// Packets Sent(from Sender Report).
431             sr_packets_sent: Option<u32>,
432             /// Bytes Sent(from Sender Report).
433             sr_bytes_sent: Option<u32>,
434             /// Last RR timestamp(middle bits) from DLRR extended report block.
435             dlrr_last_rr: Option<u32>,
436             /// Delay since last RR from DLRR extended report block.
437             dlrr_delay_rr: Option<u32>,
438         }
439 
440         #[derive(Default, Debug)]
441         struct Entry {
442             generic_rtcp: GenericRTCP,
443             receiver_reports: Vec<ReceiverReportEntry>,
444             sender_reports: Vec<SenderReportEntry>,
445         }
446         let updates = pkts
447             .iter()
448             .fold(HashMap::<u32, Entry>::new(), |mut acc, p| {
449                 if let Some(rr) = p.as_any().downcast_ref::<ReceiverReport>() {
450                     for recp in &rr.reports {
451                         let e = acc.entry(recp.ssrc).or_default();
452 
453                         let rtt_ms = if recp.delay != 0 {
454                             calculate_rtt_ms(now, recp.delay, recp.last_sender_report)
455                         } else {
456                             None
457                         };
458 
459                         e.receiver_reports.push(ReceiverReportEntry {
460                             ext_seq_num: recp.last_sequence_number,
461                             total_lost: recp.total_lost,
462                             jitter: recp.jitter,
463                             rtt_ms,
464                             fraction_lost: recp.fraction_lost,
465                         });
466                     }
467                 } else if let Some(fir) = p.as_any().downcast_ref::<FullIntraRequest>() {
468                     for fir_entry in &fir.fir {
469                         let e = acc.entry(fir_entry.ssrc).or_default();
470                         e.generic_rtcp.fir_count =
471                             e.generic_rtcp.fir_count.map(|v| v + 1).or(Some(1));
472                     }
473                 } else if let Some(pli) = p.as_any().downcast_ref::<PictureLossIndication>() {
474                     let e = acc.entry(pli.media_ssrc).or_default();
475                     e.generic_rtcp.pli_count = e.generic_rtcp.pli_count.map(|v| v + 1).or(Some(1));
476                 } else if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() {
477                     let count = nack.nacks.iter().flat_map(|p| p.into_iter()).count() as u64;
478 
479                     let e = acc.entry(nack.media_ssrc).or_default();
480                     e.generic_rtcp.nack_count =
481                         e.generic_rtcp.nack_count.map(|v| v + count).or(Some(count));
482                 } else if let Some(sr) = p.as_any().downcast_ref::<SenderReport>() {
483                     let e = acc.entry(sr.ssrc).or_default();
484                     let sr_e = {
485                         let need_new_entry = e
486                             .sender_reports
487                             .last()
488                             .map(|e| e.sr_packets_sent.is_some())
489                             .unwrap_or(true);
490 
491                         if need_new_entry {
492                             e.sender_reports.push(Default::default());
493                         }
494 
495                         // SAFETY: Unrwap ok because we just added an entry above
496                         e.sender_reports.last_mut().unwrap()
497                     };
498 
499                     sr_e.sr_ntp_time = Some(sr.ntp_time);
500                     sr_e.sr_packets_sent = Some(sr.packet_count);
501                     sr_e.sr_bytes_sent = Some(sr.octet_count);
502                 } else if let Some(xr) = p.as_any().downcast_ref::<ExtendedReport>() {
503                     // Extended Report(XR)
504 
505                     // We only care about DLRR reports
506                     let dlrrs = xr.reports.iter().flat_map(|report| {
507                         let dlrr = report.as_any().downcast_ref::<DLRRReportBlock>();
508 
509                         dlrr.map(|b| b.reports.iter()).into_iter().flatten()
510                     });
511 
512                     for dlrr in dlrrs {
513                         let e = acc.entry(dlrr.ssrc).or_default();
514                         let sr_e = {
515                             let need_new_entry = e
516                                 .sender_reports
517                                 .last()
518                                 .map(|e| e.dlrr_last_rr.is_some())
519                                 .unwrap_or(true);
520 
521                             if need_new_entry {
522                                 e.sender_reports.push(Default::default());
523                             }
524 
525                             // SAFETY: Unrwap ok because we just added an entry above
526                             e.sender_reports.last_mut().unwrap()
527                         };
528 
529                         sr_e.dlrr_last_rr = Some(dlrr.last_rr);
530                         sr_e.dlrr_delay_rr = Some(dlrr.dlrr);
531                     }
532                 }
533 
534                 acc
535             });
536 
537         for (
538             ssrc,
539             Entry {
540                 generic_rtcp,
541                 mut receiver_reports,
542                 mut sender_reports,
543             },
544         ) in updates.into_iter()
545         {
546             // Sort RR by seq number low to high
547             receiver_reports.sort_by(|a, b| a.ext_seq_num.cmp(&b.ext_seq_num));
548             // Sort SR by ntp time, low to high
549             sender_reports
550                 .sort_by(|a, b| a.sr_ntp_time.unwrap_or(0).cmp(&b.sr_ntp_time.unwrap_or(0)));
551 
552             let _ = self
553                 .tx
554                 .send(Message::StatUpdate {
555                     ssrc,
556                     update: StatsUpdate::InboundRTCP {
557                         fir_count: generic_rtcp.fir_count,
558                         pli_count: generic_rtcp.pli_count,
559                         nack_count: generic_rtcp.nack_count,
560                     },
561                 })
562                 .await;
563 
564             let futures = receiver_reports.into_iter().map(|rr| {
565                 self.tx.send(Message::StatUpdate {
566                     ssrc,
567                     update: StatsUpdate::InboundRecieverReport {
568                         ext_seq_num: rr.ext_seq_num,
569                         total_lost: rr.total_lost,
570                         jitter: rr.jitter,
571                         rtt_ms: rr.rtt_ms,
572                         fraction_lost: rr.fraction_lost,
573                     },
574                 })
575             });
576             for fut in futures {
577                 // TODO: Use futures::join_all
578                 let _ = fut.await;
579             }
580 
581             let futures = sender_reports.into_iter().map(|sr| {
582                 let rtt_ms = match (sr.dlrr_last_rr, sr.dlrr_delay_rr, sr.sr_packets_sent) {
583                     (Some(last_rr), Some(delay_rr), Some(_)) if last_rr != 0 && delay_rr != 0 => {
584                         calculate_rtt_ms(now, delay_rr, last_rr)
585                     }
586                     _ => None,
587                 };
588 
589                 self.tx.send(Message::StatUpdate {
590                     ssrc,
591                     update: StatsUpdate::InboundSenderRerport {
592                         packets_and_bytes_sent: sr
593                             .sr_packets_sent
594                             .and_then(|ps| sr.sr_bytes_sent.map(|bs| (ps, bs))),
595                         rtt_ms,
596                     },
597                 })
598             });
599             for fut in futures {
600                 // TODO: Use futures::join_all
601                 let _ = fut.await;
602             }
603         }
604 
605         Ok((n, attributes))
606     }
607 }
608 
609 pub struct RTCPWriteInterceptor<F> {
610     rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>,
611     tx: mpsc::Sender<Message>,
612     now_gen: F,
613 }
614 
615 #[async_trait]
616 impl<F> RTCPWriter for RTCPWriteInterceptor<F>
617 where
618     F: Fn() -> SystemTime + Send + Sync,
619 {
write( &self, pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>], attributes: &Attributes, ) -> Result<usize>620     async fn write(
621         &self,
622         pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
623         attributes: &Attributes,
624     ) -> Result<usize> {
625         #[derive(Default, Debug)]
626         struct Entry {
627             fir_count: Option<u64>,
628             pli_count: Option<u64>,
629             nack_count: Option<u64>,
630             sr_ext_seq_num: Option<u32>,
631         }
632         let updates = pkts
633             .iter()
634             .fold(HashMap::<u32, Entry>::new(), |mut acc, p| {
635                 if let Some(fir) = p.as_any().downcast_ref::<FullIntraRequest>() {
636                     for fir_entry in &fir.fir {
637                         let e = acc.entry(fir_entry.ssrc).or_default();
638                         e.fir_count = e.fir_count.map(|v| v + 1).or(Some(1));
639                     }
640                 } else if let Some(pli) = p.as_any().downcast_ref::<PictureLossIndication>() {
641                     let e = acc.entry(pli.media_ssrc).or_default();
642                     e.pli_count = e.pli_count.map(|v| v + 1).or(Some(1));
643                 } else if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() {
644                     let count = nack.nacks.iter().flat_map(|p| p.into_iter()).count() as u64;
645 
646                     let e = acc.entry(nack.media_ssrc).or_default();
647                     e.nack_count = e.nack_count.map(|v| v + count).or(Some(count));
648                 } else if let Some(sr) = p.as_any().downcast_ref::<SenderReport>() {
649                     for rep in &sr.reports {
650                         let e = acc.entry(rep.ssrc).or_default();
651 
652                         match e.sr_ext_seq_num {
653                             // We want the initial value for `last_sequence_number` from the first
654                             // SR. It's possible that an RTCP batch contains more than one SR, in
655                             // which case we should use the lowest value.
656                             Some(seq_num) if seq_num > rep.last_sequence_number => {
657                                 e.sr_ext_seq_num = Some(rep.last_sequence_number)
658                             }
659                             None => e.sr_ext_seq_num = Some(rep.last_sequence_number),
660                             _ => {}
661                         }
662                     }
663                 }
664 
665                 acc
666             });
667 
668         for (
669             ssrc,
670             Entry {
671                 fir_count,
672                 pli_count,
673                 nack_count,
674                 sr_ext_seq_num,
675             },
676         ) in updates.into_iter()
677         {
678             let _ = self
679                 .tx
680                 .send(Message::StatUpdate {
681                     ssrc,
682                     update: StatsUpdate::OutboundRTCP {
683                         fir_count,
684                         pli_count,
685                         nack_count,
686                     },
687                 })
688                 .await;
689 
690             if let Some(seq_num) = sr_ext_seq_num {
691                 let _ = self
692                     .tx
693                     .send(Message::StatUpdate {
694                         ssrc,
695                         update: StatsUpdate::OutboundSRExtSeqNum { seq_num },
696                     })
697                     .await;
698             }
699         }
700 
701         self.rtcp_writer.write(pkts, attributes).await
702     }
703 }
704 
705 pub struct RTPReadRecorder {
706     rtp_reader: Arc<dyn RTPReader + Send + Sync>,
707     tx: mpsc::Sender<Message>,
708 }
709 
710 impl RTPReadRecorder {
new(rtp_reader: Arc<dyn RTPReader + Send + Sync>, tx: mpsc::Sender<Message>) -> Self711     fn new(rtp_reader: Arc<dyn RTPReader + Send + Sync>, tx: mpsc::Sender<Message>) -> Self {
712         Self { rtp_reader, tx }
713     }
714 }
715 
716 impl fmt::Debug for RTPReadRecorder {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result717     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
718         f.debug_struct("RTPReadRecorder").finish()
719     }
720 }
721 
722 #[async_trait]
723 impl RTPReader for RTPReadRecorder {
read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)>724     async fn read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)> {
725         let (bytes_read, attributes) = self.rtp_reader.read(buf, attributes).await?;
726         // TODO: This parsing happens redundantly in several interceptors, would be good if we
727         // could not do this.
728         let mut b = &buf[..bytes_read];
729         let packet = rtp::packet::Packet::unmarshal(&mut b)?;
730 
731         let _ = self
732             .tx
733             .send(Message::StatUpdate {
734                 ssrc: packet.header.ssrc,
735                 update: StatsUpdate::InboundRTP {
736                     packets: 1,
737                     header_bytes: (bytes_read - packet.payload.len()) as u64,
738                     payload_bytes: packet.payload.len() as u64,
739                     last_packet_timestamp: SystemTime::now(),
740                 },
741             })
742             .await;
743 
744         Ok((bytes_read, attributes))
745     }
746 }
747 
748 pub struct RTPWriteRecorder {
749     rtp_writer: Arc<dyn RTPWriter + Send + Sync>,
750     tx: mpsc::Sender<Message>,
751 }
752 
753 impl RTPWriteRecorder {
new(rtp_writer: Arc<dyn RTPWriter + Send + Sync>, tx: mpsc::Sender<Message>) -> Self754     fn new(rtp_writer: Arc<dyn RTPWriter + Send + Sync>, tx: mpsc::Sender<Message>) -> Self {
755         Self { rtp_writer, tx }
756     }
757 }
758 
759 impl fmt::Debug for RTPWriteRecorder {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result760     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
761         f.debug_struct("RTPWriteRecorder").finish()
762     }
763 }
764 
765 #[async_trait]
766 impl RTPWriter for RTPWriteRecorder {
767     /// write a rtp packet
write(&self, pkt: &rtp::packet::Packet, attributes: &Attributes) -> Result<usize>768     async fn write(&self, pkt: &rtp::packet::Packet, attributes: &Attributes) -> Result<usize> {
769         let n = self.rtp_writer.write(pkt, attributes).await?;
770 
771         let _ = self
772             .tx
773             .send(Message::StatUpdate {
774                 ssrc: pkt.header.ssrc,
775                 update: StatsUpdate::OutboundRTP {
776                     packets: 1,
777                     header_bytes: pkt.header.marshal_size() as u64,
778                     payload_bytes: pkt.payload.len() as u64,
779                     last_packet_timestamp: SystemTime::now(),
780                 },
781             })
782             .await;
783 
784         Ok(n)
785     }
786 }
787 
788 /// Calculate the round trip time for a given peer as described in
789 /// [RFC3550 6.4.1](https://datatracker.ietf.org/doc/html/rfc3550#section-6.4.1).
790 ///
791 /// ## Params
792 ///
793 /// - `now` the current middle 32 bits of an NTP timestamp for the current time.
794 /// - `delay` the delay(`DLSR`) since last sender report expressed as fractions of a second in 32 bits.
795 /// - `last_report` the middle 32 bits of an NTP timestamp for the most recent sender report(LSR) or Receiver Report(LRR).
calculate_rtt_ms(now: u32, delay: u32, last_report: u32) -> Option<f64>796 fn calculate_rtt_ms(now: u32, delay: u32, last_report: u32) -> Option<f64> {
797     // [10 Nov 1995 11:33:25.125 UTC]       [10 Nov 1995 11:33:36.5 UTC]
798     // n                 SR(n)              A=b710:8000 (46864.500 s)
799     // ---------------------------------------------------------------->
800     //                    v                 ^
801     // ntp_sec =0xb44db705 v               ^ dlsr=0x0005:4000 (    5.250s)
802     // ntp_frac=0x20000000  v             ^  lsr =0xb705:2000 (46853.125s)
803     //   (3024992005.125 s)  v           ^
804     // r                      v         ^ RR(n)
805     // ---------------------------------------------------------------->
806     //                        |<-DLSR->|
807     //                         (5.250 s)
808     //
809     // A     0xb710:8000 (46864.500 s)
810     // DLSR -0x0005:4000 (    5.250 s)
811     // LSR  -0xb705:2000 (46853.125 s)
812     // -------------------------------
813     // delay 0x0006:2000 (    6.125 s)
814 
815     let rtt = now.checked_sub(delay)?.checked_sub(last_report)?;
816     let rtt_seconds = rtt >> 16;
817     let rtt_fraction = (rtt & (u16::MAX as u32)) as f64 / (u16::MAX as u32) as f64;
818 
819     Some(rtt_seconds as f64 * 1000.0 + rtt_fraction * 1000.0)
820 }
821 
822 #[cfg(test)]
823 mod test {
824     // Silence warning on `..Default::default()` with no effect:
825     #![allow(clippy::needless_update)]
826 
827     macro_rules! assert_feq {
828         ($left: expr, $right: expr) => {
829             assert_feq!($left, $right, 0.01);
830         };
831         ($left: expr, $right: expr, $eps: expr) => {
832             if ($left - $right).abs() >= $eps {
833                 panic!("{:?} was not within {:?} of {:?}", $left, $eps, $right);
834             }
835         };
836     }
837 
838     use bytes::Bytes;
839     use rtcp::extended_report::{DLRRReport, DLRRReportBlock, ExtendedReport};
840     use rtcp::payload_feedbacks::full_intra_request::{FirEntry, FullIntraRequest};
841     use rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication;
842     use rtcp::receiver_report::ReceiverReport;
843     use rtcp::reception_report::ReceptionReport;
844     use rtcp::sender_report::SenderReport;
845     use rtcp::transport_feedbacks::transport_layer_nack::{NackPair, TransportLayerNack};
846 
847     use std::sync::Arc;
848     use std::time::{Duration, SystemTime};
849 
850     use crate::error::Result;
851     use crate::mock::mock_stream::MockStream;
852     use crate::stream_info::StreamInfo;
853 
854     use super::StatsInterceptor;
855 
856     #[tokio::test]
test_stats_interceptor_rtp() -> Result<()>857     async fn test_stats_interceptor_rtp() -> Result<()> {
858         let icpr: Arc<_> = Arc::new(StatsInterceptor::new("Hello".to_owned()));
859 
860         let recv_stream = MockStream::new(
861             &StreamInfo {
862                 ssrc: 123456,
863                 ..Default::default()
864             },
865             icpr.clone(),
866         )
867         .await;
868 
869         let send_stream = MockStream::new(
870             &StreamInfo {
871                 ssrc: 234567,
872                 ..Default::default()
873             },
874             icpr.clone(),
875         )
876         .await;
877 
878         recv_stream
879             .receive_rtp(rtp::packet::Packet {
880                 header: rtp::header::Header {
881                     ssrc: 123456,
882                     ..Default::default()
883                 },
884                 payload: Bytes::from_static(b"\xde\xad\xbe\xef"),
885             })
886             .await;
887 
888         let _ = recv_stream
889             .read_rtp()
890             .await
891             .expect("After calling receive_rtp read_rtp should return Some")?;
892 
893         let _ = send_stream
894             .write_rtp(&rtp::packet::Packet {
895                 header: rtp::header::Header {
896                     ssrc: 234567,
897                     ..Default::default()
898                 },
899                 payload: Bytes::from_static(b"\xde\xad\xbe\xef\xde\xad\xbe\xef"),
900             })
901             .await;
902 
903         let _ = send_stream
904             .write_rtp(&rtp::packet::Packet {
905                 header: rtp::header::Header {
906                     ssrc: 234567,
907                     ..Default::default()
908                 },
909                 payload: Bytes::from_static(&[0x13, 0x37]),
910             })
911             .await;
912 
913         let snapshots = icpr.fetch_inbound_stats(vec![123456]).await;
914         let recv_snapshot = snapshots[0]
915             .as_ref()
916             .expect("Stats should exist for ssrc: 123456");
917         assert_eq!(recv_snapshot.packets_received(), 1);
918         assert_eq!(recv_snapshot.header_bytes_received(), 12);
919         assert_eq!(recv_snapshot.payload_bytes_received(), 4);
920 
921         let snapshots = icpr.fetch_outbound_stats(vec![234567]).await;
922         let send_snapshot = snapshots[0]
923             .as_ref()
924             .expect("Stats should exist for ssrc: 234567");
925         assert_eq!(send_snapshot.packets_sent(), 2);
926         assert_eq!(send_snapshot.header_bytes_sent(), 24);
927         assert_eq!(send_snapshot.payload_bytes_sent(), 10);
928 
929         Ok(())
930     }
931 
932     #[tokio::test]
test_stats_interceptor_rtcp() -> Result<()>933     async fn test_stats_interceptor_rtcp() -> Result<()> {
934         let icpr: Arc<_> = Arc::new(StatsInterceptor::with_time_gen("Hello".to_owned(), || {
935             // 10 Nov 1995 11:33:36.5 UTC
936             SystemTime::UNIX_EPOCH + Duration::from_secs_f64(816003216.5)
937         }));
938 
939         let recv_stream = MockStream::new(
940             &StreamInfo {
941                 ssrc: 123456,
942                 ..Default::default()
943             },
944             icpr.clone(),
945         )
946         .await;
947 
948         let send_stream = MockStream::new(
949             &StreamInfo {
950                 ssrc: 234567,
951                 ..Default::default()
952             },
953             icpr.clone(),
954         )
955         .await;
956 
957         send_stream
958             .write_rtcp(&[Box::new(SenderReport {
959                 ssrc: 234567,
960                 reports: vec![
961                     ReceptionReport {
962                         ssrc: 234567,
963                         last_sequence_number: (5 << 16) | 10,
964                         ..Default::default()
965                     },
966                     ReceptionReport {
967                         ssrc: 234567,
968                         last_sequence_number: (5 << 16) | 85,
969                         ..Default::default()
970                     },
971                 ],
972                 ..Default::default()
973             })])
974             .await
975             .expect("Failed to write RTCP packets");
976 
977         send_stream
978             .receive_rtcp(vec![
979                 Box::new(ReceiverReport {
980                     reports: vec![
981                         ReceptionReport {
982                             ssrc: 234567,
983                             last_sequence_number: (5 << 16) | 64,
984                             total_lost: 5,
985                             ..Default::default()
986                         },
987                         ReceptionReport {
988                             ssrc: 234567,
989                             last_sender_report: 0xb705_2000,
990                             delay: 0x0005_4000,
991                             last_sequence_number: (5 << 16) | 70,
992                             total_lost: 8,
993                             fraction_lost: 32,
994                             jitter: 2250,
995                             ..Default::default()
996                         },
997                     ],
998                     ..Default::default()
999                 }),
1000                 Box::new(TransportLayerNack {
1001                     sender_ssrc: 0,
1002                     media_ssrc: 234567,
1003                     nacks: vec![NackPair {
1004                         packet_id: 5,
1005                         lost_packets: 0b0011_0110,
1006                     }],
1007                 }),
1008                 Box::new(TransportLayerNack {
1009                     sender_ssrc: 0,
1010                     // NB: Different SSRC
1011                     media_ssrc: 999999,
1012                     nacks: vec![NackPair {
1013                         packet_id: 5,
1014                         lost_packets: 0b0011_0110,
1015                     }],
1016                 }),
1017                 Box::new(PictureLossIndication {
1018                     sender_ssrc: 0,
1019                     media_ssrc: 234567,
1020                 }),
1021                 Box::new(PictureLossIndication {
1022                     sender_ssrc: 0,
1023                     media_ssrc: 234567,
1024                 }),
1025                 Box::new(FullIntraRequest {
1026                     sender_ssrc: 0,
1027                     media_ssrc: 234567,
1028                     fir: vec![
1029                         FirEntry {
1030                             ssrc: 234567,
1031                             sequence_number: 132,
1032                         },
1033                         FirEntry {
1034                             ssrc: 234567,
1035                             sequence_number: 135,
1036                         },
1037                     ],
1038                 }),
1039             ])
1040             .await;
1041         let snapshots = icpr.fetch_outbound_stats(vec![234567]).await;
1042         let send_snapshot = snapshots[0]
1043             .as_ref()
1044             .expect("Outbound Stats should exist for ssrc: 234567");
1045 
1046         assert!(
1047             send_snapshot.remote_round_trip_time().is_none()
1048                 && send_snapshot.remote_round_trip_time_measurements() == 0,
1049             "Before receiving the first RR we should not have a remote round trip time"
1050         );
1051         let _ = send_stream
1052             .read_rtcp()
1053             .await
1054             .expect("After calling `receive_rtcp`, `read_rtcp` should return some packets");
1055 
1056         recv_stream
1057             .write_rtcp(&[
1058                 Box::new(TransportLayerNack {
1059                     sender_ssrc: 0,
1060                     media_ssrc: 123456,
1061                     nacks: vec![NackPair {
1062                         packet_id: 5,
1063                         lost_packets: 0b0011_0111,
1064                     }],
1065                 }),
1066                 Box::new(TransportLayerNack {
1067                     sender_ssrc: 0,
1068                     // NB: Different SSRC
1069                     media_ssrc: 999999,
1070                     nacks: vec![NackPair {
1071                         packet_id: 5,
1072                         lost_packets: 0b1111_0110,
1073                     }],
1074                 }),
1075                 Box::new(PictureLossIndication {
1076                     sender_ssrc: 0,
1077                     media_ssrc: 123456,
1078                 }),
1079                 Box::new(PictureLossIndication {
1080                     sender_ssrc: 0,
1081                     media_ssrc: 123456,
1082                 }),
1083                 Box::new(PictureLossIndication {
1084                     sender_ssrc: 0,
1085                     media_ssrc: 123456,
1086                 }),
1087                 Box::new(FullIntraRequest {
1088                     sender_ssrc: 0,
1089                     media_ssrc: 123456,
1090                     fir: vec![FirEntry {
1091                         ssrc: 123456,
1092                         sequence_number: 132,
1093                     }],
1094                 }),
1095             ])
1096             .await
1097             .expect("Failed to write RTCP packets for recv_stream");
1098 
1099         recv_stream
1100             .receive_rtcp(vec![
1101                 Box::new(SenderReport {
1102                     ssrc: 123456,
1103                     ntp_time: 12345, // Used for ordering
1104                     packet_count: 52,
1105                     octet_count: 8172,
1106                     reports: vec![],
1107                     ..Default::default()
1108                 }),
1109                 Box::new(SenderReport {
1110                     ssrc: 123456,
1111                     ntp_time: 23456, // Used for ordering
1112                     packet_count: 82,
1113                     octet_count: 10351,
1114                     reports: vec![],
1115                     ..Default::default()
1116                 }),
1117                 Box::new(ExtendedReport {
1118                     sender_ssrc: 928191,
1119                     reports: vec![Box::new(DLRRReportBlock {
1120                         reports: vec![DLRRReport {
1121                             ssrc: 123456,
1122                             last_rr: 0xb705_2000,
1123                             dlrr: 0x0005_4000,
1124                         }],
1125                     })],
1126                 }),
1127                 Box::new(SenderReport {
1128                     /// NB: Different SSRC
1129                     ssrc: 9999999,
1130                     ntp_time: 99999, // Used for ordering
1131                     packet_count: 1231,
1132                     octet_count: 193812,
1133                     reports: vec![],
1134                     ..Default::default()
1135                 }),
1136             ])
1137             .await;
1138 
1139         let snapshots = icpr.fetch_inbound_stats(vec![123456]).await;
1140         let recv_snapshot = snapshots[0]
1141             .as_ref()
1142             .expect("Stats should exist for ssrc: 123456");
1143         assert!(
1144             recv_snapshot.remote_round_trip_time().is_none()
1145                 && recv_snapshot.remote_round_trip_time_measurements() == 0,
1146             "Before receiving the first SR/DLRR we should not have a remote round trip time"
1147         );
1148 
1149         let _ = recv_stream.read_rtcp().await.expect("read_rtcp failed");
1150 
1151         let snapshots = icpr.fetch_outbound_stats(vec![234567]).await;
1152         let send_snapshot = snapshots[0]
1153             .as_ref()
1154             .expect("Outbound Stats should exist for ssrc: 234567");
1155         let rtt_ms = send_snapshot.remote_round_trip_time().expect(
1156             "After receiving an RR with a DSLR block we should have a remote round trip time",
1157         );
1158         assert_feq!(rtt_ms, 6125.0);
1159 
1160         assert_eq!(send_snapshot.nacks_received(), 5);
1161         assert_eq!(send_snapshot.plis_received(), 2);
1162         assert_eq!(send_snapshot.firs_received(), 2);
1163         // Last Seq Num(RR)  - total lost(RR) - Initial Seq Num(SR) + 1
1164         // 70 - 8 - 10 + 1 = 53
1165         assert_eq!(send_snapshot.remote_packets_received(), 53);
1166         assert_feq!(
1167             send_snapshot
1168                 .remote_fraction_lost()
1169                 .expect("Should have a fraction lost values after receiving RR"),
1170             32.0 / 256.0
1171         );
1172         assert_eq!(send_snapshot.remote_total_lost(), 8);
1173         assert_eq!(send_snapshot.remote_jitter(), 2250);
1174 
1175         let snapshots = icpr.fetch_inbound_stats(vec![123456]).await;
1176         let recv_snapshot = snapshots[0]
1177             .as_ref()
1178             .expect("Stats should exist for ssrc: 123456");
1179         assert_eq!(recv_snapshot.nacks_sent(), 6);
1180         assert_eq!(recv_snapshot.plis_sent(), 3);
1181         assert_eq!(recv_snapshot.firs_sent(), 1);
1182         assert_eq!(recv_snapshot.remote_packets_sent(), 82);
1183         assert_eq!(recv_snapshot.remote_bytes_sent(), 10351);
1184         let rtt_ms = recv_snapshot
1185             .remote_round_trip_time()
1186             .expect("After reciving SR and DLRR we should have a round trip time ");
1187         assert_feq!(rtt_ms, 6125.0);
1188         assert_eq!(recv_snapshot.remote_reports_sent(), 2);
1189         assert_eq!(recv_snapshot.remote_round_trip_time_measurements(), 1);
1190         assert_feq!(recv_snapshot.remote_total_round_trip_time(), 6125.0);
1191 
1192         Ok(())
1193     }
1194 }
1195