xref: /xiu/protocol/rtsp/src/rtp/utils.rs (revision b36cf5da)
1 use super::define;
2 use super::errors::PackerError;
3 use super::errors::UnPackerError;
4 use super::RtpPacket;
5 use async_trait::async_trait;
6 use bytes::BytesMut;
7 use bytesio::bytes_reader::BytesReader;
8 use bytesio::bytesio::TNetIO;
9 use std::future::Future;
10 use std::pin::Pin;
11 use std::sync::Arc;
12 use std::time::SystemTime;
13 use streamhub::define::FrameData;
14 use tokio::sync::Mutex;
15 
/// Decoding half of the (un)marshalling pair: builds a `T2` out of a `T1`.
///
/// `T1` is the input handed to the decoder and `T2` is whatever the
/// implementation returns (implementors are free to pick a `Result` or
/// `Option` wrapper for fallible decoding).
pub trait Unmarshal<T1, T2> {
    /// Decodes `data` into a `T2`.
    ///
    /// This is an associated function (it takes no `self`); the
    /// `Self: Sized` bound keeps it out of the trait's vtable.
    fn unmarshal(data: T1) -> T2
    where
        Self: Sized;
}
21 
/// Encoding half of the (un)marshalling pair: turns `self` into a `T`.
pub trait Marshal<T> {
    /// Encodes `self` and returns the encoded representation.
    ///
    /// Takes `&self`, so the value being encoded is left untouched.
    fn marshal(&self) -> T;
}
25 
26 pub type OnFrameFn = Box<dyn Fn(FrameData) -> Result<(), UnPackerError> + Send + Sync>;
27 
28 //Arc<Mutex<Box<dyn TNetIO + Send + Sync>>> : The network connection used by packer to send a/v data
29 //BytesMut: The Rtp packet data that will be sent using the TNetIO
30 pub type OnRtpPacketFn = Box<
31     dyn Fn(
32             Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
33             RtpPacket,
34         ) -> Pin<Box<dyn Future<Output = Result<(), PackerError>> + Send + 'static>>
35         + Send
36         + Sync,
37 >;
38 
39 pub type OnRtpPacketFn2 =
40     Box<dyn Fn(RtpPacket) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + Sync>;
41 // pub type OnPacketFn2 = Box<dyn Fn(&RtpPacket) + Send + Sync>;
42 
43 pub trait TRtpReceiverForRtcp {
on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2)44     fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2);
45 }
46 
47 #[async_trait]
48 pub trait TPacker: TRtpReceiverForRtcp + Send + Sync {
49     /*Split frame to rtp packets and send out*/
pack(&mut self, nalus: &mut BytesMut, timestamp: u32) -> Result<(), PackerError>50     async fn pack(&mut self, nalus: &mut BytesMut, timestamp: u32) -> Result<(), PackerError>;
51     /*Call back function used for processing a rtp packet.*/
on_packet_handler(&mut self, f: OnRtpPacketFn)52     fn on_packet_handler(&mut self, f: OnRtpPacketFn);
53 }
54 
55 #[async_trait]
56 pub trait TVideoPacker: TPacker {
57     /*pack one nalu to rtp packets*/
pack_nalu(&mut self, nalu: BytesMut) -> Result<(), PackerError>58     async fn pack_nalu(&mut self, nalu: BytesMut) -> Result<(), PackerError>;
59 }
60 
61 pub trait TUnPacker: TRtpReceiverForRtcp + Send + Sync {
62     /*Assemble rtp fragments into complete frame and send to stream hub*/
unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError>63     fn unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError>;
64     /*Call back function used for processing a frame.*/
on_frame_handler(&mut self, f: OnFrameFn)65     fn on_frame_handler(&mut self, f: OnFrameFn);
66 }
67 
is_fu_start(fu_header: u8) -> bool68 pub(super) fn is_fu_start(fu_header: u8) -> bool {
69     fu_header & define::FU_START > 0
70 }
71 
is_fu_end(fu_header: u8) -> bool72 pub(super) fn is_fu_end(fu_header: u8) -> bool {
73     fu_header & define::FU_END > 0
74 }
75 
/// Returns the offset of the first three-byte start code (`00 00 01`) in
/// `nalus`, or `None` when the slice contains none.
///
/// A four-byte start code (`00 00 00 01`) is reported at the offset of its
/// second byte, because only the trailing three bytes match the pattern.
/// Slices shorter than three bytes always yield `None`.
pub fn find_start_code(nalus: &[u8]) -> Option<usize> {
    const START_CODE: [u8; 3] = [0x00, 0x00, 0x01];

    nalus
        .windows(START_CODE.len())
        .position(|window| window == START_CODE)
}
80 
split_annexb_and_process<T: TVideoPacker>( nalus: &mut BytesMut, packer: &mut T, ) -> Result<(), PackerError>81 pub async fn split_annexb_and_process<T: TVideoPacker>(
82     nalus: &mut BytesMut,
83     packer: &mut T,
84 ) -> Result<(), PackerError> {
85     while !nalus.is_empty() {
86         /* 0x02,...,0x00,0x00,0x01,0x02..,0x00,0x00,0x01  */
87         /*  |         |              |      |             */
88         /*  -----------              --------             */
89         /*   first_pos         distance_to_first_pos      */
90         if let Some(first_pos) = find_start_code(&nalus[..]) {
91             let mut nalu_with_start_code =
92                 if let Some(distance_to_first_pos) = find_start_code(&nalus[first_pos + 3..]) {
93                     let mut second_pos = first_pos + 3 + distance_to_first_pos;
94                     while second_pos > 0 && nalus[second_pos - 1] == 0 {
95                         second_pos -= 1;
96                     }
97                     nalus.split_to(second_pos)
98                 } else {
99                     nalus.split_to(nalus.len())
100                 };
101 
102             let nalu = nalu_with_start_code.split_off(first_pos + 3);
103             packer.pack_nalu(nalu).await?;
104         } else {
105             break;
106         }
107     }
108     Ok(())
109 }
110 
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.
pub fn current_time() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        // `as_micros` yields a u128; microseconds since 1970 fit in a u64 for
        // several hundred thousand years, so the narrowing cast is lossless
        // in practice.
        .map(|elapsed| elapsed.as_micros() as u64)
        .unwrap_or(0)
}
119 
#[cfg(test)]
mod tests {

    use super::find_start_code;
    use bytes::BytesMut;

    /// Splits `data` into NAL unit payloads with the same slicing steps as
    /// `split_annexb_and_process`. That function is async and needs a packer,
    /// so the steps are repeated here and the payloads are collected instead
    /// of being packed.
    fn split_annexb(data: &[u8]) -> Vec<Vec<u8>> {
        let mut nalus = BytesMut::new();
        nalus.extend_from_slice(data);

        let mut payloads = Vec::new();
        /* 0x02,...,0x00,0x00,0x01,0x02..,0x00,0x00,0x01  */
        /*  |         |              |      |             */
        /*  -----------              --------             */
        /*   first_pos              second_pos            */
        while let Some(first_pos) = find_start_code(&nalus[..]) {
            let mut nalu_with_start_code =
                if let Some(distance_to_first_pos) = find_start_code(&nalus[first_pos + 3..]) {
                    let mut second_pos = first_pos + 3 + distance_to_first_pos;
                    // Leave the zero bytes that lead into the next start code
                    // in the remaining buffer.
                    while second_pos > 0 && nalus[second_pos - 1] == 0 {
                        second_pos -= 1;
                    }
                    nalus.split_to(second_pos)
                } else {
                    nalus.split_to(nalus.len())
                };

            payloads.push(nalu_with_start_code.split_off(first_pos + 3).to_vec());
        }
        payloads
    }

    #[test]
    pub fn test_annexb_split() {
        let payloads = split_annexb(&[
            0x00, 0x00, 0x01, 0x02, 0x03, 0x05, 0x06, 0x00, 0x00, 0x00, 0x01, 0x02, 0x03, 0x04,
            0x00, 0x00, 0x01, 0x02, 0x03,
        ]);

        assert_eq!(
            payloads,
            vec![
                vec![0x02, 0x03, 0x05, 0x06],
                vec![0x02, 0x03, 0x04],
                vec![0x02, 0x03],
            ]
        );
    }

    #[test]
    fn test_annexb_split_skips_leading_bytes() {
        let payloads = split_annexb(&[0xff, 0x00, 0x00, 0x01, 0x0a, 0x0b]);

        assert_eq!(payloads, vec![vec![0x0a, 0x0b]]);
    }

    #[test]
    fn test_annexb_split_without_start_code() {
        assert!(split_annexb(&[0x01, 0x02, 0x03, 0x04]).is_empty());
        assert!(split_annexb(&[]).is_empty());
    }

    #[test]
    fn test_find_start_code() {
        assert_eq!(find_start_code(&[0x00, 0x00, 0x01, 0x65]), Some(0));
        assert_eq!(find_start_code(&[0x00, 0x00, 0x00, 0x01, 0x65]), Some(1));
        assert_eq!(find_start_code(&[0x00, 0x00]), None);
    }
}
167