1 use super::*; 2 use crate::{Attributes, RTPReader}; 3 4 use async_trait::async_trait; 5 use std::time::SystemTime; 6 use util::sync::Mutex; 7 use util::Unmarshal; 8 9 struct ReceiverStreamInternal { 10 ssrc: u32, 11 receiver_ssrc: u32, 12 clock_rate: f64, 13 14 packets: Vec<u64>, 15 started: bool, 16 seq_num_cycles: u16, 17 last_seq_num: i32, 18 last_report_seq_num: i32, 19 last_rtp_time_rtp: u32, 20 last_rtp_time_time: SystemTime, 21 jitter: f64, 22 last_sender_report: u32, 23 last_sender_report_time: SystemTime, 24 total_lost: u32, 25 } 26 27 impl ReceiverStreamInternal { set_received(&mut self, seq: u16)28 fn set_received(&mut self, seq: u16) { 29 let pos = (seq as usize) % self.packets.len(); 30 self.packets[pos / 64] |= 1 << (pos % 64); 31 } 32 del_received(&mut self, seq: u16)33 fn del_received(&mut self, seq: u16) { 34 let pos = (seq as usize) % self.packets.len(); 35 self.packets[pos / 64] &= u64::MAX ^ (1u64 << (pos % 64)); 36 } 37 get_received(&self, seq: u16) -> bool38 fn get_received(&self, seq: u16) -> bool { 39 let pos = (seq as usize) % self.packets.len(); 40 (self.packets[pos / 64] & (1 << (pos % 64))) != 0 41 } 42 process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet)43 fn process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet) { 44 if !self.started { 45 // first frame 46 self.started = true; 47 self.set_received(pkt.header.sequence_number); 48 self.last_seq_num = pkt.header.sequence_number as i32; 49 self.last_report_seq_num = pkt.header.sequence_number as i32 - 1; 50 } else { 51 // following frames 52 self.set_received(pkt.header.sequence_number); 53 54 let diff = pkt.header.sequence_number as i32 - self.last_seq_num; 55 if !(-0x0FFF..=0).contains(&diff) { 56 // overflow 57 if diff < -0x0FFF { 58 self.seq_num_cycles += 1; 59 } 60 61 // set missing packets as missing 62 for i in self.last_seq_num + 1..pkt.header.sequence_number as i32 { 63 self.del_received(i as u16); 64 } 65 66 self.last_seq_num = pkt.header.sequence_number as i32; 67 } 68 69 // compute jitter 70 // https://tools.ietf.org/html/rfc3550#page-39 71 let d = now 72 .duration_since(self.last_rtp_time_time) 73 .unwrap_or_else(|_| Duration::from_secs(0)) 74 .as_secs_f64() 75 * self.clock_rate 76 - (pkt.header.timestamp as f64 - self.last_rtp_time_rtp as f64); 77 self.jitter += (d.abs() - self.jitter) / 16.0; 78 } 79 80 self.last_rtp_time_rtp = pkt.header.timestamp; 81 self.last_rtp_time_time = now; 82 } 83 process_sender_report(&mut self, now: SystemTime, sr: &rtcp::sender_report::SenderReport)84 fn process_sender_report(&mut self, now: SystemTime, sr: &rtcp::sender_report::SenderReport) { 85 self.last_sender_report = (sr.ntp_time >> 16) as u32; 86 self.last_sender_report_time = now; 87 } 88 generate_report(&mut self, now: SystemTime) -> rtcp::receiver_report::ReceiverReport89 fn generate_report(&mut self, now: SystemTime) -> rtcp::receiver_report::ReceiverReport { 90 let total_since_report = (self.last_seq_num - self.last_report_seq_num) as u16; 91 let mut total_lost_since_report = { 92 if self.last_seq_num == self.last_report_seq_num { 93 0 94 } else { 95 let mut ret = 0u32; 96 let mut i = (self.last_report_seq_num + 1) as u16; 97 while i != self.last_seq_num as u16 { 98 if !self.get_received(i) { 99 ret += 1; 100 } 101 i = i.wrapping_add(1); 102 } 103 ret 104 } 105 }; 106 107 self.total_lost += total_lost_since_report; 108 109 // allow up to 24 bits 110 if total_lost_since_report > 0xFFFFFF { 111 total_lost_since_report = 0xFFFFFF; 112 } 113 if self.total_lost > 0xFFFFFF { 114 self.total_lost = 0xFFFFFF 115 } 116 117 let r = rtcp::receiver_report::ReceiverReport { 118 ssrc: self.receiver_ssrc, 119 reports: vec![rtcp::reception_report::ReceptionReport { 120 ssrc: self.ssrc, 121 last_sequence_number: (self.seq_num_cycles as u32) << 16 122 | (self.last_seq_num as u32), 123 last_sender_report: self.last_sender_report, 124 fraction_lost: ((total_lost_since_report * 256) as f64 / total_since_report as f64) 125 as u8, 126 total_lost: self.total_lost, 127 delay: { 128 if self.last_sender_report_time == SystemTime::UNIX_EPOCH { 129 0 130 } else { 131 match now.duration_since(self.last_sender_report_time) { 132 Ok(d) => (d.as_secs_f64() * 65536.0) as u32, 133 Err(_) => 0, 134 } 135 } 136 }, 137 jitter: self.jitter as u32, 138 }], 139 ..Default::default() 140 }; 141 142 self.last_report_seq_num = self.last_seq_num; 143 144 r 145 } 146 } 147 148 pub(crate) struct ReceiverStream { 149 parent_rtp_reader: Arc<dyn RTPReader + Send + Sync>, 150 now: Option<FnTimeGen>, 151 152 internal: Mutex<ReceiverStreamInternal>, 153 } 154 155 impl ReceiverStream { new( ssrc: u32, clock_rate: u32, reader: Arc<dyn RTPReader + Send + Sync>, now: Option<FnTimeGen>, ) -> Self156 pub(crate) fn new( 157 ssrc: u32, 158 clock_rate: u32, 159 reader: Arc<dyn RTPReader + Send + Sync>, 160 now: Option<FnTimeGen>, 161 ) -> Self { 162 let receiver_ssrc = rand::random::<u32>(); 163 ReceiverStream { 164 parent_rtp_reader: reader, 165 now, 166 167 internal: Mutex::new(ReceiverStreamInternal { 168 ssrc, 169 receiver_ssrc, 170 clock_rate: clock_rate as f64, 171 172 packets: vec![0u64; 128], 173 started: false, 174 seq_num_cycles: 0, 175 last_seq_num: 0, 176 last_report_seq_num: 0, 177 last_rtp_time_rtp: 0, 178 last_rtp_time_time: SystemTime::UNIX_EPOCH, 179 jitter: 0.0, 180 last_sender_report: 0, 181 last_sender_report_time: SystemTime::UNIX_EPOCH, 182 total_lost: 0, 183 }), 184 } 185 } 186 process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet)187 pub(crate) fn process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet) { 188 let mut internal = self.internal.lock(); 189 internal.process_rtp(now, pkt); 190 } 191 process_sender_report( &self, now: SystemTime, sr: &rtcp::sender_report::SenderReport, )192 pub(crate) fn process_sender_report( 193 &self, 194 now: SystemTime, 195 sr: &rtcp::sender_report::SenderReport, 196 ) { 197 let mut internal = self.internal.lock(); 198 internal.process_sender_report(now, sr); 199 } 200 generate_report(&self, now: SystemTime) -> rtcp::receiver_report::ReceiverReport201 pub(crate) fn generate_report(&self, now: SystemTime) -> rtcp::receiver_report::ReceiverReport { 202 let mut internal = self.internal.lock(); 203 internal.generate_report(now) 204 } 205 } 206 207 /// RTPReader is used by Interceptor.bind_remote_stream. 208 #[async_trait] 209 impl RTPReader for ReceiverStream { 210 /// read a rtp packet read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)>211 async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> { 212 let (n, attr) = self.parent_rtp_reader.read(buf, a).await?; 213 214 let mut b = &buf[..n]; 215 let pkt = rtp::packet::Packet::unmarshal(&mut b)?; 216 let now = if let Some(f) = &self.now { 217 f() 218 } else { 219 SystemTime::now() 220 }; 221 self.process_rtp(now, &pkt); 222 223 Ok((n, attr)) 224 } 225 } 226