1 use super::*; 2 use crate::{Attributes, RTPWriter}; 3 4 use async_trait::async_trait; 5 use rtp::extension::abs_send_time_extension::unix2ntp; 6 use std::convert::TryInto; 7 use std::sync::Arc; 8 use std::time::{Duration, SystemTime}; 9 use tokio::sync::Mutex; 10 11 struct SenderStreamInternal { 12 ssrc: u32, 13 clock_rate: f64, 14 15 /// data from rtp packets 16 last_rtp_time_rtp: u32, 17 last_rtp_time_time: SystemTime, 18 counters: Counters, 19 } 20 21 impl SenderStreamInternal { process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet)22 fn process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet) { 23 // always update time to minimize errors 24 self.last_rtp_time_rtp = pkt.header.timestamp; 25 self.last_rtp_time_time = now; 26 27 self.counters.increment_packets(); 28 self.counters.count_octets(pkt.payload.len()); 29 } 30 generate_report(&mut self, now: SystemTime) -> rtcp::sender_report::SenderReport31 fn generate_report(&mut self, now: SystemTime) -> rtcp::sender_report::SenderReport { 32 rtcp::sender_report::SenderReport { 33 ssrc: self.ssrc, 34 ntp_time: unix2ntp(now), 35 rtp_time: self.last_rtp_time_rtp.wrapping_add( 36 (now.duration_since(self.last_rtp_time_time) 37 .unwrap_or_else(|_| Duration::from_secs(0)) 38 .as_secs_f64() 39 * self.clock_rate) as u32, 40 ), 41 packet_count: self.counters.packet_count(), 42 octet_count: self.counters.octet_count(), 43 ..Default::default() 44 } 45 } 46 } 47 48 pub(crate) struct SenderStream { 49 next_rtp_writer: Arc<dyn RTPWriter + Send + Sync>, 50 now: Option<FnTimeGen>, 51 52 internal: Mutex<SenderStreamInternal>, 53 } 54 55 impl SenderStream { new( ssrc: u32, clock_rate: u32, writer: Arc<dyn RTPWriter + Send + Sync>, now: Option<FnTimeGen>, ) -> Self56 pub(crate) fn new( 57 ssrc: u32, 58 clock_rate: u32, 59 writer: Arc<dyn RTPWriter + Send + Sync>, 60 now: Option<FnTimeGen>, 61 ) -> Self { 62 SenderStream { 63 next_rtp_writer: writer, 64 now, 65 66 internal: Mutex::new(SenderStreamInternal { 67 ssrc, 68 clock_rate: clock_rate as f64, 69 last_rtp_time_rtp: 0, 70 last_rtp_time_time: SystemTime::UNIX_EPOCH, 71 counters: Default::default(), 72 }), 73 } 74 } 75 process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet)76 async fn process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet) { 77 let mut internal = self.internal.lock().await; 78 internal.process_rtp(now, pkt); 79 } 80 generate_report( &self, now: SystemTime, ) -> rtcp::sender_report::SenderReport81 pub(crate) async fn generate_report( 82 &self, 83 now: SystemTime, 84 ) -> rtcp::sender_report::SenderReport { 85 let mut internal = self.internal.lock().await; 86 internal.generate_report(now) 87 } 88 } 89 90 /// RTPWriter is used by Interceptor.bind_local_stream. 91 #[async_trait] 92 impl RTPWriter for SenderStream { 93 /// write a rtp packet write(&self, pkt: &rtp::packet::Packet, a: &Attributes) -> Result<usize>94 async fn write(&self, pkt: &rtp::packet::Packet, a: &Attributes) -> Result<usize> { 95 let now = if let Some(f) = &self.now { 96 f() 97 } else { 98 SystemTime::now() 99 }; 100 self.process_rtp(now, pkt).await; 101 102 self.next_rtp_writer.write(pkt, a).await 103 } 104 } 105 106 #[derive(Default)] 107 pub(crate) struct Counters { 108 packets: u32, 109 octets: u32, 110 } 111 112 /// Wrapping counters used for generating [`rtcp::sender_report::SenderReport`] 113 impl Counters { increment_packets(&mut self)114 pub fn increment_packets(&mut self) { 115 self.packets = self.packets.wrapping_add(1); 116 } 117 count_octets(&mut self, octets: usize)118 pub fn count_octets(&mut self, octets: usize) { 119 // account for a payload size of at most `u32::MAX` 120 // and log a message if larger 121 self.octets = self 122 .octets 123 .wrapping_add(octets.try_into().unwrap_or_else(|_| { 124 log::warn!("packet payload larger than 32 bits"); 125 u32::MAX 126 })); 127 } 128 packet_count(&self) -> u32129 pub fn packet_count(&self) -> u32 { 130 self.packets 131 } 132 octet_count(&self) -> u32133 pub fn octet_count(&self) -> u32 { 134 self.octets 135 } 136 137 #[cfg(test)] mock(packets: u32, octets: u32) -> Self138 pub fn mock(packets: u32, octets: u32) -> Self { 139 Self { packets, octets } 140 } 141 } 142