xref: /webrtc/rtp/src/packetizer/mod.rs (revision 04f0bd9e)
1 #[cfg(test)]
2 mod packetizer_test;
3 
4 use crate::error::Result;
5 use crate::{extension::abs_send_time_extension::*, header::*, packet::*, sequence::*};
6 use util::marshal::{Marshal, MarshalSize};
7 
8 use async_trait::async_trait;
9 use bytes::{Bytes, BytesMut};
10 use std::fmt;
11 use std::future::Future;
12 use std::pin::Pin;
13 use std::sync::Arc;
14 use std::time::SystemTime;
15 
16 /// Payloader payloads a byte array for use as rtp.Packet payloads
17 pub trait Payloader: fmt::Debug {
18     fn payload(&mut self, mtu: usize, b: &Bytes) -> Result<Vec<Bytes>>;
19     fn clone_to(&self) -> Box<dyn Payloader + Send + Sync>;
20 }
21 
22 impl Clone for Box<dyn Payloader + Send + Sync> {
23     fn clone(&self) -> Box<dyn Payloader + Send + Sync> {
24         self.clone_to()
25     }
26 }
27 
28 /// Packetizer packetizes a payload
29 #[async_trait]
30 pub trait Packetizer: fmt::Debug {
31     fn enable_abs_send_time(&mut self, value: u8);
32     async fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>>;
33     fn skip_samples(&mut self, skipped_samples: u32);
34     fn clone_to(&self) -> Box<dyn Packetizer + Send + Sync>;
35 }
36 
37 impl Clone for Box<dyn Packetizer + Send + Sync> {
38     fn clone(&self) -> Box<dyn Packetizer + Send + Sync> {
39         self.clone_to()
40     }
41 }
42 
43 /// Depacketizer depacketizes a RTP payload, removing any RTP specific data from the payload
44 pub trait Depacketizer {
45     fn depacketize(&mut self, b: &Bytes) -> Result<Bytes>;
46 
47     /// Checks if the packet is at the beginning of a partition.  This
48     /// should return false if the result could not be determined, in
49     /// which case the caller will detect timestamp discontinuities.
50     fn is_partition_head(&self, payload: &Bytes) -> bool;
51 
52     /// Checks if the packet is at the end of a partition.  This should
53     /// return false if the result could not be determined.
54     fn is_partition_tail(&self, marker: bool, payload: &Bytes) -> bool;
55 }
56 
// TODO: decide between `SystemTime` and `Instant` here, i.e. a non-monotonic
// clock versus a monotonically non-decreasing clock.
/// FnTimeGen is a shareable callback that yields the current `SystemTime`.
///
/// Each call returns a boxed, pinned, `Send` future resolving to the time.
pub type FnTimeGen = Arc<
    dyn (Fn() -> Pin<Box<dyn Future<Output = SystemTime> + Send + 'static>>) + Send + Sync,
>;
62 
63 #[derive(Clone)]
64 pub(crate) struct PacketizerImpl {
65     pub(crate) mtu: usize,
66     pub(crate) payload_type: u8,
67     pub(crate) ssrc: u32,
68     pub(crate) payloader: Box<dyn Payloader + Send + Sync>,
69     pub(crate) sequencer: Box<dyn Sequencer + Send + Sync>,
70     pub(crate) timestamp: u32,
71     pub(crate) clock_rate: u32,
72     pub(crate) abs_send_time: u8, //http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time
73     pub(crate) time_gen: Option<FnTimeGen>,
74 }
75 
76 impl fmt::Debug for PacketizerImpl {
77     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78         f.debug_struct("PacketizerImpl")
79             .field("mtu", &self.mtu)
80             .field("payload_type", &self.payload_type)
81             .field("ssrc", &self.ssrc)
82             .field("timestamp", &self.timestamp)
83             .field("clock_rate", &self.clock_rate)
84             .field("abs_send_time", &self.abs_send_time)
85             .finish()
86     }
87 }
88 
89 pub fn new_packetizer(
90     mtu: usize,
91     payload_type: u8,
92     ssrc: u32,
93     payloader: Box<dyn Payloader + Send + Sync>,
94     sequencer: Box<dyn Sequencer + Send + Sync>,
95     clock_rate: u32,
96 ) -> impl Packetizer {
97     PacketizerImpl {
98         mtu,
99         payload_type,
100         ssrc,
101         payloader,
102         sequencer,
103         timestamp: rand::random::<u32>(),
104         clock_rate,
105         abs_send_time: 0,
106         time_gen: None,
107     }
108 }
109 
110 #[async_trait]
111 impl Packetizer for PacketizerImpl {
112     fn enable_abs_send_time(&mut self, value: u8) {
113         self.abs_send_time = value
114     }
115 
116     async fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>> {
117         let payloads = self.payloader.payload(self.mtu - 12, payload)?;
118         let payloads_len = payloads.len();
119         let mut packets = Vec::with_capacity(payloads_len);
120         for (i, payload) in payloads.into_iter().enumerate() {
121             packets.push(Packet {
122                 header: Header {
123                     version: 2,
124                     padding: false,
125                     extension: false,
126                     marker: i == payloads_len - 1,
127                     payload_type: self.payload_type,
128                     sequence_number: self.sequencer.next_sequence_number(),
129                     timestamp: self.timestamp, //TODO: Figure out how to do timestamps
130                     ssrc: self.ssrc,
131                     ..Default::default()
132                 },
133                 payload,
134             });
135         }
136 
137         self.timestamp = self.timestamp.wrapping_add(samples);
138 
139         if payloads_len != 0 && self.abs_send_time != 0 {
140             let st = if let Some(fn_time_gen) = &self.time_gen {
141                 fn_time_gen().await
142             } else {
143                 SystemTime::now()
144             };
145             let send_time = AbsSendTimeExtension::new(st);
146             //apply http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time
147             let mut raw = BytesMut::with_capacity(send_time.marshal_size());
148             raw.resize(send_time.marshal_size(), 0);
149             let _ = send_time.marshal_to(&mut raw)?;
150             packets[payloads_len - 1]
151                 .header
152                 .set_extension(self.abs_send_time, raw.freeze())?;
153         }
154 
155         Ok(packets)
156     }
157 
158     /// skip_samples causes a gap in sample count between Packetize requests so the
159     /// RTP payloads produced have a gap in timestamps
160     fn skip_samples(&mut self, skipped_samples: u32) {
161         self.timestamp = self.timestamp.wrapping_add(skipped_samples);
162     }
163 
164     fn clone_to(&self) -> Box<dyn Packetizer + Send + Sync> {
165         Box::new(self.clone())
166     }
167 }
168