1 use std::collections::HashMap;
2 use std::fmt;
3 use std::sync::Arc;
4 use std::time::SystemTime;
5
6 use super::{inbound, outbound, StatsContainer};
7 use async_trait::async_trait;
8 use rtcp::extended_report::{DLRRReportBlock, ExtendedReport};
9 use rtcp::payload_feedbacks::full_intra_request::FullIntraRequest;
10 use rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication;
11 use rtcp::receiver_report::ReceiverReport;
12 use rtcp::sender_report::SenderReport;
13 use rtcp::transport_feedbacks::transport_layer_nack::TransportLayerNack;
14 use rtp::extension::abs_send_time_extension::unix2ntp;
15 use tokio::sync::{mpsc, oneshot};
16 use tokio::time::Duration;
17
18 use util::sync::Mutex;
19 use util::{MarshalSize, Unmarshal};
20
21 use crate::error::Result;
22 use crate::stream_info::StreamInfo;
23 use crate::{Attributes, Interceptor, RTCPReader, RTCPWriter, RTPReader, RTPWriter};
24
25 #[derive(Debug)]
26 enum Message {
27 StatUpdate {
28 ssrc: u32,
29 update: StatsUpdate,
30 },
31 RequestInboundSnapshot {
32 ssrcs: Vec<u32>,
33 chan: oneshot::Sender<Vec<Option<inbound::StatsSnapshot>>>,
34 },
35 RequestOutboundSnapshot {
36 ssrcs: Vec<u32>,
37 chan: oneshot::Sender<Vec<Option<outbound::StatsSnapshot>>>,
38 },
39 }
40
/// A single increment or observation to fold into the stats of one SSRC.
#[derive(Debug)]
enum StatsUpdate {
    /// Counters gathered on the receiving (inbound) end of an RTP stream.
    InboundRTP {
        packets: u64,
        header_bytes: u64,
        payload_bytes: u64,
        last_packet_timestamp: SystemTime,
    },
    /// Counters gathered on the sending (outbound) end of an RTP stream.
    OutboundRTP {
        packets: u64,
        header_bytes: u64,
        payload_bytes: u64,
        last_packet_timestamp: SystemTime,
    },
    /// Feedback counters derived from RTCP packets we received.
    InboundRTCP {
        fir_count: Option<u64>,
        pli_count: Option<u64>,
        nack_count: Option<u64>,
    },
    /// Feedback counters derived from RTCP packets we sent.
    OutboundRTCP {
        fir_count: Option<u64>,
        pli_count: Option<u64>,
        nack_count: Option<u64>,
    },
    /// An extended sequence number that was sent in a Sender Report.
    OutboundSRExtSeqNum { seq_num: u32 },
    /// Values taken from a received Receiver Report, i.e. about an RTP stream that is outbound
    /// on our side.
    InboundRecieverReport {
        ext_seq_num: u32,
        total_lost: u32,
        jitter: u32,
        rtt_ms: Option<f64>,
        fraction_lost: u8,
    },
    /// Values taken from a received Sender Report, i.e. about an RTP stream that is inbound
    /// on our side.
    InboundSenderRerport {
        packets_and_bytes_sent: Option<(u32, u32)>,
        rtt_ms: Option<f64>,
    },
}
85
86 pub struct StatsInterceptor {
87 // Wrapped RTP streams
88 recv_streams: Mutex<HashMap<u32, Arc<RTPReadRecorder>>>,
89 send_streams: Mutex<HashMap<u32, Arc<RTPWriteRecorder>>>,
90
91 tx: mpsc::Sender<Message>,
92
93 id: String,
94 now_gen: Arc<dyn Fn() -> SystemTime + Send + Sync>,
95 }
96
97 impl StatsInterceptor {
new(id: String) -> Self98 pub fn new(id: String) -> Self {
99 let (tx, rx) = mpsc::channel(100);
100
101 tokio::spawn(run_stats_reducer(rx));
102
103 Self {
104 id,
105 recv_streams: Default::default(),
106 send_streams: Default::default(),
107 tx,
108 now_gen: Arc::new(SystemTime::now),
109 }
110 }
111
with_time_gen<F>(id: String, now_gen: F) -> Self where F: Fn() -> SystemTime + Send + Sync + 'static,112 fn with_time_gen<F>(id: String, now_gen: F) -> Self
113 where
114 F: Fn() -> SystemTime + Send + Sync + 'static,
115 {
116 let (tx, rx) = mpsc::channel(100);
117 tokio::spawn(run_stats_reducer(rx));
118
119 Self {
120 id,
121 recv_streams: Default::default(),
122 send_streams: Default::default(),
123 tx,
124 now_gen: Arc::new(now_gen),
125 }
126 }
127
fetch_inbound_stats( &self, ssrcs: Vec<u32>, ) -> Vec<Option<inbound::StatsSnapshot>>128 pub async fn fetch_inbound_stats(
129 &self,
130 ssrcs: Vec<u32>,
131 ) -> Vec<Option<inbound::StatsSnapshot>> {
132 let (tx, rx) = oneshot::channel();
133
134 if let Err(e) = self
135 .tx
136 .send(Message::RequestInboundSnapshot { ssrcs, chan: tx })
137 .await
138 {
139 log::debug!(
140 "Failed to fetch inbound RTP stream stats from stats task with error: {}",
141 e
142 );
143
144 return vec![];
145 }
146
147 rx.await.unwrap_or_default()
148 }
149
fetch_outbound_stats( &self, ssrcs: Vec<u32>, ) -> Vec<Option<outbound::StatsSnapshot>>150 pub async fn fetch_outbound_stats(
151 &self,
152 ssrcs: Vec<u32>,
153 ) -> Vec<Option<outbound::StatsSnapshot>> {
154 let (tx, rx) = oneshot::channel();
155
156 if let Err(e) = self
157 .tx
158 .send(Message::RequestOutboundSnapshot { ssrcs, chan: tx })
159 .await
160 {
161 log::debug!(
162 "Failed to fetch outbound RTP stream stats from stats task with error: {}",
163 e
164 );
165
166 return vec![];
167 }
168
169 rx.await.unwrap_or_default()
170 }
171 }
172
run_stats_reducer(mut rx: mpsc::Receiver<Message>)173 async fn run_stats_reducer(mut rx: mpsc::Receiver<Message>) {
174 let mut ssrc_stats: StatsContainer = Default::default();
175 let mut cleanup_ticker = tokio::time::interval(Duration::from_secs(10));
176
177 loop {
178 tokio::select! {
179 maybe_msg = rx.recv() => {
180 let msg = match maybe_msg {
181 Some(m) => m,
182 None => break,
183 };
184
185 match msg {
186 Message::StatUpdate { ssrc, update } => {
187 handle_stats_update(&mut ssrc_stats, ssrc, update);
188 }
189 Message::RequestInboundSnapshot { ssrcs, chan} => {
190 let result = ssrcs
191 .into_iter()
192 .map(|ssrc| ssrc_stats.get_inbound_stats(ssrc).map(inbound::StreamStats::snapshot))
193 .collect();
194
195 let _ = chan.send(result);
196 }
197 Message::RequestOutboundSnapshot { ssrcs, chan} => {
198 let result = ssrcs
199 .into_iter()
200 .map(|ssrc| ssrc_stats.get_outbound_stats(ssrc).map(outbound::StreamStats::snapshot))
201 .collect();
202
203 let _ = chan.send(result);
204
205 }
206 }
207
208 }
209 _ = cleanup_ticker.tick() => {
210 ssrc_stats.remove_stale_entries();
211 }
212 }
213 }
214 }
215
handle_stats_update(ssrc_stats: &mut StatsContainer, ssrc: u32, update: StatsUpdate)216 fn handle_stats_update(ssrc_stats: &mut StatsContainer, ssrc: u32, update: StatsUpdate) {
217 match update {
218 StatsUpdate::InboundRTP {
219 packets,
220 header_bytes,
221 payload_bytes,
222 last_packet_timestamp,
223 } => {
224 let stats = ssrc_stats.get_or_create_inbound_stream_stats(ssrc);
225
226 stats
227 .rtp_stats
228 .update(header_bytes, payload_bytes, packets, last_packet_timestamp);
229 stats.mark_updated();
230 }
231 StatsUpdate::OutboundRTP {
232 packets,
233 header_bytes,
234 payload_bytes,
235 last_packet_timestamp,
236 } => {
237 let stats = ssrc_stats.get_or_create_outbound_stream_stats(ssrc);
238 stats
239 .rtp_stats
240 .update(header_bytes, payload_bytes, packets, last_packet_timestamp);
241 stats.mark_updated();
242 }
243 StatsUpdate::InboundRTCP {
244 fir_count,
245 pli_count,
246 nack_count,
247 } => {
248 let stats = ssrc_stats.get_or_create_outbound_stream_stats(ssrc);
249 stats.rtcp_stats.update(fir_count, pli_count, nack_count);
250 stats.mark_updated();
251 }
252 StatsUpdate::OutboundRTCP {
253 fir_count,
254 pli_count,
255 nack_count,
256 } => {
257 let stats = ssrc_stats.get_or_create_inbound_stream_stats(ssrc);
258 stats.rtcp_stats.update(fir_count, pli_count, nack_count);
259 stats.mark_updated();
260 }
261 StatsUpdate::OutboundSRExtSeqNum { seq_num } => {
262 let stats = ssrc_stats.get_or_create_outbound_stream_stats(ssrc);
263 stats.record_sr_ext_seq_num(seq_num);
264 stats.mark_updated();
265 }
266 StatsUpdate::InboundRecieverReport {
267 ext_seq_num,
268 total_lost,
269 jitter,
270 rtt_ms,
271 fraction_lost,
272 } => {
273 let stats = ssrc_stats.get_or_create_outbound_stream_stats(ssrc);
274 stats.record_remote_round_trip_time(rtt_ms);
275 stats.update_remote_fraction_lost(fraction_lost);
276 stats.update_remote_total_lost(total_lost);
277 stats.update_remote_inbound_packets_received(ext_seq_num, total_lost);
278 stats.update_remote_jitter(jitter);
279
280 stats.mark_updated();
281 }
282 StatsUpdate::InboundSenderRerport {
283 rtt_ms,
284 packets_and_bytes_sent,
285 } => {
286 // This is a sender report we received, as such it concerns an RTP stream that's
287 // outbound at the remote.
288 let stats = ssrc_stats.get_or_create_inbound_stream_stats(ssrc);
289
290 if let Some((packets_sent, bytes_sent)) = packets_and_bytes_sent {
291 stats.record_sender_report(packets_sent, bytes_sent);
292 }
293 stats.record_remote_round_trip_time(rtt_ms);
294
295 stats.mark_updated();
296 }
297 }
298 }
299
300 #[async_trait]
301 impl Interceptor for StatsInterceptor {
302 /// bind_remote_stream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
303 /// will be called once per rtp packet.
bind_remote_stream( &self, info: &StreamInfo, reader: Arc<dyn RTPReader + Send + Sync>, ) -> Arc<dyn RTPReader + Send + Sync>304 async fn bind_remote_stream(
305 &self,
306 info: &StreamInfo,
307 reader: Arc<dyn RTPReader + Send + Sync>,
308 ) -> Arc<dyn RTPReader + Send + Sync> {
309 let mut lock = self.recv_streams.lock();
310
311 let e = lock
312 .entry(info.ssrc)
313 .or_insert_with(|| Arc::new(RTPReadRecorder::new(reader, self.tx.clone())));
314
315 e.clone()
316 }
317
318 /// unbind_remote_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
unbind_remote_stream(&self, info: &StreamInfo)319 async fn unbind_remote_stream(&self, info: &StreamInfo) {
320 let mut lock = self.recv_streams.lock();
321
322 lock.remove(&info.ssrc);
323 }
324
325 /// bind_local_stream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
326 /// will be called once per rtp packet.
bind_local_stream( &self, info: &StreamInfo, writer: Arc<dyn RTPWriter + Send + Sync>, ) -> Arc<dyn RTPWriter + Send + Sync>327 async fn bind_local_stream(
328 &self,
329 info: &StreamInfo,
330 writer: Arc<dyn RTPWriter + Send + Sync>,
331 ) -> Arc<dyn RTPWriter + Send + Sync> {
332 let mut lock = self.send_streams.lock();
333
334 let e = lock
335 .entry(info.ssrc)
336 .or_insert_with(|| Arc::new(RTPWriteRecorder::new(writer, self.tx.clone())));
337
338 e.clone()
339 }
340
341 /// unbind_local_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
unbind_local_stream(&self, info: &StreamInfo)342 async fn unbind_local_stream(&self, info: &StreamInfo) {
343 let mut lock = self.send_streams.lock();
344
345 lock.remove(&info.ssrc);
346 }
347
close(&self) -> Result<()>348 async fn close(&self) -> Result<()> {
349 Ok(())
350 }
351
352 /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
353 /// will be called once per packet batch.
bind_rtcp_writer( &self, writer: Arc<dyn RTCPWriter + Send + Sync>, ) -> Arc<dyn RTCPWriter + Send + Sync>354 async fn bind_rtcp_writer(
355 &self,
356 writer: Arc<dyn RTCPWriter + Send + Sync>,
357 ) -> Arc<dyn RTCPWriter + Send + Sync> {
358 let now = self.now_gen.clone();
359
360 Arc::new(RTCPWriteInterceptor {
361 rtcp_writer: writer,
362 tx: self.tx.clone(),
363 now_gen: move || now(),
364 })
365 }
366
367 /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
368 /// change in the future. The returned method will be called once per packet batch.
bind_rtcp_reader( &self, reader: Arc<dyn RTCPReader + Send + Sync>, ) -> Arc<dyn RTCPReader + Send + Sync>369 async fn bind_rtcp_reader(
370 &self,
371 reader: Arc<dyn RTCPReader + Send + Sync>,
372 ) -> Arc<dyn RTCPReader + Send + Sync> {
373 let now = self.now_gen.clone();
374
375 Arc::new(RTCPReadInterceptor {
376 rtcp_reader: reader,
377 tx: self.tx.clone(),
378 now_gen: move || now(),
379 })
380 }
381 }
382
383 pub struct RTCPReadInterceptor<F> {
384 rtcp_reader: Arc<dyn RTCPReader + Send + Sync>,
385 tx: mpsc::Sender<Message>,
386 now_gen: F,
387 }
388
389 #[async_trait]
390 impl<F> RTCPReader for RTCPReadInterceptor<F>
391 where
392 F: Fn() -> SystemTime + Send + Sync,
393 {
394 /// read a batch of rtcp packets
read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)>395 async fn read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)> {
396 let (n, attributes) = self.rtcp_reader.read(buf, attributes).await?;
397
398 let mut b = &buf[..n];
399 let pkts = rtcp::packet::unmarshal(&mut b)?;
400 // Middle 32 bits
401 let now = (unix2ntp((self.now_gen)()) >> 16) as u32;
402
403 #[derive(Default, Debug)]
404 struct GenericRTCP {
405 fir_count: Option<u64>,
406 pli_count: Option<u64>,
407 nack_count: Option<u64>,
408 }
409
410 #[derive(Default, Debug)]
411 struct ReceiverReportEntry {
412 /// Extended sequence number value from Receiver Report, used to calculate remote
413 /// stats.
414 ext_seq_num: u32,
415 /// Total loss value from Receiver Report, used to calculate remote
416 /// stats.
417 total_lost: u32,
418 /// Jitter from Receiver Report.
419 jitter: u32,
420 /// Round Trip Time calculated from Receiver Report.
421 rtt_ms: Option<f64>,
422 /// Fraction of packets lost.
423 fraction_lost: u8,
424 }
425
426 #[derive(Default, Debug)]
427 struct SenderReportEntry {
428 /// NTP timestamp(from Sender Report).
429 sr_ntp_time: Option<u64>,
430 /// Packets Sent(from Sender Report).
431 sr_packets_sent: Option<u32>,
432 /// Bytes Sent(from Sender Report).
433 sr_bytes_sent: Option<u32>,
434 /// Last RR timestamp(middle bits) from DLRR extended report block.
435 dlrr_last_rr: Option<u32>,
436 /// Delay since last RR from DLRR extended report block.
437 dlrr_delay_rr: Option<u32>,
438 }
439
440 #[derive(Default, Debug)]
441 struct Entry {
442 generic_rtcp: GenericRTCP,
443 receiver_reports: Vec<ReceiverReportEntry>,
444 sender_reports: Vec<SenderReportEntry>,
445 }
446 let updates = pkts
447 .iter()
448 .fold(HashMap::<u32, Entry>::new(), |mut acc, p| {
449 if let Some(rr) = p.as_any().downcast_ref::<ReceiverReport>() {
450 for recp in &rr.reports {
451 let e = acc.entry(recp.ssrc).or_default();
452
453 let rtt_ms = if recp.delay != 0 {
454 calculate_rtt_ms(now, recp.delay, recp.last_sender_report)
455 } else {
456 None
457 };
458
459 e.receiver_reports.push(ReceiverReportEntry {
460 ext_seq_num: recp.last_sequence_number,
461 total_lost: recp.total_lost,
462 jitter: recp.jitter,
463 rtt_ms,
464 fraction_lost: recp.fraction_lost,
465 });
466 }
467 } else if let Some(fir) = p.as_any().downcast_ref::<FullIntraRequest>() {
468 for fir_entry in &fir.fir {
469 let e = acc.entry(fir_entry.ssrc).or_default();
470 e.generic_rtcp.fir_count =
471 e.generic_rtcp.fir_count.map(|v| v + 1).or(Some(1));
472 }
473 } else if let Some(pli) = p.as_any().downcast_ref::<PictureLossIndication>() {
474 let e = acc.entry(pli.media_ssrc).or_default();
475 e.generic_rtcp.pli_count = e.generic_rtcp.pli_count.map(|v| v + 1).or(Some(1));
476 } else if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() {
477 let count = nack.nacks.iter().flat_map(|p| p.into_iter()).count() as u64;
478
479 let e = acc.entry(nack.media_ssrc).or_default();
480 e.generic_rtcp.nack_count =
481 e.generic_rtcp.nack_count.map(|v| v + count).or(Some(count));
482 } else if let Some(sr) = p.as_any().downcast_ref::<SenderReport>() {
483 let e = acc.entry(sr.ssrc).or_default();
484 let sr_e = {
485 let need_new_entry = e
486 .sender_reports
487 .last()
488 .map(|e| e.sr_packets_sent.is_some())
489 .unwrap_or(true);
490
491 if need_new_entry {
492 e.sender_reports.push(Default::default());
493 }
494
495 // SAFETY: Unrwap ok because we just added an entry above
496 e.sender_reports.last_mut().unwrap()
497 };
498
499 sr_e.sr_ntp_time = Some(sr.ntp_time);
500 sr_e.sr_packets_sent = Some(sr.packet_count);
501 sr_e.sr_bytes_sent = Some(sr.octet_count);
502 } else if let Some(xr) = p.as_any().downcast_ref::<ExtendedReport>() {
503 // Extended Report(XR)
504
505 // We only care about DLRR reports
506 let dlrrs = xr.reports.iter().flat_map(|report| {
507 let dlrr = report.as_any().downcast_ref::<DLRRReportBlock>();
508
509 dlrr.map(|b| b.reports.iter()).into_iter().flatten()
510 });
511
512 for dlrr in dlrrs {
513 let e = acc.entry(dlrr.ssrc).or_default();
514 let sr_e = {
515 let need_new_entry = e
516 .sender_reports
517 .last()
518 .map(|e| e.dlrr_last_rr.is_some())
519 .unwrap_or(true);
520
521 if need_new_entry {
522 e.sender_reports.push(Default::default());
523 }
524
525 // SAFETY: Unrwap ok because we just added an entry above
526 e.sender_reports.last_mut().unwrap()
527 };
528
529 sr_e.dlrr_last_rr = Some(dlrr.last_rr);
530 sr_e.dlrr_delay_rr = Some(dlrr.dlrr);
531 }
532 }
533
534 acc
535 });
536
537 for (
538 ssrc,
539 Entry {
540 generic_rtcp,
541 mut receiver_reports,
542 mut sender_reports,
543 },
544 ) in updates.into_iter()
545 {
546 // Sort RR by seq number low to high
547 receiver_reports.sort_by(|a, b| a.ext_seq_num.cmp(&b.ext_seq_num));
548 // Sort SR by ntp time, low to high
549 sender_reports
550 .sort_by(|a, b| a.sr_ntp_time.unwrap_or(0).cmp(&b.sr_ntp_time.unwrap_or(0)));
551
552 let _ = self
553 .tx
554 .send(Message::StatUpdate {
555 ssrc,
556 update: StatsUpdate::InboundRTCP {
557 fir_count: generic_rtcp.fir_count,
558 pli_count: generic_rtcp.pli_count,
559 nack_count: generic_rtcp.nack_count,
560 },
561 })
562 .await;
563
564 let futures = receiver_reports.into_iter().map(|rr| {
565 self.tx.send(Message::StatUpdate {
566 ssrc,
567 update: StatsUpdate::InboundRecieverReport {
568 ext_seq_num: rr.ext_seq_num,
569 total_lost: rr.total_lost,
570 jitter: rr.jitter,
571 rtt_ms: rr.rtt_ms,
572 fraction_lost: rr.fraction_lost,
573 },
574 })
575 });
576 for fut in futures {
577 // TODO: Use futures::join_all
578 let _ = fut.await;
579 }
580
581 let futures = sender_reports.into_iter().map(|sr| {
582 let rtt_ms = match (sr.dlrr_last_rr, sr.dlrr_delay_rr, sr.sr_packets_sent) {
583 (Some(last_rr), Some(delay_rr), Some(_)) if last_rr != 0 && delay_rr != 0 => {
584 calculate_rtt_ms(now, delay_rr, last_rr)
585 }
586 _ => None,
587 };
588
589 self.tx.send(Message::StatUpdate {
590 ssrc,
591 update: StatsUpdate::InboundSenderRerport {
592 packets_and_bytes_sent: sr
593 .sr_packets_sent
594 .and_then(|ps| sr.sr_bytes_sent.map(|bs| (ps, bs))),
595 rtt_ms,
596 },
597 })
598 });
599 for fut in futures {
600 // TODO: Use futures::join_all
601 let _ = fut.await;
602 }
603 }
604
605 Ok((n, attributes))
606 }
607 }
608
609 pub struct RTCPWriteInterceptor<F> {
610 rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>,
611 tx: mpsc::Sender<Message>,
612 now_gen: F,
613 }
614
615 #[async_trait]
616 impl<F> RTCPWriter for RTCPWriteInterceptor<F>
617 where
618 F: Fn() -> SystemTime + Send + Sync,
619 {
write( &self, pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>], attributes: &Attributes, ) -> Result<usize>620 async fn write(
621 &self,
622 pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
623 attributes: &Attributes,
624 ) -> Result<usize> {
625 #[derive(Default, Debug)]
626 struct Entry {
627 fir_count: Option<u64>,
628 pli_count: Option<u64>,
629 nack_count: Option<u64>,
630 sr_ext_seq_num: Option<u32>,
631 }
632 let updates = pkts
633 .iter()
634 .fold(HashMap::<u32, Entry>::new(), |mut acc, p| {
635 if let Some(fir) = p.as_any().downcast_ref::<FullIntraRequest>() {
636 for fir_entry in &fir.fir {
637 let e = acc.entry(fir_entry.ssrc).or_default();
638 e.fir_count = e.fir_count.map(|v| v + 1).or(Some(1));
639 }
640 } else if let Some(pli) = p.as_any().downcast_ref::<PictureLossIndication>() {
641 let e = acc.entry(pli.media_ssrc).or_default();
642 e.pli_count = e.pli_count.map(|v| v + 1).or(Some(1));
643 } else if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() {
644 let count = nack.nacks.iter().flat_map(|p| p.into_iter()).count() as u64;
645
646 let e = acc.entry(nack.media_ssrc).or_default();
647 e.nack_count = e.nack_count.map(|v| v + count).or(Some(count));
648 } else if let Some(sr) = p.as_any().downcast_ref::<SenderReport>() {
649 for rep in &sr.reports {
650 let e = acc.entry(rep.ssrc).or_default();
651
652 match e.sr_ext_seq_num {
653 // We want the initial value for `last_sequence_number` from the first
654 // SR. It's possible that an RTCP batch contains more than one SR, in
655 // which case we should use the lowest value.
656 Some(seq_num) if seq_num > rep.last_sequence_number => {
657 e.sr_ext_seq_num = Some(rep.last_sequence_number)
658 }
659 None => e.sr_ext_seq_num = Some(rep.last_sequence_number),
660 _ => {}
661 }
662 }
663 }
664
665 acc
666 });
667
668 for (
669 ssrc,
670 Entry {
671 fir_count,
672 pli_count,
673 nack_count,
674 sr_ext_seq_num,
675 },
676 ) in updates.into_iter()
677 {
678 let _ = self
679 .tx
680 .send(Message::StatUpdate {
681 ssrc,
682 update: StatsUpdate::OutboundRTCP {
683 fir_count,
684 pli_count,
685 nack_count,
686 },
687 })
688 .await;
689
690 if let Some(seq_num) = sr_ext_seq_num {
691 let _ = self
692 .tx
693 .send(Message::StatUpdate {
694 ssrc,
695 update: StatsUpdate::OutboundSRExtSeqNum { seq_num },
696 })
697 .await;
698 }
699 }
700
701 self.rtcp_writer.write(pkts, attributes).await
702 }
703 }
704
705 pub struct RTPReadRecorder {
706 rtp_reader: Arc<dyn RTPReader + Send + Sync>,
707 tx: mpsc::Sender<Message>,
708 }
709
710 impl RTPReadRecorder {
new(rtp_reader: Arc<dyn RTPReader + Send + Sync>, tx: mpsc::Sender<Message>) -> Self711 fn new(rtp_reader: Arc<dyn RTPReader + Send + Sync>, tx: mpsc::Sender<Message>) -> Self {
712 Self { rtp_reader, tx }
713 }
714 }
715
716 impl fmt::Debug for RTPReadRecorder {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result717 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
718 f.debug_struct("RTPReadRecorder").finish()
719 }
720 }
721
722 #[async_trait]
723 impl RTPReader for RTPReadRecorder {
read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)>724 async fn read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)> {
725 let (bytes_read, attributes) = self.rtp_reader.read(buf, attributes).await?;
726 // TODO: This parsing happens redundantly in several interceptors, would be good if we
727 // could not do this.
728 let mut b = &buf[..bytes_read];
729 let packet = rtp::packet::Packet::unmarshal(&mut b)?;
730
731 let _ = self
732 .tx
733 .send(Message::StatUpdate {
734 ssrc: packet.header.ssrc,
735 update: StatsUpdate::InboundRTP {
736 packets: 1,
737 header_bytes: (bytes_read - packet.payload.len()) as u64,
738 payload_bytes: packet.payload.len() as u64,
739 last_packet_timestamp: SystemTime::now(),
740 },
741 })
742 .await;
743
744 Ok((bytes_read, attributes))
745 }
746 }
747
748 pub struct RTPWriteRecorder {
749 rtp_writer: Arc<dyn RTPWriter + Send + Sync>,
750 tx: mpsc::Sender<Message>,
751 }
752
753 impl RTPWriteRecorder {
new(rtp_writer: Arc<dyn RTPWriter + Send + Sync>, tx: mpsc::Sender<Message>) -> Self754 fn new(rtp_writer: Arc<dyn RTPWriter + Send + Sync>, tx: mpsc::Sender<Message>) -> Self {
755 Self { rtp_writer, tx }
756 }
757 }
758
759 impl fmt::Debug for RTPWriteRecorder {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result760 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
761 f.debug_struct("RTPWriteRecorder").finish()
762 }
763 }
764
765 #[async_trait]
766 impl RTPWriter for RTPWriteRecorder {
767 /// write a rtp packet
write(&self, pkt: &rtp::packet::Packet, attributes: &Attributes) -> Result<usize>768 async fn write(&self, pkt: &rtp::packet::Packet, attributes: &Attributes) -> Result<usize> {
769 let n = self.rtp_writer.write(pkt, attributes).await?;
770
771 let _ = self
772 .tx
773 .send(Message::StatUpdate {
774 ssrc: pkt.header.ssrc,
775 update: StatsUpdate::OutboundRTP {
776 packets: 1,
777 header_bytes: pkt.header.marshal_size() as u64,
778 payload_bytes: pkt.payload.len() as u64,
779 last_packet_timestamp: SystemTime::now(),
780 },
781 })
782 .await;
783
784 Ok(n)
785 }
786 }
787
/// Calculate the round trip time for a given peer as described in
/// [RFC3550 6.4.1](https://datatracker.ietf.org/doc/html/rfc3550#section-6.4.1).
///
/// All three inputs are 16.16 fixed point values: the upper 16 bits count whole seconds and the
/// lower 16 bits count 1/65536ths of a second.
///
/// ## Params
///
/// - `now` the current middle 32 bits of an NTP timestamp for the current time.
/// - `delay` the delay(`DLSR`) since last sender report expressed as fractions of a second in 32 bits.
/// - `last_report` the middle 32 bits of an NTP timestamp for the most recent sender report(LSR) or Receiver Report(LRR).
///
/// ## Returns
///
/// The round trip time in milliseconds, or `None` when `now - delay - last_report` would be
/// negative.
fn calculate_rtt_ms(now: u32, delay: u32, last_report: u32) -> Option<f64> {
    // [10 Nov 1995 11:33:25.125 UTC]       [10 Nov 1995 11:33:36.5 UTC]
    // n                 SR(n)              A=b710:8000 (46864.500 s)
    // ---------------------------------------------------------------->
    //                    v                 ^
    // ntp_sec =0xb44db705 v               ^ dlsr=0x0005:4000 (    5.250s)
    // ntp_frac=0x20000000  v             ^  lsr =0xb705:2000 (46853.125s)
    //   (3024992005.125 s)  v           ^
    // r                      v         ^ RR(n)
    // ---------------------------------------------------------------->
    //                        |<-DLSR->|
    //                         (5.250 s)
    //
    // A     0xb710:8000 (46864.500 s)
    // DLSR -0x0005:4000 (    5.250 s)
    // LSR  -0xb705:2000 (46853.125 s)
    // -------------------------------
    // delay 0x0006:2000 (    6.125 s)

    // One second is 2^16 units of the 16.16 fixed point value. Dividing the fractional part by
    // `u16::MAX` (65535) instead would slightly overstate every non-zero fraction.
    const UNITS_PER_SECOND: f64 = 65_536.0;

    let rtt = now.checked_sub(delay)?.checked_sub(last_report)?;

    Some(f64::from(rtt) / UNITS_PER_SECOND * 1000.0)
}
821
#[cfg(test)]
mod test {
    // Silence warning on `..Default::default()` with no effect:
    #![allow(clippy::needless_update)]

    /// Asserts that two floats are within `$eps` (default `0.01`) of each other.
    macro_rules! assert_feq {
        ($left: expr, $right: expr) => {
            assert_feq!($left, $right, 0.01);
        };
        ($left: expr, $right: expr, $eps: expr) => {
            if ($left - $right).abs() >= $eps {
                panic!("{:?} was not within {:?} of {:?}", $left, $eps, $right);
            }
        };
    }

    use bytes::Bytes;
    use rtcp::extended_report::{DLRRReport, DLRRReportBlock, ExtendedReport};
    use rtcp::payload_feedbacks::full_intra_request::{FirEntry, FullIntraRequest};
    use rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication;
    use rtcp::receiver_report::ReceiverReport;
    use rtcp::reception_report::ReceptionReport;
    use rtcp::sender_report::SenderReport;
    use rtcp::transport_feedbacks::transport_layer_nack::{NackPair, TransportLayerNack};

    use std::sync::Arc;
    use std::time::{Duration, SystemTime};

    use crate::error::Result;
    use crate::mock::mock_stream::MockStream;
    use crate::stream_info::StreamInfo;

    use super::StatsInterceptor;

    /// RTP packets read from a remote stream and written to a local stream must show up in the
    /// inbound and outbound stats respectively.
    #[tokio::test]
    async fn test_stats_interceptor_rtp() -> Result<()> {
        let icpr: Arc<_> = Arc::new(StatsInterceptor::new("Hello".to_owned()));

        let recv_stream = MockStream::new(
            &StreamInfo {
                ssrc: 123456,
                ..Default::default()
            },
            icpr.clone(),
        )
        .await;

        let send_stream = MockStream::new(
            &StreamInfo {
                ssrc: 234567,
                ..Default::default()
            },
            icpr.clone(),
        )
        .await;

        recv_stream
            .receive_rtp(rtp::packet::Packet {
                header: rtp::header::Header {
                    ssrc: 123456,
                    ..Default::default()
                },
                payload: Bytes::from_static(b"\xde\xad\xbe\xef"),
            })
            .await;

        let _ = recv_stream
            .read_rtp()
            .await
            .expect("After calling receive_rtp read_rtp should return Some")?;

        let _ = send_stream
            .write_rtp(&rtp::packet::Packet {
                header: rtp::header::Header {
                    ssrc: 234567,
                    ..Default::default()
                },
                payload: Bytes::from_static(b"\xde\xad\xbe\xef\xde\xad\xbe\xef"),
            })
            .await;

        let _ = send_stream
            .write_rtp(&rtp::packet::Packet {
                header: rtp::header::Header {
                    ssrc: 234567,
                    ..Default::default()
                },
                payload: Bytes::from_static(&[0x13, 0x37]),
            })
            .await;

        let snapshots = icpr.fetch_inbound_stats(vec![123456]).await;
        let recv_snapshot = snapshots[0]
            .as_ref()
            .expect("Stats should exist for ssrc: 123456");
        assert_eq!(recv_snapshot.packets_received(), 1);
        assert_eq!(recv_snapshot.header_bytes_received(), 12);
        assert_eq!(recv_snapshot.payload_bytes_received(), 4);

        let snapshots = icpr.fetch_outbound_stats(vec![234567]).await;
        let send_snapshot = snapshots[0]
            .as_ref()
            .expect("Stats should exist for ssrc: 234567");
        assert_eq!(send_snapshot.packets_sent(), 2);
        assert_eq!(send_snapshot.header_bytes_sent(), 24);
        assert_eq!(send_snapshot.payload_bytes_sent(), 10);

        Ok(())
    }

    /// RTCP packets written and read through the interceptor must be reflected in the stats.
    /// The clock is pinned so the round trip time matches the RFC 3550 example.
    #[tokio::test]
    async fn test_stats_interceptor_rtcp() -> Result<()> {
        let icpr: Arc<_> = Arc::new(StatsInterceptor::with_time_gen("Hello".to_owned(), || {
            // 10 Nov 1995 11:33:36.5 UTC
            SystemTime::UNIX_EPOCH + Duration::from_secs_f64(816003216.5)
        }));

        let recv_stream = MockStream::new(
            &StreamInfo {
                ssrc: 123456,
                ..Default::default()
            },
            icpr.clone(),
        )
        .await;

        let send_stream = MockStream::new(
            &StreamInfo {
                ssrc: 234567,
                ..Default::default()
            },
            icpr.clone(),
        )
        .await;

        send_stream
            .write_rtcp(&[Box::new(SenderReport {
                ssrc: 234567,
                reports: vec![
                    ReceptionReport {
                        ssrc: 234567,
                        last_sequence_number: (5 << 16) | 10,
                        ..Default::default()
                    },
                    ReceptionReport {
                        ssrc: 234567,
                        last_sequence_number: (5 << 16) | 85,
                        ..Default::default()
                    },
                ],
                ..Default::default()
            })])
            .await
            .expect("Failed to write RTCP packets");

        send_stream
            .receive_rtcp(vec![
                Box::new(ReceiverReport {
                    reports: vec![
                        ReceptionReport {
                            ssrc: 234567,
                            last_sequence_number: (5 << 16) | 64,
                            total_lost: 5,
                            ..Default::default()
                        },
                        ReceptionReport {
                            ssrc: 234567,
                            last_sender_report: 0xb705_2000,
                            delay: 0x0005_4000,
                            last_sequence_number: (5 << 16) | 70,
                            total_lost: 8,
                            fraction_lost: 32,
                            jitter: 2250,
                            ..Default::default()
                        },
                    ],
                    ..Default::default()
                }),
                Box::new(TransportLayerNack {
                    sender_ssrc: 0,
                    media_ssrc: 234567,
                    nacks: vec![NackPair {
                        packet_id: 5,
                        lost_packets: 0b0011_0110,
                    }],
                }),
                Box::new(TransportLayerNack {
                    sender_ssrc: 0,
                    // NB: Different SSRC
                    media_ssrc: 999999,
                    nacks: vec![NackPair {
                        packet_id: 5,
                        lost_packets: 0b0011_0110,
                    }],
                }),
                Box::new(PictureLossIndication {
                    sender_ssrc: 0,
                    media_ssrc: 234567,
                }),
                Box::new(PictureLossIndication {
                    sender_ssrc: 0,
                    media_ssrc: 234567,
                }),
                Box::new(FullIntraRequest {
                    sender_ssrc: 0,
                    media_ssrc: 234567,
                    fir: vec![
                        FirEntry {
                            ssrc: 234567,
                            sequence_number: 132,
                        },
                        FirEntry {
                            ssrc: 234567,
                            sequence_number: 135,
                        },
                    ],
                }),
            ])
            .await;
        let snapshots = icpr.fetch_outbound_stats(vec![234567]).await;
        let send_snapshot = snapshots[0]
            .as_ref()
            .expect("Outbound Stats should exist for ssrc: 234567");

        assert!(
            send_snapshot.remote_round_trip_time().is_none()
                && send_snapshot.remote_round_trip_time_measurements() == 0,
            "Before receiving the first RR we should not have a remote round trip time"
        );
        let _ = send_stream
            .read_rtcp()
            .await
            .expect("After calling `receive_rtcp`, `read_rtcp` should return some packets");

        recv_stream
            .write_rtcp(&[
                Box::new(TransportLayerNack {
                    sender_ssrc: 0,
                    media_ssrc: 123456,
                    nacks: vec![NackPair {
                        packet_id: 5,
                        lost_packets: 0b0011_0111,
                    }],
                }),
                Box::new(TransportLayerNack {
                    sender_ssrc: 0,
                    // NB: Different SSRC
                    media_ssrc: 999999,
                    nacks: vec![NackPair {
                        packet_id: 5,
                        lost_packets: 0b1111_0110,
                    }],
                }),
                Box::new(PictureLossIndication {
                    sender_ssrc: 0,
                    media_ssrc: 123456,
                }),
                Box::new(PictureLossIndication {
                    sender_ssrc: 0,
                    media_ssrc: 123456,
                }),
                Box::new(PictureLossIndication {
                    sender_ssrc: 0,
                    media_ssrc: 123456,
                }),
                Box::new(FullIntraRequest {
                    sender_ssrc: 0,
                    media_ssrc: 123456,
                    fir: vec![FirEntry {
                        ssrc: 123456,
                        sequence_number: 132,
                    }],
                }),
            ])
            .await
            .expect("Failed to write RTCP packets for recv_stream");

        recv_stream
            .receive_rtcp(vec![
                Box::new(SenderReport {
                    ssrc: 123456,
                    ntp_time: 12345, // Used for ordering
                    packet_count: 52,
                    octet_count: 8172,
                    reports: vec![],
                    ..Default::default()
                }),
                Box::new(SenderReport {
                    ssrc: 123456,
                    ntp_time: 23456, // Used for ordering
                    packet_count: 82,
                    octet_count: 10351,
                    reports: vec![],
                    ..Default::default()
                }),
                Box::new(ExtendedReport {
                    sender_ssrc: 928191,
                    reports: vec![Box::new(DLRRReportBlock {
                        reports: vec![DLRRReport {
                            ssrc: 123456,
                            last_rr: 0xb705_2000,
                            dlrr: 0x0005_4000,
                        }],
                    })],
                }),
                Box::new(SenderReport {
                    // NB: Different SSRC
                    ssrc: 9999999,
                    ntp_time: 99999, // Used for ordering
                    packet_count: 1231,
                    octet_count: 193812,
                    reports: vec![],
                    ..Default::default()
                }),
            ])
            .await;

        let snapshots = icpr.fetch_inbound_stats(vec![123456]).await;
        let recv_snapshot = snapshots[0]
            .as_ref()
            .expect("Stats should exist for ssrc: 123456");
        assert!(
            recv_snapshot.remote_round_trip_time().is_none()
                && recv_snapshot.remote_round_trip_time_measurements() == 0,
            "Before receiving the first SR/DLRR we should not have a remote round trip time"
        );

        let _ = recv_stream.read_rtcp().await.expect("read_rtcp failed");

        let snapshots = icpr.fetch_outbound_stats(vec![234567]).await;
        let send_snapshot = snapshots[0]
            .as_ref()
            .expect("Outbound Stats should exist for ssrc: 234567");
        let rtt_ms = send_snapshot.remote_round_trip_time().expect(
            "After receiving an RR with a DSLR block we should have a remote round trip time",
        );
        assert_feq!(rtt_ms, 6125.0);

        assert_eq!(send_snapshot.nacks_received(), 5);
        assert_eq!(send_snapshot.plis_received(), 2);
        assert_eq!(send_snapshot.firs_received(), 2);
        // Last Seq Num(RR) - total lost(RR) - Initial Seq Num(SR) + 1
        // 70 - 8 - 10 + 1 = 53
        assert_eq!(send_snapshot.remote_packets_received(), 53);
        assert_feq!(
            send_snapshot
                .remote_fraction_lost()
                .expect("Should have a fraction lost values after receiving RR"),
            32.0 / 256.0
        );
        assert_eq!(send_snapshot.remote_total_lost(), 8);
        assert_eq!(send_snapshot.remote_jitter(), 2250);

        let snapshots = icpr.fetch_inbound_stats(vec![123456]).await;
        let recv_snapshot = snapshots[0]
            .as_ref()
            .expect("Stats should exist for ssrc: 123456");
        assert_eq!(recv_snapshot.nacks_sent(), 6);
        assert_eq!(recv_snapshot.plis_sent(), 3);
        assert_eq!(recv_snapshot.firs_sent(), 1);
        assert_eq!(recv_snapshot.remote_packets_sent(), 82);
        assert_eq!(recv_snapshot.remote_bytes_sent(), 10351);
        let rtt_ms = recv_snapshot
            .remote_round_trip_time()
            .expect("After reciving SR and DLRR we should have a round trip time ");
        assert_feq!(rtt_ms, 6125.0);
        assert_eq!(recv_snapshot.remote_reports_sent(), 2);
        assert_eq!(recv_snapshot.remote_round_trip_time_measurements(), 1);
        assert_feq!(recv_snapshot.remote_total_round_trip_time(), 6125.0);

        Ok(())
    }
}
1195