1 #[cfg(test)] 2 mod twcc_test; 3 4 pub mod receiver; 5 pub mod sender; 6 7 use rtcp::transport_feedbacks::transport_layer_cc::{ 8 PacketStatusChunk, RecvDelta, RunLengthChunk, StatusChunkTypeTcc, StatusVectorChunk, 9 SymbolSizeTypeTcc, SymbolTypeTcc, TransportLayerCc, 10 }; 11 use std::cmp::Ordering; 12 13 #[derive(Default, Debug, PartialEq, Clone)] 14 struct PktInfo { 15 sequence_number: u32, 16 arrival_time: i64, 17 } 18 19 /// Recorder records incoming RTP packets and their delays and creates 20 /// transport wide congestion control feedback reports as specified in 21 /// https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01 22 #[derive(Default, Debug, PartialEq, Clone)] 23 pub struct Recorder { 24 received_packets: Vec<PktInfo>, 25 26 cycles: u32, 27 last_sequence_number: u16, 28 29 sender_ssrc: u32, 30 media_ssrc: u32, 31 fb_pkt_cnt: u8, 32 } 33 34 impl Recorder { 35 /// new creates a new Recorder which uses the given sender_ssrc in the created 36 /// feedback packets. new(sender_ssrc: u32) -> Self37 pub fn new(sender_ssrc: u32) -> Self { 38 Recorder { 39 sender_ssrc, 40 ..Default::default() 41 } 42 } 43 44 /// record marks a packet with media_ssrc and a transport wide sequence number sequence_number as received at arrival_time. record(&mut self, media_ssrc: u32, sequence_number: u16, arrival_time: i64)45 pub fn record(&mut self, media_ssrc: u32, sequence_number: u16, arrival_time: i64) { 46 self.media_ssrc = media_ssrc; 47 if sequence_number < 0x0fff && self.last_sequence_number > 0xf000 { 48 self.cycles += 1 << 16; 49 } 50 self.received_packets.push(PktInfo { 51 sequence_number: self.cycles | sequence_number as u32, 52 arrival_time, 53 }); 54 self.last_sequence_number = sequence_number; 55 } 56 57 /// build_feedback_packet creates a new RTCP packet containing a TWCC feedback report. build_feedback_packet(&mut self) -> Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>58 pub fn build_feedback_packet(&mut self) -> Vec<Box<dyn rtcp::packet::Packet + Send + Sync>> { 59 if self.received_packets.len() < 2 { 60 return vec![]; 61 } 62 let mut feedback = Feedback::new(self.sender_ssrc, self.media_ssrc, self.fb_pkt_cnt); 63 self.fb_pkt_cnt = self.fb_pkt_cnt.wrapping_add(1); 64 65 self.received_packets 66 .sort_by(|a: &PktInfo, b: &PktInfo| -> Ordering { 67 a.sequence_number.cmp(&b.sequence_number) 68 }); 69 feedback.set_base( 70 (self.received_packets[0].sequence_number & 0xffff) as u16, 71 self.received_packets[0].arrival_time, 72 ); 73 74 let mut pkts = vec![]; 75 for pkt in &self.received_packets { 76 let built = 77 feedback.add_received((pkt.sequence_number & 0xffff) as u16, pkt.arrival_time); 78 if !built { 79 let p: Box<dyn rtcp::packet::Packet + Send + Sync> = Box::new(feedback.get_rtcp()); 80 pkts.push(p); 81 feedback = Feedback::new(self.sender_ssrc, self.media_ssrc, self.fb_pkt_cnt); 82 self.fb_pkt_cnt = self.fb_pkt_cnt.wrapping_add(1); 83 feedback.add_received((pkt.sequence_number & 0xffff) as u16, pkt.arrival_time); 84 } 85 } 86 self.received_packets.clear(); 87 let p: Box<dyn rtcp::packet::Packet + Send + Sync> = Box::new(feedback.get_rtcp()); 88 pkts.push(p); 89 pkts 90 } 91 } 92 93 #[derive(Default, Debug, PartialEq, Clone)] 94 struct Feedback { 95 rtcp: TransportLayerCc, 96 base_sequence_number: u16, 97 ref_timestamp64ms: i64, 98 last_timestamp_us: i64, 99 next_sequence_number: u16, 100 sequence_number_count: u16, 101 len: usize, 102 last_chunk: Chunk, 103 chunks: Vec<PacketStatusChunk>, 104 deltas: Vec<RecvDelta>, 105 } 106 107 impl Feedback { new(sender_ssrc: u32, media_ssrc: u32, fb_pkt_count: u8) -> Self108 fn new(sender_ssrc: u32, media_ssrc: u32, fb_pkt_count: u8) -> Self { 109 Feedback { 110 rtcp: TransportLayerCc { 111 sender_ssrc, 112 media_ssrc, 113 fb_pkt_count, 114 ..Default::default() 115 }, 116 ..Default::default() 117 } 118 } 119 set_base(&mut self, sequence_number: u16, time_us: i64)120 fn set_base(&mut self, sequence_number: u16, time_us: i64) { 121 self.base_sequence_number = sequence_number; 122 self.next_sequence_number = self.base_sequence_number; 123 self.ref_timestamp64ms = time_us / 64000; 124 self.last_timestamp_us = self.ref_timestamp64ms * 64000; 125 } 126 get_rtcp(&mut self) -> TransportLayerCc127 fn get_rtcp(&mut self) -> TransportLayerCc { 128 self.rtcp.packet_status_count = self.sequence_number_count; 129 self.rtcp.reference_time = self.ref_timestamp64ms as u32; 130 self.rtcp.base_sequence_number = self.base_sequence_number; 131 while !self.last_chunk.deltas.is_empty() { 132 self.chunks.push(self.last_chunk.encode()); 133 } 134 self.rtcp.packet_chunks.extend_from_slice(&self.chunks); 135 self.rtcp.recv_deltas = self.deltas.clone(); 136 137 self.rtcp.clone() 138 } 139 add_received(&mut self, sequence_number: u16, timestamp_us: i64) -> bool140 fn add_received(&mut self, sequence_number: u16, timestamp_us: i64) -> bool { 141 let delta_us = timestamp_us - self.last_timestamp_us; 142 let delta250us = delta_us / 250; 143 if delta250us < i16::MIN as i64 || delta250us > i16::MAX as i64 { 144 // delta doesn't fit into 16 bit, need to create new packet 145 return false; 146 } 147 148 while self.next_sequence_number != sequence_number { 149 if !self 150 .last_chunk 151 .can_add(SymbolTypeTcc::PacketNotReceived as u16) 152 { 153 self.chunks.push(self.last_chunk.encode()); 154 } 155 self.last_chunk.add(SymbolTypeTcc::PacketNotReceived as u16); 156 self.sequence_number_count = self.sequence_number_count.wrapping_add(1); 157 self.next_sequence_number = self.next_sequence_number.wrapping_add(1); 158 } 159 160 let recv_delta = if (0..=0xff).contains(&delta250us) { 161 self.len += 1; 162 SymbolTypeTcc::PacketReceivedSmallDelta 163 } else { 164 self.len += 2; 165 SymbolTypeTcc::PacketReceivedLargeDelta 166 }; 167 168 if !self.last_chunk.can_add(recv_delta as u16) { 169 self.chunks.push(self.last_chunk.encode()); 170 } 171 self.last_chunk.add(recv_delta as u16); 172 self.deltas.push(RecvDelta { 173 type_tcc_packet: recv_delta, 174 delta: delta_us, 175 }); 176 self.last_timestamp_us = timestamp_us; 177 self.sequence_number_count = self.sequence_number_count.wrapping_add(1); 178 self.next_sequence_number = self.next_sequence_number.wrapping_add(1); 179 true 180 } 181 } 182 183 const MAX_RUN_LENGTH_CAP: usize = 0x1fff; // 13 bits 184 const MAX_ONE_BIT_CAP: usize = 14; // bits 185 const MAX_TWO_BIT_CAP: usize = 7; // bits 186 187 #[derive(Default, Debug, PartialEq, Clone)] 188 struct Chunk { 189 has_large_delta: bool, 190 has_different_types: bool, 191 deltas: Vec<u16>, 192 } 193 194 impl Chunk { can_add(&self, delta: u16) -> bool195 fn can_add(&self, delta: u16) -> bool { 196 if self.deltas.len() < MAX_TWO_BIT_CAP { 197 return true; 198 } 199 if self.deltas.len() < MAX_ONE_BIT_CAP 200 && !self.has_large_delta 201 && delta != SymbolTypeTcc::PacketReceivedLargeDelta as u16 202 { 203 return true; 204 } 205 if self.deltas.len() < MAX_RUN_LENGTH_CAP 206 && !self.has_different_types 207 && delta == self.deltas[0] 208 { 209 return true; 210 } 211 false 212 } 213 add(&mut self, delta: u16)214 fn add(&mut self, delta: u16) { 215 self.deltas.push(delta); 216 self.has_large_delta = 217 self.has_large_delta || delta == SymbolTypeTcc::PacketReceivedLargeDelta as u16; 218 self.has_different_types = self.has_different_types || delta != self.deltas[0]; 219 } 220 encode(&mut self) -> PacketStatusChunk221 fn encode(&mut self) -> PacketStatusChunk { 222 if !self.has_different_types { 223 let p = PacketStatusChunk::RunLengthChunk(RunLengthChunk { 224 type_tcc: StatusChunkTypeTcc::RunLengthChunk, 225 packet_status_symbol: self.deltas[0].into(), 226 run_length: self.deltas.len() as u16, 227 }); 228 self.reset(); 229 return p; 230 } 231 if self.deltas.len() == MAX_ONE_BIT_CAP { 232 let p = PacketStatusChunk::StatusVectorChunk(StatusVectorChunk { 233 type_tcc: StatusChunkTypeTcc::StatusVectorChunk, 234 symbol_size: SymbolSizeTypeTcc::OneBit, 235 symbol_list: self 236 .deltas 237 .iter() 238 .map(|x| SymbolTypeTcc::from(*x)) 239 .collect::<Vec<SymbolTypeTcc>>(), 240 }); 241 self.reset(); 242 return p; 243 } 244 245 let min_cap = std::cmp::min(MAX_TWO_BIT_CAP, self.deltas.len()); 246 let svc = PacketStatusChunk::StatusVectorChunk(StatusVectorChunk { 247 type_tcc: StatusChunkTypeTcc::StatusVectorChunk, 248 symbol_size: SymbolSizeTypeTcc::TwoBit, 249 symbol_list: self.deltas[..min_cap] 250 .iter() 251 .map(|x| SymbolTypeTcc::from(*x)) 252 .collect::<Vec<SymbolTypeTcc>>(), 253 }); 254 self.deltas.drain(..min_cap); 255 self.has_different_types = false; 256 self.has_large_delta = false; 257 258 if !self.deltas.is_empty() { 259 let tmp = self.deltas[0]; 260 for d in &self.deltas { 261 if tmp != *d { 262 self.has_different_types = true; 263 } 264 if *d == SymbolTypeTcc::PacketReceivedLargeDelta as u16 { 265 self.has_large_delta = true; 266 } 267 } 268 } 269 270 svc 271 } 272 reset(&mut self)273 fn reset(&mut self) { 274 self.deltas = vec![]; 275 self.has_large_delta = false; 276 self.has_different_types = false; 277 } 278 } 279