xref: /webrtc/interceptor/src/twcc/mod.rs (revision 7220f446)
1 #[cfg(test)]
2 mod twcc_test;
3 
4 pub mod receiver;
5 pub mod sender;
6 
7 use rtcp::transport_feedbacks::transport_layer_cc::{
8     PacketStatusChunk, RecvDelta, RunLengthChunk, StatusChunkTypeTcc, StatusVectorChunk,
9     SymbolSizeTypeTcc, SymbolTypeTcc, TransportLayerCc,
10 };
11 use std::cmp::Ordering;
12 
/// Bookkeeping entry for one packet handed to `Recorder::record`.
#[derive(Default, Debug, PartialEq, Clone)]
struct PktInfo {
    /// Transport-wide sequence number widened to 32 bits: the low 16 bits are
    /// the number seen on the wire, the bits above it hold the recorder's
    /// wrap-around counter so that entries sort in sending order.
    sequence_number: u32,
    /// Arrival time as supplied by the caller; it is later passed on as a
    /// microsecond timestamp when the feedback report is assembled.
    arrival_time: i64,
}
18 
19 /// Recorder records incoming RTP packets and their delays and creates
20 /// transport wide congestion control feedback reports as specified in
21 /// https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01
22 #[derive(Default, Debug, PartialEq, Clone)]
23 pub struct Recorder {
24     received_packets: Vec<PktInfo>,
25 
26     cycles: u32,
27     last_sequence_number: u16,
28 
29     sender_ssrc: u32,
30     media_ssrc: u32,
31     fb_pkt_cnt: u8,
32 }
33 
34 impl Recorder {
35     /// new creates a new Recorder which uses the given sender_ssrc in the created
36     /// feedback packets.
new(sender_ssrc: u32) -> Self37     pub fn new(sender_ssrc: u32) -> Self {
38         Recorder {
39             sender_ssrc,
40             ..Default::default()
41         }
42     }
43 
44     /// record marks a packet with media_ssrc and a transport wide sequence number sequence_number as received at arrival_time.
record(&mut self, media_ssrc: u32, sequence_number: u16, arrival_time: i64)45     pub fn record(&mut self, media_ssrc: u32, sequence_number: u16, arrival_time: i64) {
46         self.media_ssrc = media_ssrc;
47         if sequence_number < 0x0fff && self.last_sequence_number > 0xf000 {
48             self.cycles += 1 << 16;
49         }
50         self.received_packets.push(PktInfo {
51             sequence_number: self.cycles | sequence_number as u32,
52             arrival_time,
53         });
54         self.last_sequence_number = sequence_number;
55     }
56 
57     /// build_feedback_packet creates a new RTCP packet containing a TWCC feedback report.
build_feedback_packet(&mut self) -> Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>58     pub fn build_feedback_packet(&mut self) -> Vec<Box<dyn rtcp::packet::Packet + Send + Sync>> {
59         if self.received_packets.len() < 2 {
60             return vec![];
61         }
62         let mut feedback = Feedback::new(self.sender_ssrc, self.media_ssrc, self.fb_pkt_cnt);
63         self.fb_pkt_cnt = self.fb_pkt_cnt.wrapping_add(1);
64 
65         self.received_packets
66             .sort_by(|a: &PktInfo, b: &PktInfo| -> Ordering {
67                 a.sequence_number.cmp(&b.sequence_number)
68             });
69         feedback.set_base(
70             (self.received_packets[0].sequence_number & 0xffff) as u16,
71             self.received_packets[0].arrival_time,
72         );
73 
74         let mut pkts = vec![];
75         for pkt in &self.received_packets {
76             let built =
77                 feedback.add_received((pkt.sequence_number & 0xffff) as u16, pkt.arrival_time);
78             if !built {
79                 let p: Box<dyn rtcp::packet::Packet + Send + Sync> = Box::new(feedback.get_rtcp());
80                 pkts.push(p);
81                 feedback = Feedback::new(self.sender_ssrc, self.media_ssrc, self.fb_pkt_cnt);
82                 self.fb_pkt_cnt = self.fb_pkt_cnt.wrapping_add(1);
83                 feedback.add_received((pkt.sequence_number & 0xffff) as u16, pkt.arrival_time);
84             }
85         }
86         self.received_packets.clear();
87         let p: Box<dyn rtcp::packet::Packet + Send + Sync> = Box::new(feedback.get_rtcp());
88         pkts.push(p);
89         pkts
90     }
91 }
92 
93 #[derive(Default, Debug, PartialEq, Clone)]
94 struct Feedback {
95     rtcp: TransportLayerCc,
96     base_sequence_number: u16,
97     ref_timestamp64ms: i64,
98     last_timestamp_us: i64,
99     next_sequence_number: u16,
100     sequence_number_count: u16,
101     len: usize,
102     last_chunk: Chunk,
103     chunks: Vec<PacketStatusChunk>,
104     deltas: Vec<RecvDelta>,
105 }
106 
107 impl Feedback {
new(sender_ssrc: u32, media_ssrc: u32, fb_pkt_count: u8) -> Self108     fn new(sender_ssrc: u32, media_ssrc: u32, fb_pkt_count: u8) -> Self {
109         Feedback {
110             rtcp: TransportLayerCc {
111                 sender_ssrc,
112                 media_ssrc,
113                 fb_pkt_count,
114                 ..Default::default()
115             },
116             ..Default::default()
117         }
118     }
119 
set_base(&mut self, sequence_number: u16, time_us: i64)120     fn set_base(&mut self, sequence_number: u16, time_us: i64) {
121         self.base_sequence_number = sequence_number;
122         self.next_sequence_number = self.base_sequence_number;
123         self.ref_timestamp64ms = time_us / 64000;
124         self.last_timestamp_us = self.ref_timestamp64ms * 64000;
125     }
126 
get_rtcp(&mut self) -> TransportLayerCc127     fn get_rtcp(&mut self) -> TransportLayerCc {
128         self.rtcp.packet_status_count = self.sequence_number_count;
129         self.rtcp.reference_time = self.ref_timestamp64ms as u32;
130         self.rtcp.base_sequence_number = self.base_sequence_number;
131         while !self.last_chunk.deltas.is_empty() {
132             self.chunks.push(self.last_chunk.encode());
133         }
134         self.rtcp.packet_chunks.extend_from_slice(&self.chunks);
135         self.rtcp.recv_deltas = self.deltas.clone();
136 
137         self.rtcp.clone()
138     }
139 
add_received(&mut self, sequence_number: u16, timestamp_us: i64) -> bool140     fn add_received(&mut self, sequence_number: u16, timestamp_us: i64) -> bool {
141         let delta_us = timestamp_us - self.last_timestamp_us;
142         let delta250us = delta_us / 250;
143         if delta250us < i16::MIN as i64 || delta250us > i16::MAX as i64 {
144             // delta doesn't fit into 16 bit, need to create new packet
145             return false;
146         }
147 
148         while self.next_sequence_number != sequence_number {
149             if !self
150                 .last_chunk
151                 .can_add(SymbolTypeTcc::PacketNotReceived as u16)
152             {
153                 self.chunks.push(self.last_chunk.encode());
154             }
155             self.last_chunk.add(SymbolTypeTcc::PacketNotReceived as u16);
156             self.sequence_number_count = self.sequence_number_count.wrapping_add(1);
157             self.next_sequence_number = self.next_sequence_number.wrapping_add(1);
158         }
159 
160         let recv_delta = if (0..=0xff).contains(&delta250us) {
161             self.len += 1;
162             SymbolTypeTcc::PacketReceivedSmallDelta
163         } else {
164             self.len += 2;
165             SymbolTypeTcc::PacketReceivedLargeDelta
166         };
167 
168         if !self.last_chunk.can_add(recv_delta as u16) {
169             self.chunks.push(self.last_chunk.encode());
170         }
171         self.last_chunk.add(recv_delta as u16);
172         self.deltas.push(RecvDelta {
173             type_tcc_packet: recv_delta,
174             delta: delta_us,
175         });
176         self.last_timestamp_us = timestamp_us;
177         self.sequence_number_count = self.sequence_number_count.wrapping_add(1);
178         self.next_sequence_number = self.next_sequence_number.wrapping_add(1);
179         true
180     }
181 }
182 
/// Largest number of symbols a run length chunk can describe (13-bit field).
const MAX_RUN_LENGTH_CAP: usize = (1 << 13) - 1;
/// Number of symbols held by a status vector chunk with one-bit symbols.
const MAX_ONE_BIT_CAP: usize = 14;
/// Number of symbols held by a status vector chunk with two-bit symbols.
const MAX_TWO_BIT_CAP: usize = 7;
186 
/// Packet status chunk that is still collecting symbols.
#[derive(Default, Debug, PartialEq, Clone)]
struct Chunk {
    /// True once any collected symbol is "received, large delta".
    has_large_delta: bool,
    /// True once any collected symbol differs from the first one.
    has_different_types: bool,
    /// Collected status symbols, stored as their numeric values.
    deltas: Vec<u16>,
}
193 
194 impl Chunk {
can_add(&self, delta: u16) -> bool195     fn can_add(&self, delta: u16) -> bool {
196         if self.deltas.len() < MAX_TWO_BIT_CAP {
197             return true;
198         }
199         if self.deltas.len() < MAX_ONE_BIT_CAP
200             && !self.has_large_delta
201             && delta != SymbolTypeTcc::PacketReceivedLargeDelta as u16
202         {
203             return true;
204         }
205         if self.deltas.len() < MAX_RUN_LENGTH_CAP
206             && !self.has_different_types
207             && delta == self.deltas[0]
208         {
209             return true;
210         }
211         false
212     }
213 
add(&mut self, delta: u16)214     fn add(&mut self, delta: u16) {
215         self.deltas.push(delta);
216         self.has_large_delta =
217             self.has_large_delta || delta == SymbolTypeTcc::PacketReceivedLargeDelta as u16;
218         self.has_different_types = self.has_different_types || delta != self.deltas[0];
219     }
220 
encode(&mut self) -> PacketStatusChunk221     fn encode(&mut self) -> PacketStatusChunk {
222         if !self.has_different_types {
223             let p = PacketStatusChunk::RunLengthChunk(RunLengthChunk {
224                 type_tcc: StatusChunkTypeTcc::RunLengthChunk,
225                 packet_status_symbol: self.deltas[0].into(),
226                 run_length: self.deltas.len() as u16,
227             });
228             self.reset();
229             return p;
230         }
231         if self.deltas.len() == MAX_ONE_BIT_CAP {
232             let p = PacketStatusChunk::StatusVectorChunk(StatusVectorChunk {
233                 type_tcc: StatusChunkTypeTcc::StatusVectorChunk,
234                 symbol_size: SymbolSizeTypeTcc::OneBit,
235                 symbol_list: self
236                     .deltas
237                     .iter()
238                     .map(|x| SymbolTypeTcc::from(*x))
239                     .collect::<Vec<SymbolTypeTcc>>(),
240             });
241             self.reset();
242             return p;
243         }
244 
245         let min_cap = std::cmp::min(MAX_TWO_BIT_CAP, self.deltas.len());
246         let svc = PacketStatusChunk::StatusVectorChunk(StatusVectorChunk {
247             type_tcc: StatusChunkTypeTcc::StatusVectorChunk,
248             symbol_size: SymbolSizeTypeTcc::TwoBit,
249             symbol_list: self.deltas[..min_cap]
250                 .iter()
251                 .map(|x| SymbolTypeTcc::from(*x))
252                 .collect::<Vec<SymbolTypeTcc>>(),
253         });
254         self.deltas.drain(..min_cap);
255         self.has_different_types = false;
256         self.has_large_delta = false;
257 
258         if !self.deltas.is_empty() {
259             let tmp = self.deltas[0];
260             for d in &self.deltas {
261                 if tmp != *d {
262                     self.has_different_types = true;
263                 }
264                 if *d == SymbolTypeTcc::PacketReceivedLargeDelta as u16 {
265                     self.has_large_delta = true;
266                 }
267             }
268         }
269 
270         svc
271     }
272 
reset(&mut self)273     fn reset(&mut self) {
274         self.deltas = vec![];
275         self.has_large_delta = false;
276         self.has_different_types = false;
277     }
278 }
279