xref: /webrtc/rtp/src/packetizer/mod.rs (revision ad555d10)
1 #[cfg(test)]
2 mod packetizer_test;
3 
4 use crate::error::Result;
5 use crate::{extension::abs_send_time_extension::*, header::*, packet::*, sequence::*};
6 use util::marshal::{Marshal, MarshalSize};
7 
8 use bytes::{Bytes, BytesMut};
9 use std::fmt;
10 use std::sync::Arc;
11 use std::time::SystemTime;
12 
13 /// Payloader payloads a byte array for use as rtp.Packet payloads
14 pub trait Payloader: fmt::Debug {
payload(&mut self, mtu: usize, b: &Bytes) -> Result<Vec<Bytes>>15     fn payload(&mut self, mtu: usize, b: &Bytes) -> Result<Vec<Bytes>>;
clone_to(&self) -> Box<dyn Payloader + Send + Sync>16     fn clone_to(&self) -> Box<dyn Payloader + Send + Sync>;
17 }
18 
19 impl Clone for Box<dyn Payloader + Send + Sync> {
clone(&self) -> Box<dyn Payloader + Send + Sync>20     fn clone(&self) -> Box<dyn Payloader + Send + Sync> {
21         self.clone_to()
22     }
23 }
24 
25 /// Packetizer packetizes a payload
26 pub trait Packetizer: fmt::Debug {
enable_abs_send_time(&mut self, value: u8)27     fn enable_abs_send_time(&mut self, value: u8);
packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>>28     fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>>;
skip_samples(&mut self, skipped_samples: u32)29     fn skip_samples(&mut self, skipped_samples: u32);
clone_to(&self) -> Box<dyn Packetizer + Send + Sync>30     fn clone_to(&self) -> Box<dyn Packetizer + Send + Sync>;
31 }
32 
33 impl Clone for Box<dyn Packetizer + Send + Sync> {
clone(&self) -> Box<dyn Packetizer + Send + Sync>34     fn clone(&self) -> Box<dyn Packetizer + Send + Sync> {
35         self.clone_to()
36     }
37 }
38 
39 /// Depacketizer depacketizes a RTP payload, removing any RTP specific data from the payload
40 pub trait Depacketizer {
depacketize(&mut self, b: &Bytes) -> Result<Bytes>41     fn depacketize(&mut self, b: &Bytes) -> Result<Bytes>;
42 
43     /// Checks if the packet is at the beginning of a partition.  This
44     /// should return false if the result could not be determined, in
45     /// which case the caller will detect timestamp discontinuities.
is_partition_head(&self, payload: &Bytes) -> bool46     fn is_partition_head(&self, payload: &Bytes) -> bool;
47 
48     /// Checks if the packet is at the end of a partition.  This should
49     /// return false if the result could not be determined.
is_partition_tail(&self, marker: bool, payload: &Bytes) -> bool50     fn is_partition_tail(&self, marker: bool, payload: &Bytes) -> bool;
51 }
52 
// TODO: SystemTime vs Instant?
// SystemTime is a non-monotonic clock, whereas Instant is monotonically
// non-decreasing.
/// FnTimeGen is a shareable, thread-safe callback that supplies the current
/// `SystemTime`.
pub type FnTimeGen = Arc<dyn Fn() -> SystemTime + Send + Sync>;
57 
58 #[derive(Clone)]
59 pub(crate) struct PacketizerImpl {
60     pub(crate) mtu: usize,
61     pub(crate) payload_type: u8,
62     pub(crate) ssrc: u32,
63     pub(crate) payloader: Box<dyn Payloader + Send + Sync>,
64     pub(crate) sequencer: Box<dyn Sequencer + Send + Sync>,
65     pub(crate) timestamp: u32,
66     pub(crate) clock_rate: u32,
67     pub(crate) abs_send_time: u8, //http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time
68     pub(crate) time_gen: Option<FnTimeGen>,
69 }
70 
71 impl fmt::Debug for PacketizerImpl {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result72     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73         f.debug_struct("PacketizerImpl")
74             .field("mtu", &self.mtu)
75             .field("payload_type", &self.payload_type)
76             .field("ssrc", &self.ssrc)
77             .field("timestamp", &self.timestamp)
78             .field("clock_rate", &self.clock_rate)
79             .field("abs_send_time", &self.abs_send_time)
80             .finish()
81     }
82 }
83 
new_packetizer( mtu: usize, payload_type: u8, ssrc: u32, payloader: Box<dyn Payloader + Send + Sync>, sequencer: Box<dyn Sequencer + Send + Sync>, clock_rate: u32, ) -> impl Packetizer84 pub fn new_packetizer(
85     mtu: usize,
86     payload_type: u8,
87     ssrc: u32,
88     payloader: Box<dyn Payloader + Send + Sync>,
89     sequencer: Box<dyn Sequencer + Send + Sync>,
90     clock_rate: u32,
91 ) -> impl Packetizer {
92     PacketizerImpl {
93         mtu,
94         payload_type,
95         ssrc,
96         payloader,
97         sequencer,
98         timestamp: rand::random::<u32>(),
99         clock_rate,
100         abs_send_time: 0,
101         time_gen: None,
102     }
103 }
104 
105 impl Packetizer for PacketizerImpl {
enable_abs_send_time(&mut self, value: u8)106     fn enable_abs_send_time(&mut self, value: u8) {
107         self.abs_send_time = value
108     }
109 
packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>>110     fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>> {
111         let payloads = self.payloader.payload(self.mtu - 12, payload)?;
112         let payloads_len = payloads.len();
113         let mut packets = Vec::with_capacity(payloads_len);
114         for (i, payload) in payloads.into_iter().enumerate() {
115             packets.push(Packet {
116                 header: Header {
117                     version: 2,
118                     padding: false,
119                     extension: false,
120                     marker: i == payloads_len - 1,
121                     payload_type: self.payload_type,
122                     sequence_number: self.sequencer.next_sequence_number(),
123                     timestamp: self.timestamp, //TODO: Figure out how to do timestamps
124                     ssrc: self.ssrc,
125                     ..Default::default()
126                 },
127                 payload,
128             });
129         }
130 
131         self.timestamp = self.timestamp.wrapping_add(samples);
132 
133         if payloads_len != 0 && self.abs_send_time != 0 {
134             let st = if let Some(fn_time_gen) = &self.time_gen {
135                 fn_time_gen()
136             } else {
137                 SystemTime::now()
138             };
139             let send_time = AbsSendTimeExtension::new(st);
140             //apply http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time
141             let mut raw = BytesMut::with_capacity(send_time.marshal_size());
142             raw.resize(send_time.marshal_size(), 0);
143             let _ = send_time.marshal_to(&mut raw)?;
144             packets[payloads_len - 1]
145                 .header
146                 .set_extension(self.abs_send_time, raw.freeze())?;
147         }
148 
149         Ok(packets)
150     }
151 
152     /// skip_samples causes a gap in sample count between Packetize requests so the
153     /// RTP payloads produced have a gap in timestamps
skip_samples(&mut self, skipped_samples: u32)154     fn skip_samples(&mut self, skipped_samples: u32) {
155         self.timestamp = self.timestamp.wrapping_add(skipped_samples);
156     }
157 
clone_to(&self) -> Box<dyn Packetizer + Send + Sync>158     fn clone_to(&self) -> Box<dyn Packetizer + Send + Sync> {
159         Box::new(self.clone())
160     }
161 }
162