1 use std::collections::HashMap; 2 use std::fmt; 3 use std::sync::Arc; 4 use std::time::SystemTime; 5 6 use super::{inbound, outbound, StatsContainer}; 7 use async_trait::async_trait; 8 use rtcp::extended_report::{DLRRReportBlock, ExtendedReport}; 9 use rtcp::payload_feedbacks::full_intra_request::FullIntraRequest; 10 use rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication; 11 use rtcp::receiver_report::ReceiverReport; 12 use rtcp::sender_report::SenderReport; 13 use rtcp::transport_feedbacks::transport_layer_nack::TransportLayerNack; 14 use rtp::extension::abs_send_time_extension::unix2ntp; 15 use tokio::sync::{mpsc, oneshot}; 16 use tokio::time::Duration; 17 18 use util::sync::Mutex; 19 use util::{MarshalSize, Unmarshal}; 20 21 use crate::error::Result; 22 use crate::stream_info::StreamInfo; 23 use crate::{Attributes, Interceptor, RTCPReader, RTCPWriter, RTPReader, RTPWriter}; 24 25 #[derive(Debug)] 26 enum Message { 27 StatUpdate { 28 ssrc: u32, 29 update: StatsUpdate, 30 }, 31 RequestInboundSnapshot { 32 ssrcs: Vec<u32>, 33 chan: oneshot::Sender<Vec<Option<inbound::StatsSnapshot>>>, 34 }, 35 RequestOutboundSnapshot { 36 ssrcs: Vec<u32>, 37 chan: oneshot::Sender<Vec<Option<outbound::StatsSnapshot>>>, 38 }, 39 } 40 41 #[derive(Debug)] 42 enum StatsUpdate { 43 /// Stats collected on the receiving end(inbound) of an RTP stream. 44 InboundRTP { 45 packets: u64, 46 header_bytes: u64, 47 payload_bytes: u64, 48 last_packet_timestamp: SystemTime, 49 }, 50 /// Stats collected on the sending end(outbound) of an RTP stream. 51 OutboundRTP { 52 packets: u64, 53 header_bytes: u64, 54 payload_bytes: u64, 55 last_packet_timestamp: SystemTime, 56 }, 57 /// Stats collected from received RTCP packets. 58 InboundRTCP { 59 fir_count: Option<u64>, 60 pli_count: Option<u64>, 61 nack_count: Option<u64>, 62 }, 63 /// Stats collected from sent RTCP packets. 64 OutboundRTCP { 65 fir_count: Option<u64>, 66 pli_count: Option<u64>, 67 nack_count: Option<u64>, 68 }, 69 /// An extended sequence number sent in an SR. 70 OutboundSRExtSeqNum { seq_num: u32 }, 71 /// Stats collected from received Receiver Reports i.e. where we have an outbound RTP stream. 72 InboundRecieverReport { 73 ext_seq_num: u32, 74 total_lost: u32, 75 jitter: u32, 76 rtt_ms: Option<f64>, 77 fraction_lost: u8, 78 }, 79 /// Stats collected from recieved Sender Reports i.e. where we have an inbound RTP stream. 80 InboundSenderRerport { 81 packets_and_bytes_sent: Option<(u32, u32)>, 82 rtt_ms: Option<f64>, 83 }, 84 } 85 86 pub struct StatsInterceptor { 87 // Wrapped RTP streams 88 recv_streams: Mutex<HashMap<u32, Arc<RTPReadRecorder>>>, 89 send_streams: Mutex<HashMap<u32, Arc<RTPWriteRecorder>>>, 90 91 tx: mpsc::Sender<Message>, 92 93 id: String, 94 now_gen: Arc<dyn Fn() -> SystemTime + Send + Sync>, 95 } 96 97 impl StatsInterceptor { 98 pub fn new(id: String) -> Self { 99 let (tx, rx) = mpsc::channel(100); 100 101 tokio::spawn(run_stats_reducer(rx)); 102 103 Self { 104 id, 105 recv_streams: Default::default(), 106 send_streams: Default::default(), 107 tx, 108 now_gen: Arc::new(SystemTime::now), 109 } 110 } 111 112 fn with_time_gen<F>(id: String, now_gen: F) -> Self 113 where 114 F: Fn() -> SystemTime + Send + Sync + 'static, 115 { 116 let (tx, rx) = mpsc::channel(100); 117 tokio::spawn(run_stats_reducer(rx)); 118 119 Self { 120 id, 121 recv_streams: Default::default(), 122 send_streams: Default::default(), 123 tx, 124 now_gen: Arc::new(now_gen), 125 } 126 } 127 128 pub async fn fetch_inbound_stats( 129 &self, 130 ssrcs: Vec<u32>, 131 ) -> Vec<Option<inbound::StatsSnapshot>> { 132 let (tx, rx) = oneshot::channel(); 133 134 if let Err(e) = self 135 .tx 136 .send(Message::RequestInboundSnapshot { ssrcs, chan: tx }) 137 .await 138 { 139 log::debug!( 140 "Failed to fetch inbound RTP stream stats from stats task with error: {}", 141 e 142 ); 143 144 return vec![]; 145 } 146 147 rx.await.unwrap_or_default() 148 } 149 150 pub async fn fetch_outbound_stats( 151 &self, 152 ssrcs: Vec<u32>, 153 ) -> Vec<Option<outbound::StatsSnapshot>> { 154 let (tx, rx) = oneshot::channel(); 155 156 if let Err(e) = self 157 .tx 158 .send(Message::RequestOutboundSnapshot { ssrcs, chan: tx }) 159 .await 160 { 161 log::debug!( 162 "Failed to fetch outbound RTP stream stats from stats task with error: {}", 163 e 164 ); 165 166 return vec![]; 167 } 168 169 rx.await.unwrap_or_default() 170 } 171 } 172 173 async fn run_stats_reducer(mut rx: mpsc::Receiver<Message>) { 174 let mut ssrc_stats: StatsContainer = Default::default(); 175 let mut cleanup_ticker = tokio::time::interval(Duration::from_secs(10)); 176 177 loop { 178 tokio::select! { 179 maybe_msg = rx.recv() => { 180 let msg = match maybe_msg { 181 Some(m) => m, 182 None => break, 183 }; 184 185 match msg { 186 Message::StatUpdate { ssrc, update } => { 187 handle_stats_update(&mut ssrc_stats, ssrc, update); 188 } 189 Message::RequestInboundSnapshot { ssrcs, chan} => { 190 let result = ssrcs 191 .into_iter() 192 .map(|ssrc| ssrc_stats.get_inbound_stats(ssrc).map(inbound::StreamStats::snapshot)) 193 .collect(); 194 195 let _ = chan.send(result); 196 } 197 Message::RequestOutboundSnapshot { ssrcs, chan} => { 198 let result = ssrcs 199 .into_iter() 200 .map(|ssrc| ssrc_stats.get_outbound_stats(ssrc).map(outbound::StreamStats::snapshot)) 201 .collect(); 202 203 let _ = chan.send(result); 204 205 } 206 } 207 208 } 209 _ = cleanup_ticker.tick() => { 210 ssrc_stats.remove_stale_entries(); 211 } 212 } 213 } 214 } 215 216 fn handle_stats_update(ssrc_stats: &mut StatsContainer, ssrc: u32, update: StatsUpdate) { 217 match update { 218 StatsUpdate::InboundRTP { 219 packets, 220 header_bytes, 221 payload_bytes, 222 last_packet_timestamp, 223 } => { 224 let stats = ssrc_stats.get_or_create_inbound_stream_stats(ssrc); 225 226 stats 227 .rtp_stats 228 .update(header_bytes, payload_bytes, packets, last_packet_timestamp); 229 stats.mark_updated(); 230 } 231 StatsUpdate::OutboundRTP { 232 packets, 233 header_bytes, 234 payload_bytes, 235 last_packet_timestamp, 236 } => { 237 let stats = ssrc_stats.get_or_create_outbound_stream_stats(ssrc); 238 stats 239 .rtp_stats 240 .update(header_bytes, payload_bytes, packets, last_packet_timestamp); 241 stats.mark_updated(); 242 } 243 StatsUpdate::InboundRTCP { 244 fir_count, 245 pli_count, 246 nack_count, 247 } => { 248 let stats = ssrc_stats.get_or_create_outbound_stream_stats(ssrc); 249 stats.rtcp_stats.update(fir_count, pli_count, nack_count); 250 stats.mark_updated(); 251 } 252 StatsUpdate::OutboundRTCP { 253 fir_count, 254 pli_count, 255 nack_count, 256 } => { 257 let stats = ssrc_stats.get_or_create_inbound_stream_stats(ssrc); 258 stats.rtcp_stats.update(fir_count, pli_count, nack_count); 259 stats.mark_updated(); 260 } 261 StatsUpdate::OutboundSRExtSeqNum { seq_num } => { 262 let stats = ssrc_stats.get_or_create_outbound_stream_stats(ssrc); 263 stats.record_sr_ext_seq_num(seq_num); 264 stats.mark_updated(); 265 } 266 StatsUpdate::InboundRecieverReport { 267 ext_seq_num, 268 total_lost, 269 jitter, 270 rtt_ms, 271 fraction_lost, 272 } => { 273 let stats = ssrc_stats.get_or_create_outbound_stream_stats(ssrc); 274 stats.record_remote_round_trip_time(rtt_ms); 275 stats.update_remote_fraction_lost(fraction_lost); 276 stats.update_remote_total_lost(total_lost); 277 stats.update_remote_inbound_packets_received(ext_seq_num, total_lost); 278 stats.update_remote_jitter(jitter); 279 280 stats.mark_updated(); 281 } 282 StatsUpdate::InboundSenderRerport { 283 rtt_ms, 284 packets_and_bytes_sent, 285 } => { 286 // This is a sender report we received, as such it concerns an RTP stream that's 287 // outbound at the remote. 288 let stats = ssrc_stats.get_or_create_inbound_stream_stats(ssrc); 289 290 if let Some((packets_sent, bytes_sent)) = packets_and_bytes_sent { 291 stats.record_sender_report(packets_sent, bytes_sent); 292 } 293 stats.record_remote_round_trip_time(rtt_ms); 294 295 stats.mark_updated(); 296 } 297 } 298 } 299 300 #[async_trait] 301 impl Interceptor for StatsInterceptor { 302 /// bind_remote_stream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method 303 /// will be called once per rtp packet. 304 async fn bind_remote_stream( 305 &self, 306 info: &StreamInfo, 307 reader: Arc<dyn RTPReader + Send + Sync>, 308 ) -> Arc<dyn RTPReader + Send + Sync> { 309 let mut lock = self.recv_streams.lock(); 310 311 let e = lock 312 .entry(info.ssrc) 313 .or_insert_with(|| Arc::new(RTPReadRecorder::new(reader, self.tx.clone()))); 314 315 e.clone() 316 } 317 318 /// unbind_remote_stream is called when the Stream is removed. It can be used to clean up any data related to that track. 319 async fn unbind_remote_stream(&self, info: &StreamInfo) { 320 let mut lock = self.recv_streams.lock(); 321 322 lock.remove(&info.ssrc); 323 } 324 325 /// bind_local_stream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method 326 /// will be called once per rtp packet. 327 async fn bind_local_stream( 328 &self, 329 info: &StreamInfo, 330 writer: Arc<dyn RTPWriter + Send + Sync>, 331 ) -> Arc<dyn RTPWriter + Send + Sync> { 332 let mut lock = self.send_streams.lock(); 333 334 let e = lock 335 .entry(info.ssrc) 336 .or_insert_with(|| Arc::new(RTPWriteRecorder::new(writer, self.tx.clone()))); 337 338 e.clone() 339 } 340 341 /// unbind_local_stream is called when the Stream is removed. It can be used to clean up any data related to that track. 342 async fn unbind_local_stream(&self, info: &StreamInfo) { 343 let mut lock = self.send_streams.lock(); 344 345 lock.remove(&info.ssrc); 346 } 347 348 async fn close(&self) -> Result<()> { 349 Ok(()) 350 } 351 352 /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method 353 /// will be called once per packet batch. 354 async fn bind_rtcp_writer( 355 &self, 356 writer: Arc<dyn RTCPWriter + Send + Sync>, 357 ) -> Arc<dyn RTCPWriter + Send + Sync> { 358 let now = self.now_gen.clone(); 359 360 Arc::new(RTCPWriteInterceptor { 361 rtcp_writer: writer, 362 tx: self.tx.clone(), 363 now_gen: move || now(), 364 }) 365 } 366 367 /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might 368 /// change in the future. The returned method will be called once per packet batch. 369 async fn bind_rtcp_reader( 370 &self, 371 reader: Arc<dyn RTCPReader + Send + Sync>, 372 ) -> Arc<dyn RTCPReader + Send + Sync> { 373 let now = self.now_gen.clone(); 374 375 Arc::new(RTCPReadInterceptor { 376 rtcp_reader: reader, 377 tx: self.tx.clone(), 378 now_gen: move || now(), 379 }) 380 } 381 } 382 383 pub struct RTCPReadInterceptor<F> { 384 rtcp_reader: Arc<dyn RTCPReader + Send + Sync>, 385 tx: mpsc::Sender<Message>, 386 now_gen: F, 387 } 388 389 #[async_trait] 390 impl<F> RTCPReader for RTCPReadInterceptor<F> 391 where 392 F: Fn() -> SystemTime + Send + Sync, 393 { 394 /// read a batch of rtcp packets 395 async fn read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)> { 396 let (n, attributes) = self.rtcp_reader.read(buf, attributes).await?; 397 398 let mut b = &buf[..n]; 399 let pkts = rtcp::packet::unmarshal(&mut b)?; 400 // Middle 32 bits 401 let now = (unix2ntp((self.now_gen)()) >> 16) as u32; 402 403 #[derive(Default, Debug)] 404 struct GenericRTCP { 405 fir_count: Option<u64>, 406 pli_count: Option<u64>, 407 nack_count: Option<u64>, 408 } 409 410 #[derive(Default, Debug)] 411 struct ReceiverReportEntry { 412 /// Extended sequence number value from Receiver Report, used to calculate remote 413 /// stats. 414 ext_seq_num: u32, 415 /// Total loss value from Receiver Report, used to calculate remote 416 /// stats. 417 total_lost: u32, 418 /// Jitter from Receiver Report. 419 jitter: u32, 420 /// Round Trip Time calculated from Receiver Report. 421 rtt_ms: Option<f64>, 422 /// Fraction of packets lost. 423 fraction_lost: u8, 424 } 425 426 #[derive(Default, Debug)] 427 struct SenderReportEntry { 428 /// NTP timestamp(from Sender Report). 429 sr_ntp_time: Option<u64>, 430 /// Packets Sent(from Sender Report). 431 sr_packets_sent: Option<u32>, 432 /// Bytes Sent(from Sender Report). 433 sr_bytes_sent: Option<u32>, 434 /// Last RR timestamp(middle bits) from DLRR extended report block. 435 dlrr_last_rr: Option<u32>, 436 /// Delay since last RR from DLRR extended report block. 437 dlrr_delay_rr: Option<u32>, 438 } 439 440 #[derive(Default, Debug)] 441 struct Entry { 442 generic_rtcp: GenericRTCP, 443 receiver_reports: Vec<ReceiverReportEntry>, 444 sender_reports: Vec<SenderReportEntry>, 445 } 446 let updates = pkts 447 .iter() 448 .fold(HashMap::<u32, Entry>::new(), |mut acc, p| { 449 if let Some(rr) = p.as_any().downcast_ref::<ReceiverReport>() { 450 for recp in &rr.reports { 451 let e = acc.entry(recp.ssrc).or_default(); 452 453 let rtt_ms = if recp.delay != 0 { 454 calculate_rtt_ms(now, recp.delay, recp.last_sender_report) 455 } else { 456 None 457 }; 458 459 e.receiver_reports.push(ReceiverReportEntry { 460 ext_seq_num: recp.last_sequence_number, 461 total_lost: recp.total_lost, 462 jitter: recp.jitter, 463 rtt_ms, 464 fraction_lost: recp.fraction_lost, 465 }); 466 } 467 } else if let Some(fir) = p.as_any().downcast_ref::<FullIntraRequest>() { 468 for fir_entry in &fir.fir { 469 let e = acc.entry(fir_entry.ssrc).or_default(); 470 e.generic_rtcp.fir_count = 471 e.generic_rtcp.fir_count.map(|v| v + 1).or(Some(1)); 472 } 473 } else if let Some(pli) = p.as_any().downcast_ref::<PictureLossIndication>() { 474 let e = acc.entry(pli.media_ssrc).or_default(); 475 e.generic_rtcp.pli_count = e.generic_rtcp.pli_count.map(|v| v + 1).or(Some(1)); 476 } else if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() { 477 let count = nack.nacks.iter().flat_map(|p| p.into_iter()).count() as u64; 478 479 let e = acc.entry(nack.media_ssrc).or_default(); 480 e.generic_rtcp.nack_count = 481 e.generic_rtcp.nack_count.map(|v| v + count).or(Some(count)); 482 } else if let Some(sr) = p.as_any().downcast_ref::<SenderReport>() { 483 let e = acc.entry(sr.ssrc).or_default(); 484 let sr_e = { 485 let need_new_entry = e 486 .sender_reports 487 .last() 488 .map(|e| e.sr_packets_sent.is_some()) 489 .unwrap_or(true); 490 491 if need_new_entry { 492 e.sender_reports.push(Default::default()); 493 } 494 495 // SAFETY: Unrwap ok because we just added an entry above 496 e.sender_reports.last_mut().unwrap() 497 }; 498 499 sr_e.sr_ntp_time = Some(sr.ntp_time); 500 sr_e.sr_packets_sent = Some(sr.packet_count); 501 sr_e.sr_bytes_sent = Some(sr.octet_count); 502 } else if let Some(xr) = p.as_any().downcast_ref::<ExtendedReport>() { 503 // Extended Report(XR) 504 505 // We only care about DLRR reports 506 let dlrrs = xr.reports.iter().flat_map(|report| { 507 let dlrr = report.as_any().downcast_ref::<DLRRReportBlock>(); 508 509 dlrr.map(|b| b.reports.iter()).into_iter().flatten() 510 }); 511 512 for dlrr in dlrrs { 513 let e = acc.entry(dlrr.ssrc).or_default(); 514 let sr_e = { 515 let need_new_entry = e 516 .sender_reports 517 .last() 518 .map(|e| e.dlrr_last_rr.is_some()) 519 .unwrap_or(true); 520 521 if need_new_entry { 522 e.sender_reports.push(Default::default()); 523 } 524 525 // SAFETY: Unrwap ok because we just added an entry above 526 e.sender_reports.last_mut().unwrap() 527 }; 528 529 sr_e.dlrr_last_rr = Some(dlrr.last_rr); 530 sr_e.dlrr_delay_rr = Some(dlrr.dlrr); 531 } 532 } 533 534 acc 535 }); 536 537 for ( 538 ssrc, 539 Entry { 540 generic_rtcp, 541 mut receiver_reports, 542 mut sender_reports, 543 }, 544 ) in updates.into_iter() 545 { 546 // Sort RR by seq number low to high 547 receiver_reports.sort_by(|a, b| a.ext_seq_num.cmp(&b.ext_seq_num)); 548 // Sort SR by ntp time, low to high 549 sender_reports 550 .sort_by(|a, b| a.sr_ntp_time.unwrap_or(0).cmp(&b.sr_ntp_time.unwrap_or(0))); 551 552 let _ = self 553 .tx 554 .send(Message::StatUpdate { 555 ssrc, 556 update: StatsUpdate::InboundRTCP { 557 fir_count: generic_rtcp.fir_count, 558 pli_count: generic_rtcp.pli_count, 559 nack_count: generic_rtcp.nack_count, 560 }, 561 }) 562 .await; 563 564 let futures = receiver_reports.into_iter().map(|rr| { 565 self.tx.send(Message::StatUpdate { 566 ssrc, 567 update: StatsUpdate::InboundRecieverReport { 568 ext_seq_num: rr.ext_seq_num, 569 total_lost: rr.total_lost, 570 jitter: rr.jitter, 571 rtt_ms: rr.rtt_ms, 572 fraction_lost: rr.fraction_lost, 573 }, 574 }) 575 }); 576 for fut in futures { 577 // TODO: Use futures::join_all 578 let _ = fut.await; 579 } 580 581 let futures = sender_reports.into_iter().map(|sr| { 582 let rtt_ms = match (sr.dlrr_last_rr, sr.dlrr_delay_rr, sr.sr_packets_sent) { 583 (Some(last_rr), Some(delay_rr), Some(_)) if last_rr != 0 && delay_rr != 0 => { 584 calculate_rtt_ms(now, delay_rr, last_rr) 585 } 586 _ => None, 587 }; 588 589 self.tx.send(Message::StatUpdate { 590 ssrc, 591 update: StatsUpdate::InboundSenderRerport { 592 packets_and_bytes_sent: sr 593 .sr_packets_sent 594 .and_then(|ps| sr.sr_bytes_sent.map(|bs| (ps, bs))), 595 rtt_ms, 596 }, 597 }) 598 }); 599 for fut in futures { 600 // TODO: Use futures::join_all 601 let _ = fut.await; 602 } 603 } 604 605 Ok((n, attributes)) 606 } 607 } 608 609 pub struct RTCPWriteInterceptor<F> { 610 rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>, 611 tx: mpsc::Sender<Message>, 612 now_gen: F, 613 } 614 615 #[async_trait] 616 impl<F> RTCPWriter for RTCPWriteInterceptor<F> 617 where 618 F: Fn() -> SystemTime + Send + Sync, 619 { 620 async fn write( 621 &self, 622 pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>], 623 attributes: &Attributes, 624 ) -> Result<usize> { 625 #[derive(Default, Debug)] 626 struct Entry { 627 fir_count: Option<u64>, 628 pli_count: Option<u64>, 629 nack_count: Option<u64>, 630 sr_ext_seq_num: Option<u32>, 631 } 632 let updates = pkts 633 .iter() 634 .fold(HashMap::<u32, Entry>::new(), |mut acc, p| { 635 if let Some(fir) = p.as_any().downcast_ref::<FullIntraRequest>() { 636 for fir_entry in &fir.fir { 637 let e = acc.entry(fir_entry.ssrc).or_default(); 638 e.fir_count = e.fir_count.map(|v| v + 1).or(Some(1)); 639 } 640 } else if let Some(pli) = p.as_any().downcast_ref::<PictureLossIndication>() { 641 let e = acc.entry(pli.media_ssrc).or_default(); 642 e.pli_count = e.pli_count.map(|v| v + 1).or(Some(1)); 643 } else if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() { 644 let count = nack.nacks.iter().flat_map(|p| p.into_iter()).count() as u64; 645 646 let e = acc.entry(nack.media_ssrc).or_default(); 647 e.nack_count = e.nack_count.map(|v| v + count).or(Some(count)); 648 } else if let Some(sr) = p.as_any().downcast_ref::<SenderReport>() { 649 for rep in &sr.reports { 650 let e = acc.entry(rep.ssrc).or_default(); 651 652 match e.sr_ext_seq_num { 653 // We want the initial value for `last_sequence_number` from the first 654 // SR. It's possible that an RTCP batch contains more than one SR, in 655 // which case we should use the lowest value. 656 Some(seq_num) if seq_num > rep.last_sequence_number => { 657 e.sr_ext_seq_num = Some(rep.last_sequence_number) 658 } 659 None => e.sr_ext_seq_num = Some(rep.last_sequence_number), 660 _ => {} 661 } 662 } 663 } 664 665 acc 666 }); 667 668 for ( 669 ssrc, 670 Entry { 671 fir_count, 672 pli_count, 673 nack_count, 674 sr_ext_seq_num, 675 }, 676 ) in updates.into_iter() 677 { 678 let _ = self 679 .tx 680 .send(Message::StatUpdate { 681 ssrc, 682 update: StatsUpdate::OutboundRTCP { 683 fir_count, 684 pli_count, 685 nack_count, 686 }, 687 }) 688 .await; 689 690 if let Some(seq_num) = sr_ext_seq_num { 691 let _ = self 692 .tx 693 .send(Message::StatUpdate { 694 ssrc, 695 update: StatsUpdate::OutboundSRExtSeqNum { seq_num }, 696 }) 697 .await; 698 } 699 } 700 701 self.rtcp_writer.write(pkts, attributes).await 702 } 703 } 704 705 pub struct RTPReadRecorder { 706 rtp_reader: Arc<dyn RTPReader + Send + Sync>, 707 tx: mpsc::Sender<Message>, 708 } 709 710 impl RTPReadRecorder { 711 fn new(rtp_reader: Arc<dyn RTPReader + Send + Sync>, tx: mpsc::Sender<Message>) -> Self { 712 Self { rtp_reader, tx } 713 } 714 } 715 716 impl fmt::Debug for RTPReadRecorder { 717 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 718 f.debug_struct("RTPReadRecorder").finish() 719 } 720 } 721 722 #[async_trait] 723 impl RTPReader for RTPReadRecorder { 724 async fn read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)> { 725 let (bytes_read, attributes) = self.rtp_reader.read(buf, attributes).await?; 726 // TODO: This parsing happens redundantly in several interceptors, would be good if we 727 // could not do this. 728 let mut b = &buf[..bytes_read]; 729 let packet = rtp::packet::Packet::unmarshal(&mut b)?; 730 731 let _ = self 732 .tx 733 .send(Message::StatUpdate { 734 ssrc: packet.header.ssrc, 735 update: StatsUpdate::InboundRTP { 736 packets: 1, 737 header_bytes: (bytes_read - packet.payload.len()) as u64, 738 payload_bytes: packet.payload.len() as u64, 739 last_packet_timestamp: SystemTime::now(), 740 }, 741 }) 742 .await; 743 744 Ok((bytes_read, attributes)) 745 } 746 } 747 748 pub struct RTPWriteRecorder { 749 rtp_writer: Arc<dyn RTPWriter + Send + Sync>, 750 tx: mpsc::Sender<Message>, 751 } 752 753 impl RTPWriteRecorder { 754 fn new(rtp_writer: Arc<dyn RTPWriter + Send + Sync>, tx: mpsc::Sender<Message>) -> Self { 755 Self { rtp_writer, tx } 756 } 757 } 758 759 impl fmt::Debug for RTPWriteRecorder { 760 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 761 f.debug_struct("RTPWriteRecorder").finish() 762 } 763 } 764 765 #[async_trait] 766 impl RTPWriter for RTPWriteRecorder { 767 /// write a rtp packet 768 async fn write(&self, pkt: &rtp::packet::Packet, attributes: &Attributes) -> Result<usize> { 769 let n = self.rtp_writer.write(pkt, attributes).await?; 770 771 let _ = self 772 .tx 773 .send(Message::StatUpdate { 774 ssrc: pkt.header.ssrc, 775 update: StatsUpdate::OutboundRTP { 776 packets: 1, 777 header_bytes: pkt.header.marshal_size() as u64, 778 payload_bytes: pkt.payload.len() as u64, 779 last_packet_timestamp: SystemTime::now(), 780 }, 781 }) 782 .await; 783 784 Ok(n) 785 } 786 } 787 788 /// Calculate the round trip time for a given peer as described in 789 /// [RFC3550 6.4.1](https://datatracker.ietf.org/doc/html/rfc3550#section-6.4.1). 790 /// 791 /// ## Params 792 /// 793 /// - `now` the current middle 32 bits of an NTP timestamp for the current time. 794 /// - `delay` the delay(`DLSR`) since last sender report expressed as fractions of a second in 32 bits. 795 /// - `last_report` the middle 32 bits of an NTP timestamp for the most recent sender report(LSR) or Receiver Report(LRR). 796 fn calculate_rtt_ms(now: u32, delay: u32, last_report: u32) -> Option<f64> { 797 // [10 Nov 1995 11:33:25.125 UTC] [10 Nov 1995 11:33:36.5 UTC] 798 // n SR(n) A=b710:8000 (46864.500 s) 799 // ----------------------------------------------------------------> 800 // v ^ 801 // ntp_sec =0xb44db705 v ^ dlsr=0x0005:4000 ( 5.250s) 802 // ntp_frac=0x20000000 v ^ lsr =0xb705:2000 (46853.125s) 803 // (3024992005.125 s) v ^ 804 // r v ^ RR(n) 805 // ----------------------------------------------------------------> 806 // |<-DLSR->| 807 // (5.250 s) 808 // 809 // A 0xb710:8000 (46864.500 s) 810 // DLSR -0x0005:4000 ( 5.250 s) 811 // LSR -0xb705:2000 (46853.125 s) 812 // ------------------------------- 813 // delay 0x0006:2000 ( 6.125 s) 814 815 let rtt = now.checked_sub(delay)?.checked_sub(last_report)?; 816 let rtt_seconds = rtt >> 16; 817 let rtt_fraction = (rtt & (u16::MAX as u32)) as f64 / (u16::MAX as u32) as f64; 818 819 Some(rtt_seconds as f64 * 1000.0 + rtt_fraction * 1000.0) 820 } 821 822 #[cfg(test)] 823 mod test { 824 // Silence warning on `..Default::default()` with no effect: 825 #![allow(clippy::needless_update)] 826 827 macro_rules! assert_feq { 828 ($left: expr, $right: expr) => { 829 assert_feq!($left, $right, 0.01); 830 }; 831 ($left: expr, $right: expr, $eps: expr) => { 832 if ($left - $right).abs() >= $eps { 833 panic!("{:?} was not within {:?} of {:?}", $left, $eps, $right); 834 } 835 }; 836 } 837 838 use bytes::Bytes; 839 use rtcp::extended_report::{DLRRReport, DLRRReportBlock, ExtendedReport}; 840 use rtcp::payload_feedbacks::full_intra_request::{FirEntry, FullIntraRequest}; 841 use rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication; 842 use rtcp::receiver_report::ReceiverReport; 843 use rtcp::reception_report::ReceptionReport; 844 use rtcp::sender_report::SenderReport; 845 use rtcp::transport_feedbacks::transport_layer_nack::{NackPair, TransportLayerNack}; 846 847 use std::sync::Arc; 848 use std::time::{Duration, SystemTime}; 849 850 use crate::error::Result; 851 use crate::mock::mock_stream::MockStream; 852 use crate::stream_info::StreamInfo; 853 854 use super::StatsInterceptor; 855 856 #[tokio::test] 857 async fn test_stats_interceptor_rtp() -> Result<()> { 858 let icpr: Arc<_> = Arc::new(StatsInterceptor::new("Hello".to_owned())); 859 860 let recv_stream = MockStream::new( 861 &StreamInfo { 862 ssrc: 123456, 863 ..Default::default() 864 }, 865 icpr.clone(), 866 ) 867 .await; 868 869 let send_stream = MockStream::new( 870 &StreamInfo { 871 ssrc: 234567, 872 ..Default::default() 873 }, 874 icpr.clone(), 875 ) 876 .await; 877 878 recv_stream 879 .receive_rtp(rtp::packet::Packet { 880 header: rtp::header::Header { 881 ssrc: 123456, 882 ..Default::default() 883 }, 884 payload: Bytes::from_static(b"\xde\xad\xbe\xef"), 885 }) 886 .await; 887 888 let _ = recv_stream 889 .read_rtp() 890 .await 891 .expect("After calling receive_rtp read_rtp should return Some")?; 892 893 let _ = send_stream 894 .write_rtp(&rtp::packet::Packet { 895 header: rtp::header::Header { 896 ssrc: 234567, 897 ..Default::default() 898 }, 899 payload: Bytes::from_static(b"\xde\xad\xbe\xef\xde\xad\xbe\xef"), 900 }) 901 .await; 902 903 let _ = send_stream 904 .write_rtp(&rtp::packet::Packet { 905 header: rtp::header::Header { 906 ssrc: 234567, 907 ..Default::default() 908 }, 909 payload: Bytes::from_static(&[0x13, 0x37]), 910 }) 911 .await; 912 913 let snapshots = icpr.fetch_inbound_stats(vec![123456]).await; 914 let recv_snapshot = snapshots[0] 915 .as_ref() 916 .expect("Stats should exist for ssrc: 123456"); 917 assert_eq!(recv_snapshot.packets_received(), 1); 918 assert_eq!(recv_snapshot.header_bytes_received(), 12); 919 assert_eq!(recv_snapshot.payload_bytes_received(), 4); 920 921 let snapshots = icpr.fetch_outbound_stats(vec![234567]).await; 922 let send_snapshot = snapshots[0] 923 .as_ref() 924 .expect("Stats should exist for ssrc: 234567"); 925 assert_eq!(send_snapshot.packets_sent(), 2); 926 assert_eq!(send_snapshot.header_bytes_sent(), 24); 927 assert_eq!(send_snapshot.payload_bytes_sent(), 10); 928 929 Ok(()) 930 } 931 932 #[tokio::test] 933 async fn test_stats_interceptor_rtcp() -> Result<()> { 934 let icpr: Arc<_> = Arc::new(StatsInterceptor::with_time_gen("Hello".to_owned(), || { 935 // 10 Nov 1995 11:33:36.5 UTC 936 SystemTime::UNIX_EPOCH + Duration::from_secs_f64(816003216.5) 937 })); 938 939 let recv_stream = MockStream::new( 940 &StreamInfo { 941 ssrc: 123456, 942 ..Default::default() 943 }, 944 icpr.clone(), 945 ) 946 .await; 947 948 let send_stream = MockStream::new( 949 &StreamInfo { 950 ssrc: 234567, 951 ..Default::default() 952 }, 953 icpr.clone(), 954 ) 955 .await; 956 957 send_stream 958 .write_rtcp(&[Box::new(SenderReport { 959 ssrc: 234567, 960 reports: vec![ 961 ReceptionReport { 962 ssrc: 234567, 963 last_sequence_number: (5 << 16) | 10, 964 ..Default::default() 965 }, 966 ReceptionReport { 967 ssrc: 234567, 968 last_sequence_number: (5 << 16) | 85, 969 ..Default::default() 970 }, 971 ], 972 ..Default::default() 973 })]) 974 .await 975 .expect("Failed to write RTCP packets"); 976 977 send_stream 978 .receive_rtcp(vec![ 979 Box::new(ReceiverReport { 980 reports: vec![ 981 ReceptionReport { 982 ssrc: 234567, 983 last_sequence_number: (5 << 16) | 64, 984 total_lost: 5, 985 ..Default::default() 986 }, 987 ReceptionReport { 988 ssrc: 234567, 989 last_sender_report: 0xb705_2000, 990 delay: 0x0005_4000, 991 last_sequence_number: (5 << 16) | 70, 992 total_lost: 8, 993 fraction_lost: 32, 994 jitter: 2250, 995 ..Default::default() 996 }, 997 ], 998 ..Default::default() 999 }), 1000 Box::new(TransportLayerNack { 1001 sender_ssrc: 0, 1002 media_ssrc: 234567, 1003 nacks: vec![NackPair { 1004 packet_id: 5, 1005 lost_packets: 0b0011_0110, 1006 }], 1007 }), 1008 Box::new(TransportLayerNack { 1009 sender_ssrc: 0, 1010 // NB: Different SSRC 1011 media_ssrc: 999999, 1012 nacks: vec![NackPair { 1013 packet_id: 5, 1014 lost_packets: 0b0011_0110, 1015 }], 1016 }), 1017 Box::new(PictureLossIndication { 1018 sender_ssrc: 0, 1019 media_ssrc: 234567, 1020 }), 1021 Box::new(PictureLossIndication { 1022 sender_ssrc: 0, 1023 media_ssrc: 234567, 1024 }), 1025 Box::new(FullIntraRequest { 1026 sender_ssrc: 0, 1027 media_ssrc: 234567, 1028 fir: vec![ 1029 FirEntry { 1030 ssrc: 234567, 1031 sequence_number: 132, 1032 }, 1033 FirEntry { 1034 ssrc: 234567, 1035 sequence_number: 135, 1036 }, 1037 ], 1038 }), 1039 ]) 1040 .await; 1041 let snapshots = icpr.fetch_outbound_stats(vec![234567]).await; 1042 let send_snapshot = snapshots[0] 1043 .as_ref() 1044 .expect("Outbound Stats should exist for ssrc: 234567"); 1045 1046 assert!( 1047 send_snapshot.remote_round_trip_time().is_none() 1048 && send_snapshot.remote_round_trip_time_measurements() == 0, 1049 "Before receiving the first RR we should not have a remote round trip time" 1050 ); 1051 let _ = send_stream 1052 .read_rtcp() 1053 .await 1054 .expect("After calling `receive_rtcp`, `read_rtcp` should return some packets"); 1055 1056 recv_stream 1057 .write_rtcp(&[ 1058 Box::new(TransportLayerNack { 1059 sender_ssrc: 0, 1060 media_ssrc: 123456, 1061 nacks: vec![NackPair { 1062 packet_id: 5, 1063 lost_packets: 0b0011_0111, 1064 }], 1065 }), 1066 Box::new(TransportLayerNack { 1067 sender_ssrc: 0, 1068 // NB: Different SSRC 1069 media_ssrc: 999999, 1070 nacks: vec![NackPair { 1071 packet_id: 5, 1072 lost_packets: 0b1111_0110, 1073 }], 1074 }), 1075 Box::new(PictureLossIndication { 1076 sender_ssrc: 0, 1077 media_ssrc: 123456, 1078 }), 1079 Box::new(PictureLossIndication { 1080 sender_ssrc: 0, 1081 media_ssrc: 123456, 1082 }), 1083 Box::new(PictureLossIndication { 1084 sender_ssrc: 0, 1085 media_ssrc: 123456, 1086 }), 1087 Box::new(FullIntraRequest { 1088 sender_ssrc: 0, 1089 media_ssrc: 123456, 1090 fir: vec![FirEntry { 1091 ssrc: 123456, 1092 sequence_number: 132, 1093 }], 1094 }), 1095 ]) 1096 .await 1097 .expect("Failed to write RTCP packets for recv_stream"); 1098 1099 recv_stream 1100 .receive_rtcp(vec![ 1101 Box::new(SenderReport { 1102 ssrc: 123456, 1103 ntp_time: 12345, // Used for ordering 1104 packet_count: 52, 1105 octet_count: 8172, 1106 reports: vec![], 1107 ..Default::default() 1108 }), 1109 Box::new(SenderReport { 1110 ssrc: 123456, 1111 ntp_time: 23456, // Used for ordering 1112 packet_count: 82, 1113 octet_count: 10351, 1114 reports: vec![], 1115 ..Default::default() 1116 }), 1117 Box::new(ExtendedReport { 1118 sender_ssrc: 928191, 1119 reports: vec![Box::new(DLRRReportBlock { 1120 reports: vec![DLRRReport { 1121 ssrc: 123456, 1122 last_rr: 0xb705_2000, 1123 dlrr: 0x0005_4000, 1124 }], 1125 })], 1126 }), 1127 Box::new(SenderReport { 1128 /// NB: Different SSRC 1129 ssrc: 9999999, 1130 ntp_time: 99999, // Used for ordering 1131 packet_count: 1231, 1132 octet_count: 193812, 1133 reports: vec![], 1134 ..Default::default() 1135 }), 1136 ]) 1137 .await; 1138 1139 let snapshots = icpr.fetch_inbound_stats(vec![123456]).await; 1140 let recv_snapshot = snapshots[0] 1141 .as_ref() 1142 .expect("Stats should exist for ssrc: 123456"); 1143 assert!( 1144 recv_snapshot.remote_round_trip_time().is_none() 1145 && recv_snapshot.remote_round_trip_time_measurements() == 0, 1146 "Before receiving the first SR/DLRR we should not have a remote round trip time" 1147 ); 1148 1149 let _ = recv_stream.read_rtcp().await.expect("read_rtcp failed"); 1150 1151 let snapshots = icpr.fetch_outbound_stats(vec![234567]).await; 1152 let send_snapshot = snapshots[0] 1153 .as_ref() 1154 .expect("Outbound Stats should exist for ssrc: 234567"); 1155 let rtt_ms = send_snapshot.remote_round_trip_time().expect( 1156 "After receiving an RR with a DSLR block we should have a remote round trip time", 1157 ); 1158 assert_feq!(rtt_ms, 6125.0); 1159 1160 assert_eq!(send_snapshot.nacks_received(), 5); 1161 assert_eq!(send_snapshot.plis_received(), 2); 1162 assert_eq!(send_snapshot.firs_received(), 2); 1163 // Last Seq Num(RR) - total lost(RR) - Initial Seq Num(SR) + 1 1164 // 70 - 8 - 10 + 1 = 53 1165 assert_eq!(send_snapshot.remote_packets_received(), 53); 1166 assert_feq!( 1167 send_snapshot 1168 .remote_fraction_lost() 1169 .expect("Should have a fraction lost values after receiving RR"), 1170 32.0 / 256.0 1171 ); 1172 assert_eq!(send_snapshot.remote_total_lost(), 8); 1173 assert_eq!(send_snapshot.remote_jitter(), 2250); 1174 1175 let snapshots = icpr.fetch_inbound_stats(vec![123456]).await; 1176 let recv_snapshot = snapshots[0] 1177 .as_ref() 1178 .expect("Stats should exist for ssrc: 123456"); 1179 assert_eq!(recv_snapshot.nacks_sent(), 6); 1180 assert_eq!(recv_snapshot.plis_sent(), 3); 1181 assert_eq!(recv_snapshot.firs_sent(), 1); 1182 assert_eq!(recv_snapshot.remote_packets_sent(), 82); 1183 assert_eq!(recv_snapshot.remote_bytes_sent(), 10351); 1184 let rtt_ms = recv_snapshot 1185 .remote_round_trip_time() 1186 .expect("After reciving SR and DLRR we should have a round trip time "); 1187 assert_feq!(rtt_ms, 6125.0); 1188 assert_eq!(recv_snapshot.remote_reports_sent(), 2); 1189 assert_eq!(recv_snapshot.remote_round_trip_time_measurements(), 1); 1190 assert_feq!(recv_snapshot.remote_total_round_trip_time(), 6125.0); 1191 1192 Ok(()) 1193 } 1194 } 1195