1 #[cfg(test)]
2 mod sample_builder_test;
3 #[cfg(test)]
4 mod sample_sequence_location_test;
5
6 pub mod sample_sequence_location;
7
8 use std::time::{Duration, SystemTime};
9
10 use bytes::Bytes;
11 use rtp::{packet::Packet, packetizer::Depacketizer};
12
13 use crate::Sample;
14
15 use self::sample_sequence_location::{Comparison, SampleSequenceLocation};
16
17 /// SampleBuilder buffers packets until media frames are complete.
18 pub struct SampleBuilder<T: Depacketizer> {
19 /// how many packets to wait until we get a valid Sample
20 max_late: u16,
21 /// max timestamp between old and new timestamps before dropping packets
22 max_late_timestamp: u32,
23 buffer: Vec<Option<Packet>>,
24 prepared_samples: Vec<Option<Sample>>,
25 last_sample_timestamp: Option<u32>,
26
27 /// Interface that allows us to take RTP packets to samples
28 depacketizer: T,
29
30 /// sample_rate allows us to compute duration of media.SamplecA
31 sample_rate: u32,
32
33 /// filled contains the head/tail of the packets inserted into the buffer
34 filled: SampleSequenceLocation,
35
36 /// active contains the active head/tail of the timestamp being actively processed
37 active: SampleSequenceLocation,
38
39 /// prepared contains the samples that have been processed to date
40 prepared: SampleSequenceLocation,
41
42 /// number of packets forced to be dropped
43 dropped_packets: u16,
44
45 /// number of padding packets detected and dropped. This number will be a subset of
46 /// `droppped_packets`
47 padding_packets: u16,
48 }
49
50 impl<T: Depacketizer> SampleBuilder<T> {
51 /// Constructs a new SampleBuilder.
52 /// `max_late` is how long to wait until we can construct a completed [`Sample`].
53 /// `max_late` is measured in RTP packet sequence numbers.
54 /// A large max_late will result in less packet loss but higher latency.
55 /// The depacketizer extracts media samples from RTP packets.
56 /// Several depacketizers are available in package [github.com/pion/rtp/codecs](https://github.com/webrtc-rs/rtp/tree/main/src/codecs).
new(max_late: u16, depacketizer: T, sample_rate: u32) -> Self57 pub fn new(max_late: u16, depacketizer: T, sample_rate: u32) -> Self {
58 Self {
59 max_late,
60 max_late_timestamp: 0,
61 buffer: vec![None; u16::MAX as usize + 1],
62 prepared_samples: (0..=u16::MAX as usize).map(|_| None).collect(),
63 last_sample_timestamp: None,
64 depacketizer,
65 sample_rate,
66 filled: SampleSequenceLocation::new(),
67 active: SampleSequenceLocation::new(),
68 prepared: SampleSequenceLocation::new(),
69 dropped_packets: 0,
70 padding_packets: 0,
71 }
72 }
73
with_max_time_delay(mut self, max_late_duration: Duration) -> Self74 pub fn with_max_time_delay(mut self, max_late_duration: Duration) -> Self {
75 self.max_late_timestamp =
76 (self.sample_rate as u128 * max_late_duration.as_millis() / 1000) as u32;
77 self
78 }
79
too_old(&self, location: &SampleSequenceLocation) -> bool80 fn too_old(&self, location: &SampleSequenceLocation) -> bool {
81 if self.max_late_timestamp == 0 {
82 return false;
83 }
84
85 let mut found_head: Option<u32> = None;
86 let mut found_tail: Option<u32> = None;
87
88 let mut i = location.head;
89 while i != location.tail {
90 if let Some(ref packet) = self.buffer[i as usize] {
91 found_head = Some(packet.header.timestamp);
92 break;
93 }
94 i = i.wrapping_add(1);
95 }
96
97 if found_head.is_none() {
98 return false;
99 }
100
101 let mut i = location.tail.wrapping_sub(1);
102 while i != location.head {
103 if let Some(ref packet) = self.buffer[i as usize] {
104 found_tail = Some(packet.header.timestamp);
105 break;
106 }
107 i = i.wrapping_sub(1);
108 }
109
110 if found_tail.is_none() {
111 return false;
112 }
113
114 found_tail.unwrap() - found_head.unwrap() > self.max_late_timestamp
115 }
116
117 /// Returns the timestamp associated with a given sample location
fetch_timestamp(&self, location: &SampleSequenceLocation) -> Option<u32>118 fn fetch_timestamp(&self, location: &SampleSequenceLocation) -> Option<u32> {
119 if location.empty() {
120 None
121 } else {
122 Some(
123 (self.buffer[location.head as usize])
124 .as_ref()?
125 .header
126 .timestamp,
127 )
128 }
129 }
130
release_packet(&mut self, i: u16)131 fn release_packet(&mut self, i: u16) {
132 self.buffer[i as usize] = None;
133 }
134
135 /// Clears all buffers that have already been consumed by
136 /// popping.
purge_consumed_buffers(&mut self)137 fn purge_consumed_buffers(&mut self) {
138 let active = self.active;
139 self.purge_consumed_location(&active, false);
140 }
141
142 /// Clears all buffers that have already been consumed
143 /// during a sample building method.
purge_consumed_location(&mut self, consume: &SampleSequenceLocation, force_consume: bool)144 fn purge_consumed_location(&mut self, consume: &SampleSequenceLocation, force_consume: bool) {
145 if !self.filled.has_data() {
146 return;
147 }
148 match consume.compare(self.filled.head) {
149 Comparison::Inside if force_consume => {
150 self.release_packet(self.filled.head);
151 self.filled.head = self.filled.head.wrapping_add(1);
152 }
153 Comparison::Before => {
154 self.release_packet(self.filled.head);
155 self.filled.head = self.filled.head.wrapping_add(1);
156 }
157 _ => {}
158 }
159 }
160
161 /// Flushes all buffers that are already consumed or those buffers
162 /// that are too late to consume.
purge_buffers(&mut self)163 fn purge_buffers(&mut self) {
164 self.purge_consumed_buffers();
165
166 while (self.too_old(&self.filled) || (self.filled.count() > self.max_late))
167 && self.filled.has_data()
168 {
169 if self.active.empty() {
170 // refill the active based on the filled packets
171 self.active = self.filled;
172 }
173
174 if self.active.has_data() && (self.active.head == self.filled.head) {
175 // attempt to force the active packet to be consumed even though
176 // outstanding data may be pending arrival
177 let err = match self.build_sample(true) {
178 Ok(_) => continue,
179 Err(e) => e,
180 };
181
182 if !matches!(err, BuildError::InvalidParition(_)) {
183 // In the InvalidParition case `build_sample` will have already adjusted `droppped_packets`.
184 self.dropped_packets += 1;
185 }
186
187 // could not build the sample so drop it
188 self.active.head = self.active.head.wrapping_add(1);
189 }
190
191 self.release_packet(self.filled.head);
192 self.filled.head = self.filled.head.wrapping_add(1);
193 }
194 }
195
196 /// Adds an RTP Packet to self's buffer.
197 ///
198 /// Push does not copy the input. If you wish to reuse
199 /// this memory make sure to copy before calling push
push(&mut self, p: Packet)200 pub fn push(&mut self, p: Packet) {
201 let sequence_number = p.header.sequence_number;
202 self.buffer[sequence_number as usize] = Some(p);
203 match self.filled.compare(sequence_number) {
204 Comparison::Void => {
205 self.filled.head = sequence_number;
206 self.filled.tail = sequence_number.wrapping_add(1);
207 }
208 Comparison::Before => {
209 self.filled.head = sequence_number;
210 }
211 Comparison::After => {
212 self.filled.tail = sequence_number.wrapping_add(1);
213 }
214 _ => {}
215 }
216 self.purge_buffers();
217 }
218
219 /// Creates a sample from a valid collection of RTP Packets by
220 /// walking forwards building a sample if everything looks good clear and
221 /// update buffer+values
build_sample( &mut self, purging_buffers: bool, ) -> Result<SampleSequenceLocation, BuildError>222 fn build_sample(
223 &mut self,
224 purging_buffers: bool,
225 ) -> Result<SampleSequenceLocation, BuildError> {
226 if self.active.empty() {
227 self.active = self.filled;
228 }
229
230 if self.active.empty() {
231 return Err(BuildError::NoActiveSegment);
232 }
233
234 if self.filled.compare(self.active.tail) == Comparison::Inside {
235 self.active.tail = self.filled.tail;
236 }
237
238 let mut consume = SampleSequenceLocation::new();
239
240 let mut i = self.active.head;
241 // `self.active` isn't modified in the loop, fetch the timestamp once and cache it.
242 let head_timestamp = self.fetch_timestamp(&self.active);
243 while let Some(ref packet) = self.buffer[i as usize] {
244 if self.active.compare(i) == Comparison::After {
245 break;
246 }
247 let is_same_timestamp = head_timestamp.map(|t| packet.header.timestamp == t);
248 let is_different_timestamp = is_same_timestamp.map(std::ops::Not::not);
249 let is_partition_tail = self
250 .depacketizer
251 .is_partition_tail(packet.header.marker, &packet.payload);
252
253 // If the timestamp is not the same it might be because the next packet is both a start
254 // and end of the next parition in which case a sample should be generated now. This
255 // can happen when padding packets are used .e.g:
256 //
257 // p1(t=1), p2(t=1), p3(t=1), p4(t=2, marker=true, start=true)
258 //
259 // In thic case the generated sample should be p1 through p3, but excluding p4 which is
260 // its own sample.
261 if is_partition_tail && is_same_timestamp.unwrap_or(true) {
262 consume.head = self.active.head;
263 consume.tail = i.wrapping_add(1);
264 break;
265 }
266
267 if is_different_timestamp.unwrap_or(false) {
268 consume.head = self.active.head;
269 consume.tail = i;
270 break;
271 }
272 i = i.wrapping_add(1);
273 }
274
275 if consume.empty() {
276 return Err(BuildError::NothingToConsume);
277 }
278
279 if !purging_buffers && self.buffer[consume.tail as usize].is_none() {
280 // wait for the next packet after this set of packets to arrive
281 // to ensure at least one post sample timestamp is known
282 // (unless we have to release right now)
283 return Err(BuildError::PendingTimestampPacket);
284 }
285
286 let sample_timestamp = self.fetch_timestamp(&self.active).unwrap_or(0);
287 let mut after_timestamp = sample_timestamp;
288
289 // scan for any packet after the current and use that time stamp as the diff point
290 for i in consume.tail..self.active.tail {
291 if let Some(ref packet) = self.buffer[i as usize] {
292 after_timestamp = packet.header.timestamp;
293 break;
294 }
295 }
296
297 // prior to decoding all the packets, check if this packet
298 // would end being disposed anyway
299 let head_payload = self.buffer[consume.head as usize]
300 .as_ref()
301 .map(|p| &p.payload)
302 .ok_or(BuildError::GapInSegment)?;
303 if !self.depacketizer.is_partition_head(head_payload) {
304 // libWebRTC will sometimes send several empty padding packets to smooth out send
305 // rate. These packets don't carry any media payloads.
306 let is_padding = consume.range(&self.buffer).all(|p| {
307 p.map(|p| {
308 self.last_sample_timestamp == Some(p.header.timestamp) && p.payload.is_empty()
309 })
310 .unwrap_or(false)
311 });
312
313 self.dropped_packets += consume.count();
314 if is_padding {
315 self.padding_packets += consume.count();
316 }
317 self.purge_consumed_location(&consume, true);
318 self.purge_consumed_buffers();
319
320 self.active.head = consume.tail;
321 return Err(BuildError::InvalidParition(consume));
322 }
323
324 // the head set of packets is now fully consumed
325 self.active.head = consume.tail;
326
327 // merge all the buffers into a sample
328 let mut data: Vec<u8> = Vec::new();
329 let mut i = consume.head;
330 while i != consume.tail {
331 let payload = self.buffer[i as usize]
332 .as_ref()
333 .map(|p| &p.payload)
334 .ok_or(BuildError::GapInSegment)?;
335
336 let p = self
337 .depacketizer
338 .depacketize(payload)
339 .map_err(|_| BuildError::DepacketizerFailed)?;
340
341 data.extend_from_slice(&p);
342 i = i.wrapping_add(1);
343 }
344 let samples = after_timestamp - sample_timestamp;
345
346 let sample = Sample {
347 data: Bytes::copy_from_slice(&data),
348 timestamp: SystemTime::now(),
349 duration: Duration::from_secs_f64((samples as f64) / (self.sample_rate as f64)),
350 packet_timestamp: sample_timestamp,
351 prev_dropped_packets: self.dropped_packets,
352 prev_padding_packets: self.padding_packets,
353 };
354
355 self.dropped_packets = 0;
356 self.padding_packets = 0;
357 self.last_sample_timestamp = Some(sample_timestamp);
358
359 self.prepared_samples[self.prepared.tail as usize] = Some(sample);
360 self.prepared.tail = self.prepared.tail.wrapping_add(1);
361
362 self.purge_consumed_location(&consume, true);
363 self.purge_consumed_buffers();
364
365 Ok(consume)
366 }
367
368 /// Compiles pushed RTP packets into media samples and then
369 /// returns the next valid sample (or None if no sample is compiled).
pop(&mut self) -> Option<Sample>370 pub fn pop(&mut self) -> Option<Sample> {
371 let _ = self.build_sample(false);
372
373 if self.prepared.empty() {
374 return None;
375 }
376 let result = std::mem::replace(
377 &mut self.prepared_samples[self.prepared.head as usize],
378 None,
379 );
380 self.prepared.head = self.prepared.head.wrapping_add(1);
381 result
382 }
383
384 /// Compiles pushed RTP packets into media samples and then
385 /// returns the next valid sample with its associated RTP timestamp (or `None` if
386 /// no sample is compiled).
pop_with_timestamp(&mut self) -> Option<(Sample, u32)>387 pub fn pop_with_timestamp(&mut self) -> Option<(Sample, u32)> {
388 if let Some(sample) = self.pop() {
389 let timestamp = sample.packet_timestamp;
390 Some((sample, timestamp))
391 } else {
392 None
393 }
394 }
395 }
396
/// Computes the distance between two 16-bit sequence numbers.
///
/// The distance is measured around the wrapping sequence space in whichever direction is
/// shorter, so the result is symmetric in `x` and `y` and never exceeds `0x8000`.
pub(crate) fn seqnum_distance(x: u16, y: u16) -> u16 {
    let forwards = x.wrapping_sub(y);
    let backwards = y.wrapping_sub(x);
    forwards.min(backwards)
}
414
415 #[derive(Debug)]
416 enum BuildError {
417 /// There's no active segment of RTP packets to consider yet.
418 NoActiveSegment,
419
420 /// No sample partition could be found in the active segment.
421 NothingToConsume,
422
423 /// A segment to consume was identified, but a subsequent packet is needed to determine the
424 /// duration of the sample.
425 PendingTimestampPacket,
426
427 /// The active segment's head was not aligned with a sample parition head. Some packets were
428 /// dropped.
429 InvalidParition(SampleSequenceLocation),
430
431 /// There was a gap in the active segment because of one or more missing RTP packets.
432 GapInSegment,
433
434 /// We failed to depacketize an RTP packet.
435 DepacketizerFailed,
436 }
437