1 #[cfg(test)] 2 mod packetizer_test; 3 4 use crate::error::Result; 5 use crate::{extension::abs_send_time_extension::*, header::*, packet::*, sequence::*}; 6 use util::marshal::{Marshal, MarshalSize}; 7 8 use async_trait::async_trait; 9 use bytes::{Bytes, BytesMut}; 10 use std::fmt; 11 use std::future::Future; 12 use std::pin::Pin; 13 use std::sync::Arc; 14 use std::time::SystemTime; 15 16 /// Payloader payloads a byte array for use as rtp.Packet payloads 17 pub trait Payloader: fmt::Debug { 18 fn payload(&mut self, mtu: usize, b: &Bytes) -> Result<Vec<Bytes>>; 19 fn clone_to(&self) -> Box<dyn Payloader + Send + Sync>; 20 } 21 22 impl Clone for Box<dyn Payloader + Send + Sync> { 23 fn clone(&self) -> Box<dyn Payloader + Send + Sync> { 24 self.clone_to() 25 } 26 } 27 28 /// Packetizer packetizes a payload 29 #[async_trait] 30 pub trait Packetizer: fmt::Debug { 31 fn enable_abs_send_time(&mut self, value: u8); 32 async fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>>; 33 fn skip_samples(&mut self, skipped_samples: u32); 34 fn clone_to(&self) -> Box<dyn Packetizer + Send + Sync>; 35 } 36 37 impl Clone for Box<dyn Packetizer + Send + Sync> { 38 fn clone(&self) -> Box<dyn Packetizer + Send + Sync> { 39 self.clone_to() 40 } 41 } 42 43 /// Depacketizer depacketizes a RTP payload, removing any RTP specific data from the payload 44 pub trait Depacketizer { 45 fn depacketize(&mut self, b: &Bytes) -> Result<Bytes>; 46 47 /// Checks if the packet is at the beginning of a partition. This 48 /// should return false if the result could not be determined, in 49 /// which case the caller will detect timestamp discontinuities. 50 fn is_partition_head(&self, payload: &Bytes) -> bool; 51 52 /// Checks if the packet is at the end of a partition. This should 53 /// return false if the result could not be determined. 54 fn is_partition_tail(&self, marker: bool, payload: &Bytes) -> bool; 55 } 56 57 //TODO: SystemTime vs Instant? 58 // non-monotonic clock vs monotonically non-decreasing clock 59 /// FnTimeGen provides current SystemTime 60 pub type FnTimeGen = 61 Arc<dyn (Fn() -> Pin<Box<dyn Future<Output = SystemTime> + Send + 'static>>) + Send + Sync>; 62 63 #[derive(Clone)] 64 pub(crate) struct PacketizerImpl { 65 pub(crate) mtu: usize, 66 pub(crate) payload_type: u8, 67 pub(crate) ssrc: u32, 68 pub(crate) payloader: Box<dyn Payloader + Send + Sync>, 69 pub(crate) sequencer: Box<dyn Sequencer + Send + Sync>, 70 pub(crate) timestamp: u32, 71 pub(crate) clock_rate: u32, 72 pub(crate) abs_send_time: u8, //http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time 73 pub(crate) time_gen: Option<FnTimeGen>, 74 } 75 76 impl fmt::Debug for PacketizerImpl { 77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 78 f.debug_struct("PacketizerImpl") 79 .field("mtu", &self.mtu) 80 .field("payload_type", &self.payload_type) 81 .field("ssrc", &self.ssrc) 82 .field("timestamp", &self.timestamp) 83 .field("clock_rate", &self.clock_rate) 84 .field("abs_send_time", &self.abs_send_time) 85 .finish() 86 } 87 } 88 89 pub fn new_packetizer( 90 mtu: usize, 91 payload_type: u8, 92 ssrc: u32, 93 payloader: Box<dyn Payloader + Send + Sync>, 94 sequencer: Box<dyn Sequencer + Send + Sync>, 95 clock_rate: u32, 96 ) -> impl Packetizer { 97 PacketizerImpl { 98 mtu, 99 payload_type, 100 ssrc, 101 payloader, 102 sequencer, 103 timestamp: rand::random::<u32>(), 104 clock_rate, 105 abs_send_time: 0, 106 time_gen: None, 107 } 108 } 109 110 #[async_trait] 111 impl Packetizer for PacketizerImpl { 112 fn enable_abs_send_time(&mut self, value: u8) { 113 self.abs_send_time = value 114 } 115 116 async fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>> { 117 let payloads = self.payloader.payload(self.mtu - 12, payload)?; 118 let payloads_len = payloads.len(); 119 let mut packets = Vec::with_capacity(payloads_len); 120 for (i, payload) in payloads.into_iter().enumerate() { 121 packets.push(Packet { 122 header: Header { 123 version: 2, 124 padding: false, 125 extension: false, 126 marker: i == payloads_len - 1, 127 payload_type: self.payload_type, 128 sequence_number: self.sequencer.next_sequence_number(), 129 timestamp: self.timestamp, //TODO: Figure out how to do timestamps 130 ssrc: self.ssrc, 131 ..Default::default() 132 }, 133 payload, 134 }); 135 } 136 137 self.timestamp = self.timestamp.wrapping_add(samples); 138 139 if payloads_len != 0 && self.abs_send_time != 0 { 140 let st = if let Some(fn_time_gen) = &self.time_gen { 141 fn_time_gen().await 142 } else { 143 SystemTime::now() 144 }; 145 let send_time = AbsSendTimeExtension::new(st); 146 //apply http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time 147 let mut raw = BytesMut::with_capacity(send_time.marshal_size()); 148 raw.resize(send_time.marshal_size(), 0); 149 let _ = send_time.marshal_to(&mut raw)?; 150 packets[payloads_len - 1] 151 .header 152 .set_extension(self.abs_send_time, raw.freeze())?; 153 } 154 155 Ok(packets) 156 } 157 158 /// skip_samples causes a gap in sample count between Packetize requests so the 159 /// RTP payloads produced have a gap in timestamps 160 fn skip_samples(&mut self, skipped_samples: u32) { 161 self.timestamp = self.timestamp.wrapping_add(skipped_samples); 162 } 163 164 fn clone_to(&self) -> Box<dyn Packetizer + Send + Sync> { 165 Box::new(self.clone()) 166 } 167 } 168