xref: /xiu/protocol/rtsp/src/rtp/rtcp/rtcp_context.rs (revision b36cf5da)
1 use crate::rtp::utils;
2 use crate::rtp::RtpPacket;
3 use bytes::BytesMut;
4 
5 use super::{
6     rtcp_app::RtcpApp,
7     rtcp_bye::RtcpBye,
8     rtcp_header::RtcpHeader,
9     rtcp_rr::{ReportBlock, RtcpReceiverReport},
10     rtcp_sr::RtcpSenderReport,
11     RTCP_RR,
12 };
13 
/// Forward distance from `old` to `new` in the 16-bit RTP sequence number space.
///
/// The subtraction is carried out modulo 2^16, so a counter that wrapped past
/// 65535 needs no special handling. Example: after 65533 and 65534 have been
/// seen, a packet numbered 2 is 4 steps ahead of 65534, because the numbers in
/// between are 65535, 0, 1 and 2.
pub fn distance(new: u16, old: u16) -> u16 {
    new.wrapping_sub(old)
}
24 
/// Number of consecutive in-order packets a source must deliver before it is
/// treated as valid.
const MIN_SEQUENTIAL: u32 = 2;
/// Size of the 16-bit RTP sequence number space.
const RTP_SEQ_MOD: u32 = 1 << 16;
/// Forward jumps smaller than this are accepted as in-order with a gap.
const MAX_DROPOUT: u32 = 3000;
/// Packets at most this far behind `max_seq` count as duplicated or reordered.
const MAX_MISORDER: u32 = 100;

/// Per-source reception state: sequence validation plus the counters the
/// receiver report is computed from.
#[derive(Debug, Clone, Default)]
struct RtcpSource {
    /// Highest sequence number seen.
    max_seq: u16,
    /// Shifted count of sequence number cycles (grows by `RTP_SEQ_MOD` per wrap).
    cycles: u32,
    /// Base sequence number.
    base_seq: u32,
    /// Last "bad" sequence number + 1.
    bad_seq: u32,
    /// Sequential packets still required until the source is valid.
    probation: u32,
    /// Packets received.
    received: u32,
    /// Packets expected at the last interval.
    expected_prior: u32,
    /// Packets received at the last interval.
    received_prior: u32,
    /// Estimated jitter.
    jitter: f64,
}

impl RtcpSource {
    /// Feeds the sequence number of a newly arrived packet into the validator.
    ///
    /// Returns `1` when the packet is accepted and counted in `received`, and
    /// `0` when it is rejected. A packet is rejected while the source is still
    /// on probation, or when it is the first packet after a jump too large to
    /// be ordinary loss (the jump is only believed once the following packet
    /// is consecutive, at which point the state is re-initialised).
    //static int rtp_seq_update(struct rtp_member *sender, uint16_t seq)
    fn update_sequence(&mut self, seq: u16) -> usize {
        // Modular forward distance from the highest sequence number seen so
        // far (the same value `distance(seq, self.max_seq)` yields).
        let delta = u32::from(seq.wrapping_sub(self.max_seq));

        if self.probation > 0 {
            // `wrapping_add` so that 65535 -> 0 still counts as "in sequence"
            // instead of overflowing the u16.
            if seq == self.max_seq.wrapping_add(1) {
                self.probation -= 1;
                self.max_seq = seq;
                if self.probation == 0 {
                    self.init_seq(seq);
                    self.received = self.received.wrapping_add(1);
                    return 1;
                }
            } else {
                // Out of sequence: restart the probation countdown from here.
                self.probation = MIN_SEQUENTIAL - 1;
                self.max_seq = seq;
            }
            return 0;
        }

        if delta < MAX_DROPOUT {
            // In order, with permissible gap.
            if seq < self.max_seq {
                // Sequence number wrapped - count another 64K cycle.
                self.cycles = self.cycles.wrapping_add(RTP_SEQ_MOD);
            }
            self.max_seq = seq;
        } else if delta <= RTP_SEQ_MOD - MAX_MISORDER {
            // The sequence number made a very large jump. Compare in u32:
            // narrowing `bad_seq` to u16 would turn the "no bad packet yet"
            // sentinel (RTP_SEQ_MOD + 1) into 1 and match a real packet.
            if u32::from(seq) == self.bad_seq {
                // Two sequential packets -- assume that the other side
                // restarted without telling us so just re-sync
                // (i.e., pretend this was the first packet).
                self.init_seq(seq);
            } else {
                self.bad_seq = (u32::from(seq) + 1) & (RTP_SEQ_MOD - 1);
                return 0;
            }
        }
        // Otherwise: duplicate or reordered packet; counted, `max_seq` untouched.

        self.received = self.received.wrapping_add(1);
        1
    }

    /// Resets the sequence tracking and the packet counters so that `seq`
    /// becomes the first packet of the stream. `jitter` and `probation` are
    /// left as they are.
    fn init_seq(&mut self, seq: u16) {
        self.base_seq = u32::from(seq);
        self.max_seq = seq;
        // Outside the 16-bit range, so no real sequence number equals it.
        self.bad_seq = RTP_SEQ_MOD + 1;
        self.cycles = 0;
        self.received = 0;
        self.received_prior = 0;
        self.expected_prior = 0;
    }
}
110 
111 #[derive(Debug, Clone, Default)]
112 pub struct RtcpContext {
113     ssrc: u32,
114     sender_ssrc: u32,
115 
116     sr_ntp_lsr: u64,
117     sr_clock_time: u64,
118     last_rtp_clock: u64,
119     last_rtp_timestamp: u32,
120     sample_rate: u32,
121     send_bytes: u64,
122     send_packets: u64,
123 
124     source: RtcpSource,
125 }
126 
127 impl RtcpContext {
new(ssrc: u32, seq: u16, sample_rate: u32) -> Self128     pub fn new(ssrc: u32, seq: u16, sample_rate: u32) -> Self {
129         RtcpContext {
130             ssrc,
131             source: RtcpSource {
132                 max_seq: seq - 1,
133                 probation: MIN_SEQUENTIAL,
134                 ..Default::default()
135             },
136 
137             sample_rate,
138             ..Default::default()
139         }
140     }
141 
generate_app(&self, name: String, data: BytesMut) -> RtcpApp142     pub fn generate_app(&self, name: String, data: BytesMut) -> RtcpApp {
143         let mut buf = BytesMut::with_capacity(name.len());
144         buf.extend_from_slice(name.as_bytes());
145 
146         RtcpApp {
147             ssrc: self.ssrc,
148             name: buf,
149             app_data: data,
150             ..Default::default()
151         }
152     }
153 
generate_bye(&self) -> RtcpBye154     pub fn generate_bye(&self) -> RtcpBye {
155         let ssrss = vec![self.ssrc];
156         RtcpBye {
157             header: RtcpHeader {
158                 report_count: 1,
159                 ..Default::default()
160             },
161             ssrss,
162             ..Default::default()
163         }
164     }
165 
166     //int rtcp_report_block(struct rtp_member* sender, uint8_t* ptr, int bytes)
gen_report_block(&mut self) -> ReportBlock167     fn gen_report_block(&mut self) -> ReportBlock {
168         let extend_max = self.source.cycles + self.source.max_seq as u32;
169         let expected = extend_max - self.source.base_seq + 1;
170         let lost = expected - self.source.received;
171         let expected_interval = expected - self.source.expected_prior;
172         self.source.expected_prior = expected;
173 
174         let received_interval = self.source.received - self.source.received_prior;
175         self.source.received_prior = self.source.received;
176         let lost_interval = expected_interval as i64 - received_interval as i64;
177 
178         let fraction = if expected_interval == 0 || lost_interval < 0 {
179             0
180         } else {
181             ((lost_interval as u32) << 8) / expected_interval
182         };
183 
184         let delay = utils::current_time() - self.sr_clock_time;
185         let lsr = self.sr_ntp_lsr >> 8 & 0xFFFFFFFF;
186         let dlsr = (delay as f64 / 1000000. * 65535.) as u32;
187 
188         ReportBlock {
189             cumutlative_num_of_packets_lost: lost,
190             fraction_lost: fraction as u8,
191             extended_highest_seq_number: extend_max,
192             lsr: lsr as u32,
193             dlsr,
194             ssrc: self.sender_ssrc,
195             jitter: self.source.jitter as u32,
196         }
197     }
198 
generate_rr(&mut self) -> RtcpReceiverReport199     pub fn generate_rr(&mut self) -> RtcpReceiverReport {
200         let block = self.gen_report_block();
201 
202         RtcpReceiverReport {
203             header: RtcpHeader {
204                 payload_type: RTCP_RR,
205                 report_count: 1,
206                 version: 2,
207                 length: (4 + 24) / 4,
208                 ..Default::default()
209             },
210             report_blocks: vec![block],
211             ssrc: self.ssrc,
212         }
213     }
214 
send_rtp(&mut self, pkt: RtpPacket)215     pub fn send_rtp(&mut self, pkt: RtpPacket) {
216         self.send_bytes += pkt.payload.len() as u64;
217         self.send_packets += 1;
218         self.last_rtp_timestamp = pkt.header.timestamp;
219     }
220 
received_sr(&mut self, sr: &RtcpSenderReport)221     pub fn received_sr(&mut self, sr: &RtcpSenderReport) {
222         self.sr_clock_time = utils::current_time();
223 
224         self.sr_ntp_lsr = sr.ntp;
225         self.sender_ssrc = sr.ssrc;
226     }
227 
received_rtp(&mut self, pkt: RtpPacket)228     pub fn received_rtp(&mut self, pkt: RtpPacket) {
229         if 0 == self.source.update_sequence(pkt.header.seq_number) {
230             return;
231         }
232 
233         let rtp_clock = utils::current_time();
234         if self.last_rtp_clock == 0 {
235             self.source.jitter = 0.;
236         } else {
237             let mut d = ((rtp_clock - self.last_rtp_clock) * self.sample_rate as u64 / 1000000)
238                 as i64
239                 - (pkt.header.timestamp - self.last_rtp_timestamp) as i64;
240 
241             if d < 0 {
242                 d = -d;
243             }
244             self.source.jitter += (d as f64 - self.source.jitter) / 16.;
245         }
246 
247         self.last_rtp_clock = rtp_clock;
248         self.last_rtp_timestamp = pkt.header.timestamp;
249     }
250 }
251