1 #[cfg(test)]
2 mod packetizer_test;
3
4 use crate::error::Result;
5 use crate::{extension::abs_send_time_extension::*, header::*, packet::*, sequence::*};
6 use util::marshal::{Marshal, MarshalSize};
7
8 use bytes::{Bytes, BytesMut};
9 use std::fmt;
10 use std::sync::Arc;
11 use std::time::SystemTime;
12
13 /// Payloader payloads a byte array for use as rtp.Packet payloads
14 pub trait Payloader: fmt::Debug {
payload(&mut self, mtu: usize, b: &Bytes) -> Result<Vec<Bytes>>15 fn payload(&mut self, mtu: usize, b: &Bytes) -> Result<Vec<Bytes>>;
clone_to(&self) -> Box<dyn Payloader + Send + Sync>16 fn clone_to(&self) -> Box<dyn Payloader + Send + Sync>;
17 }
18
19 impl Clone for Box<dyn Payloader + Send + Sync> {
clone(&self) -> Box<dyn Payloader + Send + Sync>20 fn clone(&self) -> Box<dyn Payloader + Send + Sync> {
21 self.clone_to()
22 }
23 }
24
25 /// Packetizer packetizes a payload
26 pub trait Packetizer: fmt::Debug {
enable_abs_send_time(&mut self, value: u8)27 fn enable_abs_send_time(&mut self, value: u8);
packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>>28 fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>>;
skip_samples(&mut self, skipped_samples: u32)29 fn skip_samples(&mut self, skipped_samples: u32);
clone_to(&self) -> Box<dyn Packetizer + Send + Sync>30 fn clone_to(&self) -> Box<dyn Packetizer + Send + Sync>;
31 }
32
33 impl Clone for Box<dyn Packetizer + Send + Sync> {
clone(&self) -> Box<dyn Packetizer + Send + Sync>34 fn clone(&self) -> Box<dyn Packetizer + Send + Sync> {
35 self.clone_to()
36 }
37 }
38
39 /// Depacketizer depacketizes a RTP payload, removing any RTP specific data from the payload
40 pub trait Depacketizer {
depacketize(&mut self, b: &Bytes) -> Result<Bytes>41 fn depacketize(&mut self, b: &Bytes) -> Result<Bytes>;
42
43 /// Checks if the packet is at the beginning of a partition. This
44 /// should return false if the result could not be determined, in
45 /// which case the caller will detect timestamp discontinuities.
is_partition_head(&self, payload: &Bytes) -> bool46 fn is_partition_head(&self, payload: &Bytes) -> bool;
47
48 /// Checks if the packet is at the end of a partition. This should
49 /// return false if the result could not be determined.
is_partition_tail(&self, marker: bool, payload: &Bytes) -> bool50 fn is_partition_tail(&self, marker: bool, payload: &Bytes) -> bool;
51 }
52
53 //TODO: SystemTime vs Instant?
54 // non-monotonic clock vs monotonically non-decreasing clock
55 /// FnTimeGen provides current SystemTime
56 pub type FnTimeGen = Arc<dyn (Fn() -> SystemTime) + Send + Sync>;
57
58 #[derive(Clone)]
59 pub(crate) struct PacketizerImpl {
60 pub(crate) mtu: usize,
61 pub(crate) payload_type: u8,
62 pub(crate) ssrc: u32,
63 pub(crate) payloader: Box<dyn Payloader + Send + Sync>,
64 pub(crate) sequencer: Box<dyn Sequencer + Send + Sync>,
65 pub(crate) timestamp: u32,
66 pub(crate) clock_rate: u32,
67 pub(crate) abs_send_time: u8, //http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time
68 pub(crate) time_gen: Option<FnTimeGen>,
69 }
70
71 impl fmt::Debug for PacketizerImpl {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 f.debug_struct("PacketizerImpl")
74 .field("mtu", &self.mtu)
75 .field("payload_type", &self.payload_type)
76 .field("ssrc", &self.ssrc)
77 .field("timestamp", &self.timestamp)
78 .field("clock_rate", &self.clock_rate)
79 .field("abs_send_time", &self.abs_send_time)
80 .finish()
81 }
82 }
83
new_packetizer( mtu: usize, payload_type: u8, ssrc: u32, payloader: Box<dyn Payloader + Send + Sync>, sequencer: Box<dyn Sequencer + Send + Sync>, clock_rate: u32, ) -> impl Packetizer84 pub fn new_packetizer(
85 mtu: usize,
86 payload_type: u8,
87 ssrc: u32,
88 payloader: Box<dyn Payloader + Send + Sync>,
89 sequencer: Box<dyn Sequencer + Send + Sync>,
90 clock_rate: u32,
91 ) -> impl Packetizer {
92 PacketizerImpl {
93 mtu,
94 payload_type,
95 ssrc,
96 payloader,
97 sequencer,
98 timestamp: rand::random::<u32>(),
99 clock_rate,
100 abs_send_time: 0,
101 time_gen: None,
102 }
103 }
104
105 impl Packetizer for PacketizerImpl {
enable_abs_send_time(&mut self, value: u8)106 fn enable_abs_send_time(&mut self, value: u8) {
107 self.abs_send_time = value
108 }
109
packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>>110 fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>> {
111 let payloads = self.payloader.payload(self.mtu - 12, payload)?;
112 let payloads_len = payloads.len();
113 let mut packets = Vec::with_capacity(payloads_len);
114 for (i, payload) in payloads.into_iter().enumerate() {
115 packets.push(Packet {
116 header: Header {
117 version: 2,
118 padding: false,
119 extension: false,
120 marker: i == payloads_len - 1,
121 payload_type: self.payload_type,
122 sequence_number: self.sequencer.next_sequence_number(),
123 timestamp: self.timestamp, //TODO: Figure out how to do timestamps
124 ssrc: self.ssrc,
125 ..Default::default()
126 },
127 payload,
128 });
129 }
130
131 self.timestamp = self.timestamp.wrapping_add(samples);
132
133 if payloads_len != 0 && self.abs_send_time != 0 {
134 let st = if let Some(fn_time_gen) = &self.time_gen {
135 fn_time_gen()
136 } else {
137 SystemTime::now()
138 };
139 let send_time = AbsSendTimeExtension::new(st);
140 //apply http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time
141 let mut raw = BytesMut::with_capacity(send_time.marshal_size());
142 raw.resize(send_time.marshal_size(), 0);
143 let _ = send_time.marshal_to(&mut raw)?;
144 packets[payloads_len - 1]
145 .header
146 .set_extension(self.abs_send_time, raw.freeze())?;
147 }
148
149 Ok(packets)
150 }
151
152 /// skip_samples causes a gap in sample count between Packetize requests so the
153 /// RTP payloads produced have a gap in timestamps
skip_samples(&mut self, skipped_samples: u32)154 fn skip_samples(&mut self, skipped_samples: u32) {
155 self.timestamp = self.timestamp.wrapping_add(skipped_samples);
156 }
157
clone_to(&self) -> Box<dyn Packetizer + Send + Sync>158 fn clone_to(&self) -> Box<dyn Packetizer + Send + Sync> {
159 Box::new(self.clone())
160 }
161 }
162