1 use crate::rtp::utils;
2 use crate::rtp::RtpPacket;
3 use bytes::BytesMut;
4
5 use super::{
6 rtcp_app::RtcpApp,
7 rtcp_bye::RtcpBye,
8 rtcp_header::RtcpHeader,
9 rtcp_rr::{ReportBlock, RtcpReceiverReport},
10 rtcp_sr::RtcpSenderReport,
11 RTCP_RR,
12 };
13
14 //For example: sequence numbers inserted are 65533, 65534, the new coming one is 2,
15 //the new is 2 and old is 65534, the distance between 2 and 65534 is 4 which is
16 //65535 - 65534 + 2 + 1.(65533,65534,65535,0,1,2)
distance(new: u16, old: u16) -> u1617 pub fn distance(new: u16, old: u16) -> u16 {
18 if new < old {
19 65535 - old + new + 1
20 } else {
21 new - old
22 }
23 }
24
25 const MIN_SEQUENTIAL: u32 = 2;
26 const RTP_SEQ_MOD: u32 = 1 << 16;
27 const MAX_DROPOUT: u32 = 3000;
28 const MAX_MISORDER: u32 = 100;
29
30 /*
31 * Per-source state information
32 */
33 #[derive(Debug, Clone, Default)]
34 struct RtcpSource {
35 max_seq: u16, /* highest seq. number seen */
36 cycles: u32, /* shifted count of seq. number cycles */
37 base_seq: u32, /* base seq number */
38 bad_seq: u32, /* last 'bad' seq number + 1 */
39 probation: u32, /* sequ. packets till source is valid */
40 received: u32, /* packets received */
41 expected_prior: u32, /* packet expected at last interval */
42 received_prior: u32, /* packet received at last interval */
43 jitter: f64, /* estimated jitter */
44 }
45
46 impl RtcpSource {
47 //static int rtp_seq_update(struct rtp_member *sender, uint16_t seq)
update_sequence(&mut self, seq: u16) -> usize48 fn update_sequence(&mut self, seq: u16) -> usize {
49 let delta = distance(seq, self.max_seq);
50
51 if self.probation > 0 {
52 /* packet is in sequence */
53 if seq == self.max_seq + 1 {
54 self.probation -= 1;
55 self.max_seq = seq;
56 if self.probation == 0 {
57 self.init_seq(seq);
58 self.received += 1;
59 return 1;
60 }
61 } else {
62 self.probation = MIN_SEQUENTIAL - 1;
63 self.max_seq = seq;
64 }
65 return 0;
66 } else if delta < MAX_DROPOUT as u16 {
67 /* in order, with permissible gap */
68 if seq < self.max_seq {
69 /*
70 * Sequence number wrapped - count another 64K cycle.
71 */
72 self.cycles += RTP_SEQ_MOD;
73 }
74 self.max_seq = seq;
75 } else if delta as u32 <= RTP_SEQ_MOD - MAX_MISORDER {
76 if seq == self.bad_seq as u16 {
77 /*
78 * Two sequential packets -- assume that the other side
79 * restarted without telling us so just re-sync
80 * (i.e., pretend this was the first packet).
81 */
82 self.init_seq(seq);
83 } else {
84 self.bad_seq = (seq as u32 + 1) & (RTP_SEQ_MOD - 1);
85 return 0;
86 }
87 } else {
88 /* duplicate or reordered packet */
89 }
90 self.received += 1;
91
92 1
93 }
94
init_seq(&mut self, seq: u16)95 fn init_seq(&mut self, seq: u16) {
96 self.base_seq = seq as u32;
97 self.max_seq = seq;
98 self.bad_seq = RTP_SEQ_MOD + 1; /* so seq == bad_seq is false */
99 self.cycles = 0;
100 self.received = 0;
101 self.received_prior = 0;
102 self.expected_prior = 0;
103 /* other initialization */
104
105 self.base_seq = seq as u32;
106 self.max_seq = seq;
107 self.bad_seq = RTP_SEQ_MOD + 1;
108 }
109 }
110
111 #[derive(Debug, Clone, Default)]
112 pub struct RtcpContext {
113 ssrc: u32,
114 sender_ssrc: u32,
115
116 sr_ntp_lsr: u64,
117 sr_clock_time: u64,
118 last_rtp_clock: u64,
119 last_rtp_timestamp: u32,
120 sample_rate: u32,
121 send_bytes: u64,
122 send_packets: u64,
123
124 source: RtcpSource,
125 }
126
127 impl RtcpContext {
new(ssrc: u32, seq: u16, sample_rate: u32) -> Self128 pub fn new(ssrc: u32, seq: u16, sample_rate: u32) -> Self {
129 RtcpContext {
130 ssrc,
131 source: RtcpSource {
132 max_seq: seq - 1,
133 probation: MIN_SEQUENTIAL,
134 ..Default::default()
135 },
136
137 sample_rate,
138 ..Default::default()
139 }
140 }
141
generate_app(&self, name: String, data: BytesMut) -> RtcpApp142 pub fn generate_app(&self, name: String, data: BytesMut) -> RtcpApp {
143 let mut buf = BytesMut::with_capacity(name.len());
144 buf.extend_from_slice(name.as_bytes());
145
146 RtcpApp {
147 ssrc: self.ssrc,
148 name: buf,
149 app_data: data,
150 ..Default::default()
151 }
152 }
153
generate_bye(&self) -> RtcpBye154 pub fn generate_bye(&self) -> RtcpBye {
155 let ssrss = vec![self.ssrc];
156 RtcpBye {
157 header: RtcpHeader {
158 report_count: 1,
159 ..Default::default()
160 },
161 ssrss,
162 ..Default::default()
163 }
164 }
165
166 //int rtcp_report_block(struct rtp_member* sender, uint8_t* ptr, int bytes)
gen_report_block(&mut self) -> ReportBlock167 fn gen_report_block(&mut self) -> ReportBlock {
168 let extend_max = self.source.cycles + self.source.max_seq as u32;
169 let expected = extend_max - self.source.base_seq + 1;
170 let lost = expected - self.source.received;
171 let expected_interval = expected - self.source.expected_prior;
172 self.source.expected_prior = expected;
173
174 let received_interval = self.source.received - self.source.received_prior;
175 self.source.received_prior = self.source.received;
176 let lost_interval = expected_interval as i64 - received_interval as i64;
177
178 let fraction = if expected_interval == 0 || lost_interval < 0 {
179 0
180 } else {
181 ((lost_interval as u32) << 8) / expected_interval
182 };
183
184 let delay = utils::current_time() - self.sr_clock_time;
185 let lsr = self.sr_ntp_lsr >> 8 & 0xFFFFFFFF;
186 let dlsr = (delay as f64 / 1000000. * 65535.) as u32;
187
188 ReportBlock {
189 cumutlative_num_of_packets_lost: lost,
190 fraction_lost: fraction as u8,
191 extended_highest_seq_number: extend_max,
192 lsr: lsr as u32,
193 dlsr,
194 ssrc: self.sender_ssrc,
195 jitter: self.source.jitter as u32,
196 }
197 }
198
generate_rr(&mut self) -> RtcpReceiverReport199 pub fn generate_rr(&mut self) -> RtcpReceiverReport {
200 let block = self.gen_report_block();
201
202 RtcpReceiverReport {
203 header: RtcpHeader {
204 payload_type: RTCP_RR,
205 report_count: 1,
206 version: 2,
207 length: (4 + 24) / 4,
208 ..Default::default()
209 },
210 report_blocks: vec![block],
211 ssrc: self.ssrc,
212 }
213 }
214
send_rtp(&mut self, pkt: RtpPacket)215 pub fn send_rtp(&mut self, pkt: RtpPacket) {
216 self.send_bytes += pkt.payload.len() as u64;
217 self.send_packets += 1;
218 self.last_rtp_timestamp = pkt.header.timestamp;
219 }
220
received_sr(&mut self, sr: &RtcpSenderReport)221 pub fn received_sr(&mut self, sr: &RtcpSenderReport) {
222 self.sr_clock_time = utils::current_time();
223
224 self.sr_ntp_lsr = sr.ntp;
225 self.sender_ssrc = sr.ssrc;
226 }
227
received_rtp(&mut self, pkt: RtpPacket)228 pub fn received_rtp(&mut self, pkt: RtpPacket) {
229 if 0 == self.source.update_sequence(pkt.header.seq_number) {
230 return;
231 }
232
233 let rtp_clock = utils::current_time();
234 if self.last_rtp_clock == 0 {
235 self.source.jitter = 0.;
236 } else {
237 let mut d = ((rtp_clock - self.last_rtp_clock) * self.sample_rate as u64 / 1000000)
238 as i64
239 - (pkt.header.timestamp - self.last_rtp_timestamp) as i64;
240
241 if d < 0 {
242 d = -d;
243 }
244 self.source.jitter += (d as f64 - self.source.jitter) / 16.;
245 }
246
247 self.last_rtp_clock = rtp_clock;
248 self.last_rtp_timestamp = pkt.header.timestamp;
249 }
250 }
251