1 use super::define;
2 use super::errors::PackerError;
3 use super::errors::UnPackerError;
4 use super::RtpPacket;
5 use async_trait::async_trait;
6 use bytes::BytesMut;
7 use bytesio::bytes_reader::BytesReader;
8 use bytesio::bytesio::TNetIO;
9 use std::future::Future;
10 use std::pin::Pin;
11 use std::sync::Arc;
12 use std::time::SystemTime;
13 use streamhub::define::FrameData;
14 use tokio::sync::Mutex;
15
/// Conversion from an input representation into a decoded value.
///
/// `T1` is the type of the input handed to [`Unmarshal::unmarshal`] and `T2`
/// is the type it returns. The trait does not constrain either type, so an
/// implementor may, for example, pick a `Result` for `T2` to report failures.
pub trait Unmarshal<T1, T2> {
    /// Builds a `T2` from `data`.
    ///
    /// This is an associated function (no `self`); the `Self: Sized` bound
    /// means it is only callable on concrete implementing types.
    fn unmarshal(data: T1) -> T2
    where
        Self: Sized;
}
21
/// Conversion of a value into an output representation of type `T`.
///
/// The trait does not constrain `T`; each implementor chooses what it emits.
pub trait Marshal<T> {
    /// Produces the `T` representation of `self` without consuming it.
    fn marshal(&self) -> T;
}
25
26 pub type OnFrameFn = Box<dyn Fn(FrameData) -> Result<(), UnPackerError> + Send + Sync>;
27
28 //Arc<Mutex<Box<dyn TNetIO + Send + Sync>>> : The network connection used by packer to send a/v data
29 //BytesMut: The Rtp packet data that will be sent using the TNetIO
30 pub type OnRtpPacketFn = Box<
31 dyn Fn(
32 Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
33 RtpPacket,
34 ) -> Pin<Box<dyn Future<Output = Result<(), PackerError>> + Send + 'static>>
35 + Send
36 + Sync,
37 >;
38
39 pub type OnRtpPacketFn2 =
40 Box<dyn Fn(RtpPacket) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + Sync>;
41 // pub type OnPacketFn2 = Box<dyn Fn(&RtpPacket) + Send + Sync>;
42
43 pub trait TRtpReceiverForRtcp {
on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2)44 fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2);
45 }
46
47 #[async_trait]
48 pub trait TPacker: TRtpReceiverForRtcp + Send + Sync {
49 /*Split frame to rtp packets and send out*/
pack(&mut self, nalus: &mut BytesMut, timestamp: u32) -> Result<(), PackerError>50 async fn pack(&mut self, nalus: &mut BytesMut, timestamp: u32) -> Result<(), PackerError>;
51 /*Call back function used for processing a rtp packet.*/
on_packet_handler(&mut self, f: OnRtpPacketFn)52 fn on_packet_handler(&mut self, f: OnRtpPacketFn);
53 }
54
55 #[async_trait]
56 pub trait TVideoPacker: TPacker {
57 /*pack one nalu to rtp packets*/
pack_nalu(&mut self, nalu: BytesMut) -> Result<(), PackerError>58 async fn pack_nalu(&mut self, nalu: BytesMut) -> Result<(), PackerError>;
59 }
60
61 pub trait TUnPacker: TRtpReceiverForRtcp + Send + Sync {
62 /*Assemble rtp fragments into complete frame and send to stream hub*/
unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError>63 fn unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError>;
64 /*Call back function used for processing a frame.*/
on_frame_handler(&mut self, f: OnFrameFn)65 fn on_frame_handler(&mut self, f: OnFrameFn);
66 }
67
is_fu_start(fu_header: u8) -> bool68 pub(super) fn is_fu_start(fu_header: u8) -> bool {
69 fu_header & define::FU_START > 0
70 }
71
is_fu_end(fu_header: u8) -> bool72 pub(super) fn is_fu_end(fu_header: u8) -> bool {
73 fu_header & define::FU_END > 0
74 }
75
/// Returns the index of the first three-byte start code (`00 00 01`) in
/// `nalus`, or `None` when the slice does not contain one.
///
/// A four-byte start code (`00 00 00 01`) is reported at the index of its
/// second byte, because that is where the three-byte pattern begins.
pub fn find_start_code(nalus: &[u8]) -> Option<usize> {
    const START_CODE_LEN: usize = 3;

    nalus
        .windows(START_CODE_LEN)
        .position(|window| matches!(window, [0x00, 0x00, 0x01]))
}
80
split_annexb_and_process<T: TVideoPacker>( nalus: &mut BytesMut, packer: &mut T, ) -> Result<(), PackerError>81 pub async fn split_annexb_and_process<T: TVideoPacker>(
82 nalus: &mut BytesMut,
83 packer: &mut T,
84 ) -> Result<(), PackerError> {
85 while !nalus.is_empty() {
86 /* 0x02,...,0x00,0x00,0x01,0x02..,0x00,0x00,0x01 */
87 /* | | | | */
88 /* ----------- -------- */
89 /* first_pos distance_to_first_pos */
90 if let Some(first_pos) = find_start_code(&nalus[..]) {
91 let mut nalu_with_start_code =
92 if let Some(distance_to_first_pos) = find_start_code(&nalus[first_pos + 3..]) {
93 let mut second_pos = first_pos + 3 + distance_to_first_pos;
94 while second_pos > 0 && nalus[second_pos - 1] == 0 {
95 second_pos -= 1;
96 }
97 nalus.split_to(second_pos)
98 } else {
99 nalus.split_to(nalus.len())
100 };
101
102 let nalu = nalu_with_start_code.split_off(first_pos + 3);
103 packer.pack_nalu(nalu).await?;
104 } else {
105 break;
106 }
107 }
108 Ok(())
109 }
110
/// Returns the current system time as whole microseconds since the Unix
/// epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.
pub fn current_time() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_micros() as u64)
        .unwrap_or(0)
}
119
#[cfg(test)]
mod tests {
    use super::find_start_code;
    use bytes::BytesMut;

    /// Checks the Annex B splitting steps on a stream that mixes three-byte and
    /// four-byte start codes.
    ///
    /// The loop mirrors the one in `split_annexb_and_process` but collects the
    /// payloads instead of handing them to a packer, so the expected NAL unit
    /// payloads can be asserted without an async runtime.
    #[test]
    fn test_annexb_split() {
        let mut nalus = BytesMut::new();
        nalus.extend_from_slice(&[
            0x00, 0x00, 0x01, 0x02, 0x03, 0x05, 0x06, // 3-byte start code + payload
            0x00, 0x00, 0x00, 0x01, 0x02, 0x03, 0x04, // 4-byte start code + payload
            0x00, 0x00, 0x01, 0x02, 0x03, // 3-byte start code + final payload
        ]);

        let mut payloads: Vec<Vec<u8>> = Vec::new();

        while !nalus.is_empty() {
            /* <skipped>,0x00,0x00,0x01,<payload>,0x00,0x00,0x01,...        */
            /*           |                        |                         */
            /*           first_pos                second_pos (before zeros) */
            if let Some(first_pos) = find_start_code(&nalus[..]) {
                let mut nalu_with_start_code =
                    if let Some(distance_to_first_pos) = find_start_code(&nalus[first_pos + 3..]) {
                        let mut second_pos = first_pos + 3 + distance_to_first_pos;
                        // Exclude the zero bytes that precede the next start code.
                        while second_pos > 0 && nalus[second_pos - 1] == 0 {
                            second_pos -= 1;
                        }
                        nalus.split_to(second_pos)
                    } else {
                        nalus.split_to(nalus.len())
                    };

                let nalu = nalu_with_start_code.split_off(first_pos + 3);
                payloads.push(nalu.to_vec());
            } else {
                break;
            }
        }

        assert_eq!(
            payloads,
            vec![
                vec![0x02, 0x03, 0x05, 0x06],
                vec![0x02, 0x03, 0x04],
                vec![0x02, 0x03],
            ]
        );
        // Every byte of the input belongs to some unit, so nothing is left over.
        assert!(nalus.is_empty());
    }

    /// Pins where the three-byte pattern is reported for both start code forms.
    #[test]
    fn test_find_start_code() {
        assert_eq!(find_start_code(&[0x00, 0x00, 0x01, 0x02]), Some(0));
        assert_eq!(find_start_code(&[0x00, 0x00, 0x00, 0x01, 0x02]), Some(1));
        assert_eq!(find_start_code(&[0x02, 0x03, 0x04]), None);
        assert_eq!(find_start_code(&[0x00, 0x00]), None);
    }
}
167