xref: /webrtc/media/src/io/sample_builder/mod.rs (revision d897e473)
1 #[cfg(test)]
2 mod sample_builder_test;
3 #[cfg(test)]
4 mod sample_sequence_location_test;
5 
6 pub mod sample_sequence_location;
7 
8 use std::time::{Duration, SystemTime};
9 
10 use bytes::Bytes;
11 use rtp::{packet::Packet, packetizer::Depacketizer};
12 
13 use crate::Sample;
14 
15 use self::sample_sequence_location::{Comparison, SampleSequenceLocation};
16 
/// SampleBuilder buffers packets until media frames are complete.
///
/// Pushed RTP packets are stored by sequence number; complete frames are depacketized into
/// [`Sample`]s which are handed out by `pop`.
pub struct SampleBuilder<T: Depacketizer> {
    /// How many packets to wait until we get a valid Sample. Once more than this many
    /// sequence numbers are buffered, the oldest packets are force-consumed or dropped.
    max_late: u16,
    /// Max difference between the oldest and newest buffered RTP timestamps before packets
    /// are dropped. A value of 0 disables the timestamp based limit.
    max_late_timestamp: u32,
    /// Packet storage, indexed directly by RTP sequence number.
    buffer: Vec<Option<Packet>>,
    /// Storage for samples that were built but not yet popped, indexed by the positions
    /// tracked in `prepared`.
    prepared_samples: Vec<Option<Sample>>,
    /// RTP timestamp of the most recently built sample, used to recognise padding packets.
    last_sample_timestamp: Option<u32>,

    /// Interface that allows us to take RTP packets to samples
    depacketizer: T,

    /// sample_rate allows us to compute the duration of media samples
    sample_rate: u32,

    /// filled contains the head/tail of the packets inserted into the buffer
    filled: SampleSequenceLocation,

    /// active contains the active head/tail of the timestamp being actively processed
    active: SampleSequenceLocation,

    /// prepared contains the samples that have been processed to date
    prepared: SampleSequenceLocation,

    /// number of packets forced to be dropped since the last sample was built
    dropped_packets: u16,

    /// number of padding packets detected and dropped since the last sample was built.
    /// This number will be a subset of `dropped_packets`
    padding_packets: u16,
}
49 
50 impl<T: Depacketizer> SampleBuilder<T> {
51     /// Constructs a new SampleBuilder.
52     /// `max_late` is how long to wait until we can construct a completed [`Sample`].
53     /// `max_late` is measured in RTP packet sequence numbers.
54     /// A large max_late will result in less packet loss but higher latency.
55     /// The depacketizer extracts media samples from RTP packets.
56     /// Several depacketizers are available in package [github.com/pion/rtp/codecs](https://github.com/webrtc-rs/rtp/tree/main/src/codecs).
new(max_late: u16, depacketizer: T, sample_rate: u32) -> Self57     pub fn new(max_late: u16, depacketizer: T, sample_rate: u32) -> Self {
58         Self {
59             max_late,
60             max_late_timestamp: 0,
61             buffer: vec![None; u16::MAX as usize + 1],
62             prepared_samples: (0..=u16::MAX as usize).map(|_| None).collect(),
63             last_sample_timestamp: None,
64             depacketizer,
65             sample_rate,
66             filled: SampleSequenceLocation::new(),
67             active: SampleSequenceLocation::new(),
68             prepared: SampleSequenceLocation::new(),
69             dropped_packets: 0,
70             padding_packets: 0,
71         }
72     }
73 
with_max_time_delay(mut self, max_late_duration: Duration) -> Self74     pub fn with_max_time_delay(mut self, max_late_duration: Duration) -> Self {
75         self.max_late_timestamp =
76             (self.sample_rate as u128 * max_late_duration.as_millis() / 1000) as u32;
77         self
78     }
79 
too_old(&self, location: &SampleSequenceLocation) -> bool80     fn too_old(&self, location: &SampleSequenceLocation) -> bool {
81         if self.max_late_timestamp == 0 {
82             return false;
83         }
84 
85         let mut found_head: Option<u32> = None;
86         let mut found_tail: Option<u32> = None;
87 
88         let mut i = location.head;
89         while i != location.tail {
90             if let Some(ref packet) = self.buffer[i as usize] {
91                 found_head = Some(packet.header.timestamp);
92                 break;
93             }
94             i = i.wrapping_add(1);
95         }
96 
97         if found_head.is_none() {
98             return false;
99         }
100 
101         let mut i = location.tail.wrapping_sub(1);
102         while i != location.head {
103             if let Some(ref packet) = self.buffer[i as usize] {
104                 found_tail = Some(packet.header.timestamp);
105                 break;
106             }
107             i = i.wrapping_sub(1);
108         }
109 
110         if found_tail.is_none() {
111             return false;
112         }
113 
114         found_tail.unwrap() - found_head.unwrap() > self.max_late_timestamp
115     }
116 
117     /// Returns the timestamp associated with a given sample location
fetch_timestamp(&self, location: &SampleSequenceLocation) -> Option<u32>118     fn fetch_timestamp(&self, location: &SampleSequenceLocation) -> Option<u32> {
119         if location.empty() {
120             None
121         } else {
122             Some(
123                 (self.buffer[location.head as usize])
124                     .as_ref()?
125                     .header
126                     .timestamp,
127             )
128         }
129     }
130 
release_packet(&mut self, i: u16)131     fn release_packet(&mut self, i: u16) {
132         self.buffer[i as usize] = None;
133     }
134 
135     /// Clears all buffers that have already been consumed by
136     /// popping.
purge_consumed_buffers(&mut self)137     fn purge_consumed_buffers(&mut self) {
138         let active = self.active;
139         self.purge_consumed_location(&active, false);
140     }
141 
142     /// Clears all buffers that have already been consumed
143     /// during a sample building method.
purge_consumed_location(&mut self, consume: &SampleSequenceLocation, force_consume: bool)144     fn purge_consumed_location(&mut self, consume: &SampleSequenceLocation, force_consume: bool) {
145         if !self.filled.has_data() {
146             return;
147         }
148         match consume.compare(self.filled.head) {
149             Comparison::Inside if force_consume => {
150                 self.release_packet(self.filled.head);
151                 self.filled.head = self.filled.head.wrapping_add(1);
152             }
153             Comparison::Before => {
154                 self.release_packet(self.filled.head);
155                 self.filled.head = self.filled.head.wrapping_add(1);
156             }
157             _ => {}
158         }
159     }
160 
161     /// Flushes all buffers that are already consumed or those buffers
162     /// that are too late to consume.
purge_buffers(&mut self)163     fn purge_buffers(&mut self) {
164         self.purge_consumed_buffers();
165 
166         while (self.too_old(&self.filled) || (self.filled.count() > self.max_late))
167             && self.filled.has_data()
168         {
169             if self.active.empty() {
170                 // refill the active based on the filled packets
171                 self.active = self.filled;
172             }
173 
174             if self.active.has_data() && (self.active.head == self.filled.head) {
175                 // attempt to force the active packet to be consumed even though
176                 // outstanding data may be pending arrival
177                 let err = match self.build_sample(true) {
178                     Ok(_) => continue,
179                     Err(e) => e,
180                 };
181 
182                 if !matches!(err, BuildError::InvalidParition(_)) {
183                     // In the InvalidParition case `build_sample` will have already adjusted `droppped_packets`.
184                     self.dropped_packets += 1;
185                 }
186 
187                 // could not build the sample so drop it
188                 self.active.head = self.active.head.wrapping_add(1);
189             }
190 
191             self.release_packet(self.filled.head);
192             self.filled.head = self.filled.head.wrapping_add(1);
193         }
194     }
195 
196     /// Adds an RTP Packet to self's buffer.
197     ///
198     /// Push does not copy the input. If you wish to reuse
199     /// this memory make sure to copy before calling push
push(&mut self, p: Packet)200     pub fn push(&mut self, p: Packet) {
201         let sequence_number = p.header.sequence_number;
202         self.buffer[sequence_number as usize] = Some(p);
203         match self.filled.compare(sequence_number) {
204             Comparison::Void => {
205                 self.filled.head = sequence_number;
206                 self.filled.tail = sequence_number.wrapping_add(1);
207             }
208             Comparison::Before => {
209                 self.filled.head = sequence_number;
210             }
211             Comparison::After => {
212                 self.filled.tail = sequence_number.wrapping_add(1);
213             }
214             _ => {}
215         }
216         self.purge_buffers();
217     }
218 
219     /// Creates a sample from a valid collection of RTP Packets by
220     /// walking forwards building a sample if everything looks good clear and
221     /// update buffer+values
build_sample( &mut self, purging_buffers: bool, ) -> Result<SampleSequenceLocation, BuildError>222     fn build_sample(
223         &mut self,
224         purging_buffers: bool,
225     ) -> Result<SampleSequenceLocation, BuildError> {
226         if self.active.empty() {
227             self.active = self.filled;
228         }
229 
230         if self.active.empty() {
231             return Err(BuildError::NoActiveSegment);
232         }
233 
234         if self.filled.compare(self.active.tail) == Comparison::Inside {
235             self.active.tail = self.filled.tail;
236         }
237 
238         let mut consume = SampleSequenceLocation::new();
239 
240         let mut i = self.active.head;
241         // `self.active` isn't modified in the loop, fetch the timestamp once and cache it.
242         let head_timestamp = self.fetch_timestamp(&self.active);
243         while let Some(ref packet) = self.buffer[i as usize] {
244             if self.active.compare(i) == Comparison::After {
245                 break;
246             }
247             let is_same_timestamp = head_timestamp.map(|t| packet.header.timestamp == t);
248             let is_different_timestamp = is_same_timestamp.map(std::ops::Not::not);
249             let is_partition_tail = self
250                 .depacketizer
251                 .is_partition_tail(packet.header.marker, &packet.payload);
252 
253             // If the timestamp is not the same it might be because the next packet is both a start
254             // and end of the next parition in which case a sample should be generated now. This
255             // can happen when padding packets are used .e.g:
256             //
257             // p1(t=1), p2(t=1), p3(t=1), p4(t=2, marker=true, start=true)
258             //
259             // In thic case the generated sample should be p1 through p3, but excluding p4 which is
260             // its own sample.
261             if is_partition_tail && is_same_timestamp.unwrap_or(true) {
262                 consume.head = self.active.head;
263                 consume.tail = i.wrapping_add(1);
264                 break;
265             }
266 
267             if is_different_timestamp.unwrap_or(false) {
268                 consume.head = self.active.head;
269                 consume.tail = i;
270                 break;
271             }
272             i = i.wrapping_add(1);
273         }
274 
275         if consume.empty() {
276             return Err(BuildError::NothingToConsume);
277         }
278 
279         if !purging_buffers && self.buffer[consume.tail as usize].is_none() {
280             // wait for the next packet after this set of packets to arrive
281             // to ensure at least one post sample timestamp is known
282             // (unless we have to release right now)
283             return Err(BuildError::PendingTimestampPacket);
284         }
285 
286         let sample_timestamp = self.fetch_timestamp(&self.active).unwrap_or(0);
287         let mut after_timestamp = sample_timestamp;
288 
289         // scan for any packet after the current and use that time stamp as the diff point
290         for i in consume.tail..self.active.tail {
291             if let Some(ref packet) = self.buffer[i as usize] {
292                 after_timestamp = packet.header.timestamp;
293                 break;
294             }
295         }
296 
297         // prior to decoding all the packets, check if this packet
298         // would end being disposed anyway
299         let head_payload = self.buffer[consume.head as usize]
300             .as_ref()
301             .map(|p| &p.payload)
302             .ok_or(BuildError::GapInSegment)?;
303         if !self.depacketizer.is_partition_head(head_payload) {
304             // libWebRTC will sometimes send several empty padding packets to smooth out send
305             // rate. These packets don't carry any media payloads.
306             let is_padding = consume.range(&self.buffer).all(|p| {
307                 p.map(|p| {
308                     self.last_sample_timestamp == Some(p.header.timestamp) && p.payload.is_empty()
309                 })
310                 .unwrap_or(false)
311             });
312 
313             self.dropped_packets += consume.count();
314             if is_padding {
315                 self.padding_packets += consume.count();
316             }
317             self.purge_consumed_location(&consume, true);
318             self.purge_consumed_buffers();
319 
320             self.active.head = consume.tail;
321             return Err(BuildError::InvalidParition(consume));
322         }
323 
324         // the head set of packets is now fully consumed
325         self.active.head = consume.tail;
326 
327         // merge all the buffers into a sample
328         let mut data: Vec<u8> = Vec::new();
329         let mut i = consume.head;
330         while i != consume.tail {
331             let payload = self.buffer[i as usize]
332                 .as_ref()
333                 .map(|p| &p.payload)
334                 .ok_or(BuildError::GapInSegment)?;
335 
336             let p = self
337                 .depacketizer
338                 .depacketize(payload)
339                 .map_err(|_| BuildError::DepacketizerFailed)?;
340 
341             data.extend_from_slice(&p);
342             i = i.wrapping_add(1);
343         }
344         let samples = after_timestamp - sample_timestamp;
345 
346         let sample = Sample {
347             data: Bytes::copy_from_slice(&data),
348             timestamp: SystemTime::now(),
349             duration: Duration::from_secs_f64((samples as f64) / (self.sample_rate as f64)),
350             packet_timestamp: sample_timestamp,
351             prev_dropped_packets: self.dropped_packets,
352             prev_padding_packets: self.padding_packets,
353         };
354 
355         self.dropped_packets = 0;
356         self.padding_packets = 0;
357         self.last_sample_timestamp = Some(sample_timestamp);
358 
359         self.prepared_samples[self.prepared.tail as usize] = Some(sample);
360         self.prepared.tail = self.prepared.tail.wrapping_add(1);
361 
362         self.purge_consumed_location(&consume, true);
363         self.purge_consumed_buffers();
364 
365         Ok(consume)
366     }
367 
368     /// Compiles pushed RTP packets into media samples and then
369     /// returns the next valid sample (or None if no sample is compiled).
pop(&mut self) -> Option<Sample>370     pub fn pop(&mut self) -> Option<Sample> {
371         let _ = self.build_sample(false);
372 
373         if self.prepared.empty() {
374             return None;
375         }
376         let result = std::mem::replace(
377             &mut self.prepared_samples[self.prepared.head as usize],
378             None,
379         );
380         self.prepared.head = self.prepared.head.wrapping_add(1);
381         result
382     }
383 
384     /// Compiles pushed RTP packets into media samples and then
385     /// returns the next valid sample with its associated RTP timestamp (or `None` if
386     /// no sample is compiled).
pop_with_timestamp(&mut self) -> Option<(Sample, u32)>387     pub fn pop_with_timestamp(&mut self) -> Option<(Sample, u32)> {
388         if let Some(sample) = self.pop() {
389             let timestamp = sample.packet_timestamp;
390             Some((sample, timestamp))
391         } else {
392             None
393         }
394     }
395 }
396 
/// Computes the distance between two sequence numbers.
///
/// The distance is measured around the 16-bit sequence number circle, so it is
/// symmetric in its arguments and never larger than `0x8000`.
pub(crate) fn seqnum_distance(x: u16, y: u16) -> u16 {
    // Going one way round the circle takes `forward` steps, the other way `backward`;
    // the distance is whichever is shorter.
    let forward = x.wrapping_sub(y);
    let backward = y.wrapping_sub(x);
    forward.min(backward)
}
414 
/// Reasons why `build_sample` could not produce a sample.
#[derive(Debug)]
enum BuildError {
    /// There's no active segment of RTP packets to consider yet.
    NoActiveSegment,

    /// No sample partition could be found in the active segment.
    NothingToConsume,

    /// A segment to consume was identified, but a subsequent packet is needed to determine the
    /// duration of the sample.
    PendingTimestampPacket,

    /// The active segment's head was not aligned with a sample partition head. Some packets
    /// were dropped; the payload is the sequence range that was discarded.
    InvalidParition(SampleSequenceLocation),

    /// There was a gap in the active segment because of one or more missing RTP packets.
    GapInSegment,

    /// We failed to depacketize an RTP packet.
    DepacketizerFailed,
}
437