1 use super::*;
2 use crate::{Attributes, RTPReader};
3 
4 use async_trait::async_trait;
5 use std::time::SystemTime;
6 use util::sync::Mutex;
7 use util::Unmarshal;
8 
/// Mutable per-stream reception statistics used to build RTCP receiver reports.
struct ReceiverStreamInternal {
    /// SSRC of the stream being received; reported as the reception report's `ssrc`.
    ssrc: u32,
    /// SSRC placed in the `ssrc` field of the generated receiver report.
    receiver_ssrc: u32,
    /// Clock rate used to convert elapsed wall-clock seconds into RTP timestamp units
    /// for the jitter computation.
    clock_rate: f64,

    /// Bitmap of received sequence numbers, stored as `u64` words (a set bit means
    /// "received").
    packets: Vec<u64>,
    /// `false` until the first RTP packet has been processed.
    started: bool,
    /// Number of detected sequence-number wrap-arounds; forms the upper 16 bits of the
    /// reported `last_sequence_number`.
    seq_num_cycles: u16,
    /// Newest sequence number accepted so far (a `u16` value widened to `i32`).
    last_seq_num: i32,
    /// Value of `last_seq_num` when the previous report was generated; initialised to
    /// "first sequence number - 1" by the first packet, so it may be `-1`.
    last_report_seq_num: i32,
    /// RTP timestamp of the most recently processed packet.
    last_rtp_time_rtp: u32,
    /// Arrival time of the most recently processed packet.
    last_rtp_time_time: SystemTime,
    /// Running jitter estimate, updated with a gain of 1/16 per packet.
    jitter: f64,
    /// `ntp_time >> 16`, truncated to 32 bits, of the last processed sender report.
    last_sender_report: u32,
    /// Time at which the last sender report was processed; `SystemTime::UNIX_EPOCH`
    /// is treated as "no sender report seen yet" when computing the report delay.
    last_sender_report_time: SystemTime,
    /// Cumulative number of lost packets, capped at `0xFFFFFF` (24 bits).
    total_lost: u32,
}
26 
27 impl ReceiverStreamInternal {
set_received(&mut self, seq: u16)28     fn set_received(&mut self, seq: u16) {
29         let pos = (seq as usize) % self.packets.len();
30         self.packets[pos / 64] |= 1 << (pos % 64);
31     }
32 
del_received(&mut self, seq: u16)33     fn del_received(&mut self, seq: u16) {
34         let pos = (seq as usize) % self.packets.len();
35         self.packets[pos / 64] &= u64::MAX ^ (1u64 << (pos % 64));
36     }
37 
get_received(&self, seq: u16) -> bool38     fn get_received(&self, seq: u16) -> bool {
39         let pos = (seq as usize) % self.packets.len();
40         (self.packets[pos / 64] & (1 << (pos % 64))) != 0
41     }
42 
process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet)43     fn process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet) {
44         if !self.started {
45             // first frame
46             self.started = true;
47             self.set_received(pkt.header.sequence_number);
48             self.last_seq_num = pkt.header.sequence_number as i32;
49             self.last_report_seq_num = pkt.header.sequence_number as i32 - 1;
50         } else {
51             // following frames
52             self.set_received(pkt.header.sequence_number);
53 
54             let diff = pkt.header.sequence_number as i32 - self.last_seq_num;
55             if !(-0x0FFF..=0).contains(&diff) {
56                 // overflow
57                 if diff < -0x0FFF {
58                     self.seq_num_cycles += 1;
59                 }
60 
61                 // set missing packets as missing
62                 for i in self.last_seq_num + 1..pkt.header.sequence_number as i32 {
63                     self.del_received(i as u16);
64                 }
65 
66                 self.last_seq_num = pkt.header.sequence_number as i32;
67             }
68 
69             // compute jitter
70             // https://tools.ietf.org/html/rfc3550#page-39
71             let d = now
72                 .duration_since(self.last_rtp_time_time)
73                 .unwrap_or_else(|_| Duration::from_secs(0))
74                 .as_secs_f64()
75                 * self.clock_rate
76                 - (pkt.header.timestamp as f64 - self.last_rtp_time_rtp as f64);
77             self.jitter += (d.abs() - self.jitter) / 16.0;
78         }
79 
80         self.last_rtp_time_rtp = pkt.header.timestamp;
81         self.last_rtp_time_time = now;
82     }
83 
process_sender_report(&mut self, now: SystemTime, sr: &rtcp::sender_report::SenderReport)84     fn process_sender_report(&mut self, now: SystemTime, sr: &rtcp::sender_report::SenderReport) {
85         self.last_sender_report = (sr.ntp_time >> 16) as u32;
86         self.last_sender_report_time = now;
87     }
88 
generate_report(&mut self, now: SystemTime) -> rtcp::receiver_report::ReceiverReport89     fn generate_report(&mut self, now: SystemTime) -> rtcp::receiver_report::ReceiverReport {
90         let total_since_report = (self.last_seq_num - self.last_report_seq_num) as u16;
91         let mut total_lost_since_report = {
92             if self.last_seq_num == self.last_report_seq_num {
93                 0
94             } else {
95                 let mut ret = 0u32;
96                 let mut i = (self.last_report_seq_num + 1) as u16;
97                 while i != self.last_seq_num as u16 {
98                     if !self.get_received(i) {
99                         ret += 1;
100                     }
101                     i = i.wrapping_add(1);
102                 }
103                 ret
104             }
105         };
106 
107         self.total_lost += total_lost_since_report;
108 
109         // allow up to 24 bits
110         if total_lost_since_report > 0xFFFFFF {
111             total_lost_since_report = 0xFFFFFF;
112         }
113         if self.total_lost > 0xFFFFFF {
114             self.total_lost = 0xFFFFFF
115         }
116 
117         let r = rtcp::receiver_report::ReceiverReport {
118             ssrc: self.receiver_ssrc,
119             reports: vec![rtcp::reception_report::ReceptionReport {
120                 ssrc: self.ssrc,
121                 last_sequence_number: (self.seq_num_cycles as u32) << 16
122                     | (self.last_seq_num as u32),
123                 last_sender_report: self.last_sender_report,
124                 fraction_lost: ((total_lost_since_report * 256) as f64 / total_since_report as f64)
125                     as u8,
126                 total_lost: self.total_lost,
127                 delay: {
128                     if self.last_sender_report_time == SystemTime::UNIX_EPOCH {
129                         0
130                     } else {
131                         match now.duration_since(self.last_sender_report_time) {
132                             Ok(d) => (d.as_secs_f64() * 65536.0) as u32,
133                             Err(_) => 0,
134                         }
135                     }
136                 },
137                 jitter: self.jitter as u32,
138             }],
139             ..Default::default()
140         };
141 
142         self.last_report_seq_num = self.last_seq_num;
143 
144         r
145     }
146 }
147 
/// Wraps a parent [`RTPReader`] and keeps reception statistics for the packets read
/// through it.
pub(crate) struct ReceiverStream {
    /// Reader that `read` delegates to before recording the packet.
    parent_rtp_reader: Arc<dyn RTPReader + Send + Sync>,
    /// Optional time source used for packet arrival times; `read` falls back to
    /// `SystemTime::now()` when this is `None`.
    now: Option<FnTimeGen>,

    /// Statistics state, guarded by a mutex so it can be updated through `&self`.
    internal: Mutex<ReceiverStreamInternal>,
}
154 
155 impl ReceiverStream {
new( ssrc: u32, clock_rate: u32, reader: Arc<dyn RTPReader + Send + Sync>, now: Option<FnTimeGen>, ) -> Self156     pub(crate) fn new(
157         ssrc: u32,
158         clock_rate: u32,
159         reader: Arc<dyn RTPReader + Send + Sync>,
160         now: Option<FnTimeGen>,
161     ) -> Self {
162         let receiver_ssrc = rand::random::<u32>();
163         ReceiverStream {
164             parent_rtp_reader: reader,
165             now,
166 
167             internal: Mutex::new(ReceiverStreamInternal {
168                 ssrc,
169                 receiver_ssrc,
170                 clock_rate: clock_rate as f64,
171 
172                 packets: vec![0u64; 128],
173                 started: false,
174                 seq_num_cycles: 0,
175                 last_seq_num: 0,
176                 last_report_seq_num: 0,
177                 last_rtp_time_rtp: 0,
178                 last_rtp_time_time: SystemTime::UNIX_EPOCH,
179                 jitter: 0.0,
180                 last_sender_report: 0,
181                 last_sender_report_time: SystemTime::UNIX_EPOCH,
182                 total_lost: 0,
183             }),
184         }
185     }
186 
process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet)187     pub(crate) fn process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet) {
188         let mut internal = self.internal.lock();
189         internal.process_rtp(now, pkt);
190     }
191 
process_sender_report( &self, now: SystemTime, sr: &rtcp::sender_report::SenderReport, )192     pub(crate) fn process_sender_report(
193         &self,
194         now: SystemTime,
195         sr: &rtcp::sender_report::SenderReport,
196     ) {
197         let mut internal = self.internal.lock();
198         internal.process_sender_report(now, sr);
199     }
200 
generate_report(&self, now: SystemTime) -> rtcp::receiver_report::ReceiverReport201     pub(crate) fn generate_report(&self, now: SystemTime) -> rtcp::receiver_report::ReceiverReport {
202         let mut internal = self.internal.lock();
203         internal.generate_report(now)
204     }
205 }
206 
207 /// RTPReader is used by Interceptor.bind_remote_stream.
208 #[async_trait]
209 impl RTPReader for ReceiverStream {
210     /// read a rtp packet
read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)>211     async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
212         let (n, attr) = self.parent_rtp_reader.read(buf, a).await?;
213 
214         let mut b = &buf[..n];
215         let pkt = rtp::packet::Packet::unmarshal(&mut b)?;
216         let now = if let Some(f) = &self.now {
217             f()
218         } else {
219             SystemTime::now()
220         };
221         self.process_rtp(now, &pkt);
222 
223         Ok((n, attr))
224     }
225 }
226