1 use super::define; 2 use super::errors::PackerError; 3 use super::errors::UnPackerError; 4 use super::RtpPacket; 5 use async_trait::async_trait; 6 use bytes::BytesMut; 7 use bytesio::bytes_reader::BytesReader; 8 use bytesio::bytesio::TNetIO; 9 use std::future::Future; 10 use std::pin::Pin; 11 use std::sync::Arc; 12 use std::time::SystemTime; 13 use streamhub::define::FrameData; 14 use tokio::sync::Mutex; 15 16 pub trait Unmarshal<T1, T2> { 17 fn unmarshal(data: T1) -> T2 18 where 19 Self: Sized; 20 } 21 22 pub trait Marshal<T> { 23 fn marshal(&self) -> T; 24 } 25 26 pub type OnFrameFn = Box<dyn Fn(FrameData) -> Result<(), UnPackerError> + Send + Sync>; 27 28 //Arc<Mutex<Box<dyn TNetIO + Send + Sync>>> : The network connection used by packer to send a/v data 29 //BytesMut: The Rtp packet data that will be sent using the TNetIO 30 pub type OnRtpPacketFn = Box< 31 dyn Fn( 32 Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 33 RtpPacket, 34 ) -> Pin<Box<dyn Future<Output = Result<(), PackerError>> + Send + 'static>> 35 + Send 36 + Sync, 37 >; 38 39 pub type OnRtpPacketFn2 = 40 Box<dyn Fn(RtpPacket) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + Sync>; 41 // pub type OnPacketFn2 = Box<dyn Fn(&RtpPacket) + Send + Sync>; 42 43 pub trait TRtpReceiverForRtcp { 44 fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2); 45 } 46 47 #[async_trait] 48 pub trait TPacker: TRtpReceiverForRtcp + Send + Sync { 49 /*Split frame to rtp packets and send out*/ 50 async fn pack(&mut self, nalus: &mut BytesMut, timestamp: u32) -> Result<(), PackerError>; 51 /*Call back function used for processing a rtp packet.*/ 52 fn on_packet_handler(&mut self, f: OnRtpPacketFn); 53 } 54 55 #[async_trait] 56 pub trait TVideoPacker: TPacker { 57 /*pack one nalu to rtp packets*/ 58 async fn pack_nalu(&mut self, nalu: BytesMut) -> Result<(), PackerError>; 59 } 60 61 pub trait TUnPacker: TRtpReceiverForRtcp + Send + Sync { 62 /*Assemble rtp fragments into complete frame and send to stream hub*/ 63 fn unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError>; 64 /*Call back function used for processing a frame.*/ 65 fn on_frame_handler(&mut self, f: OnFrameFn); 66 } 67 68 pub(super) fn is_fu_start(fu_header: u8) -> bool { 69 fu_header & define::FU_START > 0 70 } 71 72 pub(super) fn is_fu_end(fu_header: u8) -> bool { 73 fu_header & define::FU_END > 0 74 } 75 76 pub fn find_start_code(nalus: &[u8]) -> Option<usize> { 77 let pattern = [0x00, 0x00, 0x01]; 78 nalus.windows(pattern.len()).position(|w| w == pattern) 79 } 80 81 pub async fn split_annexb_and_process<T: TVideoPacker>( 82 nalus: &mut BytesMut, 83 packer: &mut T, 84 ) -> Result<(), PackerError> { 85 while !nalus.is_empty() { 86 /* 0x02,...,0x00,0x00,0x01,0x02..,0x00,0x00,0x01 */ 87 /* | | | | */ 88 /* ----------- -------- */ 89 /* first_pos distance_to_first_pos */ 90 if let Some(first_pos) = find_start_code(&nalus[..]) { 91 let mut nalu_with_start_code = 92 if let Some(distance_to_first_pos) = find_start_code(&nalus[first_pos + 3..]) { 93 let mut second_pos = first_pos + 3 + distance_to_first_pos; 94 while second_pos > 0 && nalus[second_pos - 1] == 0 { 95 second_pos -= 1; 96 } 97 nalus.split_to(second_pos) 98 } else { 99 nalus.split_to(nalus.len()) 100 }; 101 102 let nalu = nalu_with_start_code.split_off(first_pos + 3); 103 packer.pack_nalu(nalu).await?; 104 } else { 105 break; 106 } 107 } 108 Ok(()) 109 } 110 111 pub fn current_time() -> u64 { 112 let duration = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH); 113 114 match duration { 115 Ok(result) => (result.as_nanos() / 1000) as u64, 116 _ => 0, 117 } 118 } 119 120 #[cfg(test)] 121 mod tests { 122 123 use bytes::BytesMut; 124 125 fn find_start_code(nalus: &[u8]) -> Option<usize> { 126 let pattern = [0x00, 0x00, 0x01]; 127 nalus.windows(pattern.len()).position(|w| w == pattern) 128 } 129 130 #[test] 131 pub fn test_annexb_split() { 132 let mut nalus = BytesMut::new(); 133 nalus.extend_from_slice(&[ 134 0x00, 0x00, 0x01, 0x02, 0x03, 0x05, 0x06, 0x00, 0x00, 0x00, 0x01, 0x02, 0x03, 0x04, 135 0x00, 0x00, 0x01, 0x02, 0x03, 136 ]); 137 138 while !nalus.is_empty() { 139 /* 0x02,...,0x00,0x00,0x01,0x02..,0x00,0x00,0x01 */ 140 /* | | | | */ 141 /* ----------- -------- */ 142 /* first_pos second_pos */ 143 if let Some(first_pos) = find_start_code(&nalus[..]) { 144 let mut nalu_with_start_code = 145 if let Some(distance_to_first_pos) = find_start_code(&nalus[first_pos + 3..]) { 146 let mut second_pos = first_pos + 3 + distance_to_first_pos; 147 println!("left: {first_pos} right: {distance_to_first_pos}"); 148 while second_pos > 0 && nalus[second_pos - 1] == 0 { 149 second_pos -= 1; 150 } 151 // while nalus[pos_right ] 152 nalus.split_to(second_pos) 153 } else { 154 nalus.split_to(nalus.len()) 155 }; 156 157 println!("nalu_with_start_code: {:?}", nalu_with_start_code.to_vec()); 158 159 let nalu = nalu_with_start_code.split_off(first_pos + 3); 160 println!("nalu: {:?}", nalu.to_vec()); 161 } else { 162 break; 163 } 164 } 165 } 166 } 167