xref: /xiu/protocol/rtsp/src/rtp/rtp_h265.rs (revision b36cf5da)
1 use super::define;
2 use super::errors::PackerError;
3 use super::errors::UnPackerError;
4 use super::utils;
5 use super::utils::OnFrameFn;
6 use super::utils::OnRtpPacketFn;
7 use super::utils::OnRtpPacketFn2;
8 use super::utils::TPacker;
9 use super::utils::TRtpReceiverForRtcp;
10 use super::utils::TUnPacker;
11 use super::utils::TVideoPacker;
12 use super::utils::Unmarshal;
13 use super::RtpHeader;
14 use super::RtpPacket;
15 use async_trait::async_trait;
16 use byteorder::BigEndian;
17 use bytes::{BufMut, BytesMut};
18 use bytesio::bytes_reader::BytesReader;
19 use bytesio::bytesio::TNetIO;
20 use std::sync::Arc;
21 use streamhub::define::FrameData;
22 use tokio::sync::Mutex;
23 
24 pub struct RtpH265Packer {
25     header: RtpHeader,
26     mtu: usize,
27     io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
28     on_packet_handler: Option<OnRtpPacketFn>,
29     on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>,
30 }
31 
32 impl RtpH265Packer {
new( payload_type: u8, ssrc: u32, init_seq: u16, mtu: usize, io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, ) -> Self33     pub fn new(
34         payload_type: u8,
35         ssrc: u32,
36         init_seq: u16,
37         mtu: usize,
38         io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
39     ) -> Self {
40         RtpH265Packer {
41             header: RtpHeader {
42                 payload_type,
43                 seq_number: init_seq,
44                 ssrc,
45                 version: 2,
46                 ..Default::default()
47             },
48             mtu,
49             io,
50             on_packet_handler: None,
51             on_packet_for_rtcp_handler: None,
52         }
53     }
54 
pack_fu(&mut self, nalu: BytesMut) -> Result<(), PackerError>55     pub async fn pack_fu(&mut self, nalu: BytesMut) -> Result<(), PackerError> {
56         let mut nalu_reader = BytesReader::new(nalu);
57         /* NALU header
58         0               1
59         0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
60         +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
61         |F|    Type   |  LayerId  | TID |
62         +-------------+-----------------+
63 
64         Forbidden zero(F) : 1 bit
65         NAL unit type(Type) : 6 bits
66         NUH layer ID(LayerId) : 6 bits
67         NUH temporal ID plus 1 (TID) : 3 bits
68         */
69         let nalu_header_1st_byte = nalu_reader.read_u8()?;
70         let nalu_header_2nd_byte = nalu_reader.read_u8()?;
71 
72         /* The PayloadHdr needs replace Type with the FU type value(49) */
73         let payload_hdr: u16 = ((nalu_header_1st_byte as u16 & 0x81) | ((define::FU as u16) << 1))
74             << 8
75             | nalu_header_2nd_byte as u16;
76         /* FU header
77         +---------------+
78         |0|1|2|3|4|5|6|7|
79         +-+-+-+-+-+-+-+-+
80         |S|E|   FuType  |
81         +---------------+
82         */
83         /*set FuType from NALU header's Type */
84         let mut fu_header = (nalu_header_1st_byte >> 1) & 0x3F | define::FU_START;
85 
86         let mut left_nalu_bytes: usize = nalu_reader.len();
87         let mut fu_payload_len: usize;
88 
89         while left_nalu_bytes > 0 {
90             /* 3 = PayloadHdr(2 bytes) + FU header(1 byte) */
91             if left_nalu_bytes + define::RTP_FIXED_HEADER_LEN <= self.mtu - 3 {
92                 fu_header = (nalu_header_1st_byte & 0x1F) | define::FU_END;
93                 fu_payload_len = left_nalu_bytes;
94             } else {
95                 fu_payload_len = self.mtu - define::RTP_FIXED_HEADER_LEN - 3;
96             }
97 
98             let fu_payload = nalu_reader.read_bytes(fu_payload_len)?;
99 
100             let mut packet = RtpPacket::new(self.header.clone());
101             packet.payload.put_u16(payload_hdr);
102             packet.payload.put_u8(fu_header);
103             packet.payload.put(fu_payload);
104             packet.header.marker = if fu_header & define::FU_END > 0 { 1 } else { 0 };
105 
106             if fu_header & define::FU_START > 0 {
107                 fu_header &= 0x7F
108             }
109 
110             if let Some(f) = &self.on_packet_for_rtcp_handler {
111                 f(packet.clone());
112             }
113 
114             if let Some(f) = &self.on_packet_handler {
115                 f(self.io.clone(), packet).await?;
116             }
117             left_nalu_bytes = nalu_reader.len();
118             self.header.seq_number += 1;
119         }
120 
121         Ok(())
122     }
pack_single(&mut self, nalu: BytesMut) -> Result<(), PackerError>123     pub async fn pack_single(&mut self, nalu: BytesMut) -> Result<(), PackerError> {
124         let mut packet = RtpPacket::new(self.header.clone());
125         packet.header.marker = 1;
126         packet.payload.put(nalu);
127 
128         self.header.seq_number += 1;
129 
130         if let Some(f) = &self.on_packet_for_rtcp_handler {
131             f(packet.clone());
132         }
133 
134         if let Some(f) = &self.on_packet_handler {
135             return f(self.io.clone(), packet).await;
136         }
137         Ok(())
138     }
139 }
140 
141 #[async_trait]
142 impl TPacker for RtpH265Packer {
pack(&mut self, nalus: &mut BytesMut, timestamp: u32) -> Result<(), PackerError>143     async fn pack(&mut self, nalus: &mut BytesMut, timestamp: u32) -> Result<(), PackerError> {
144         self.header.timestamp = timestamp;
145         utils::split_annexb_and_process(nalus, self).await?;
146         Ok(())
147     }
on_packet_handler(&mut self, f: OnRtpPacketFn)148     fn on_packet_handler(&mut self, f: OnRtpPacketFn) {
149         self.on_packet_handler = Some(f);
150     }
151 }
152 
153 impl TRtpReceiverForRtcp for RtpH265Packer {
on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2)154     fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) {
155         self.on_packet_for_rtcp_handler = Some(f);
156     }
157 }
158 
159 #[async_trait]
160 impl TVideoPacker for RtpH265Packer {
pack_nalu(&mut self, nalu: BytesMut) -> Result<(), PackerError>161     async fn pack_nalu(&mut self, nalu: BytesMut) -> Result<(), PackerError> {
162         if nalu.len() + define::RTP_FIXED_HEADER_LEN <= self.mtu {
163             self.pack_single(nalu).await?;
164         } else {
165             self.pack_fu(nalu).await?;
166         }
167         Ok(())
168     }
169 }
170 
171 #[derive(Default)]
172 pub struct RtpH265UnPacker {
173     sequence_number: u16,
174     timestamp: u32,
175     fu_buffer: BytesMut,
176     using_donl_field: bool,
177     on_frame_handler: Option<OnFrameFn>,
178     on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>,
179 }
180 
181 impl TUnPacker for RtpH265UnPacker {
unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError>182     fn unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError> {
183         let rtp_packet = RtpPacket::unmarshal(reader)?;
184 
185         if let Some(f) = &self.on_packet_for_rtcp_handler {
186             f(rtp_packet.clone());
187         }
188 
189         self.timestamp = rtp_packet.header.timestamp;
190         self.sequence_number = rtp_packet.header.seq_number;
191 
192         if let Some(packet_type) = rtp_packet.payload.first() {
193             match *packet_type >> 1 & 0x3F {
194                 define::FU => {
195                     return self.unpack_fu(rtp_packet.payload.clone());
196                 }
197                 define::AP => {
198                     return self.unpack_ap(rtp_packet.payload);
199                 }
200                 define::PACI => return Ok(()),
201 
202                 _ => {
203                     return self.unpack_single(rtp_packet.payload.clone());
204                 }
205             }
206         }
207 
208         Ok(())
209     }
210 
on_frame_handler(&mut self, f: OnFrameFn)211     fn on_frame_handler(&mut self, f: OnFrameFn) {
212         self.on_frame_handler = Some(f);
213     }
214 }
215 
216 impl RtpH265UnPacker {
new() -> Self217     pub fn new() -> Self {
218         RtpH265UnPacker::default()
219     }
220 
unpack_single(&mut self, payload: BytesMut) -> Result<(), UnPackerError>221     fn unpack_single(&mut self, payload: BytesMut) -> Result<(), UnPackerError> {
222         let mut annexb_payload = BytesMut::new();
223         annexb_payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE);
224         annexb_payload.put(payload);
225 
226         if let Some(f) = &self.on_frame_handler {
227             f(FrameData::Video {
228                 timestamp: self.timestamp,
229                 data: annexb_payload,
230             })?;
231         }
232         Ok(())
233     }
234 
235     /*
236      0               1               2               3
237      0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
238     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
239     |                          RTP Header                           |
240     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
241     |      PayloadHdr (Type=48)     |           NALU 1 DONL         |
242     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
243     |           NALU 1 Size         |            NALU 1 HDR         |
244     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
245     |                                                               |
246     |                         NALU 1 Data . . .                     |
247     |                                                               |
248     +     . . .     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
249     |               |  NALU 2 DOND  |            NALU 2 Size        |
250     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
251     |          NALU 2 HDR           |                               |
252     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+            NALU 2 Data        |
253     |                                                               |
254     |         . . .                 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
255     |                               :    ...OPTIONAL RTP padding    |
256     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
257     */
258 
unpack_ap(&mut self, rtp_payload: BytesMut) -> Result<(), UnPackerError>259     fn unpack_ap(&mut self, rtp_payload: BytesMut) -> Result<(), UnPackerError> {
260         let mut payload_reader = BytesReader::new(rtp_payload);
261         /*read PayloadHdr*/
262         payload_reader.read_bytes(2)?;
263 
264         while !payload_reader.is_empty() {
265             if self.using_donl_field {
266                 /*read DONL*/
267                 payload_reader.read_bytes(2)?;
268             }
269             /*read NALU Size*/
270             let nalu_len = payload_reader.read_u16::<BigEndian>()? as usize;
271             /*read NALU HDR + Data */
272             let nalu = payload_reader.read_bytes(nalu_len)?;
273 
274             let mut payload = BytesMut::new();
275             payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE);
276             payload.put(nalu);
277 
278             if let Some(f) = &self.on_frame_handler {
279                 f(FrameData::Video {
280                     timestamp: self.timestamp,
281                     data: payload,
282                 })?;
283             }
284         }
285 
286         Ok(())
287     }
288 
289     /*
290     0               1
291     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
292     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
293     |F|    Type   |  LayerId  | TID |
294     +-------------+-----------------+
295 
296     Forbidden zero(F) : 1 bit
297     NAL unit type(Type) : 6 bits
298     NUH layer ID(LayerId) : 6 bits
299     NUH temporal ID plus 1 (TID) : 3 bits
300     */
301 
302     /*
303      0               1               2               3
304      0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
305     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
306     |     PayloadHdr (Type=49)      |    FU header  |  DONL (cond)  |
307     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-|
308     |  DONL (cond)  |                                               |
309     |-+-+-+-+-+-+-+-+                                               |
310     |                           FU payload                          |
311     |                                                               |
312     |                               +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
313     |                               :    ...OPTIONAL RTP padding    |
314     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
315     /* FU header */
316     +---------------+
317     |0|1|2|3|4|5|6|7|
318     +-+-+-+-+-+-+-+-+
319     |S|E|   FuType  |
320     +---------------+
321     */
unpack_fu(&mut self, rtp_payload: BytesMut) -> Result<(), UnPackerError>322     fn unpack_fu(&mut self, rtp_payload: BytesMut) -> Result<(), UnPackerError> {
323         let mut payload_reader = BytesReader::new(rtp_payload);
324         let payload_header_1st_byte = payload_reader.read_u8()?;
325         let payload_header_2nd_byte = payload_reader.read_u8()?;
326         let fu_header = payload_reader.read_u8()?;
327         if self.using_donl_field {
328             payload_reader.read_bytes(2)?;
329         }
330 
331         if utils::is_fu_start(fu_header) {
332             /*set NAL UNIT type 2 bytes */
333             //replace Type of PayloadHdr with the FuType of FU header
334             let nal_1st_byte = (payload_header_1st_byte & 0x81) | ((fu_header & 0x3F) << 1);
335             self.fu_buffer.put_u8(nal_1st_byte);
336             self.fu_buffer.put_u8(payload_header_2nd_byte);
337         }
338 
339         self.fu_buffer.put(payload_reader.extract_remaining_bytes());
340 
341         if utils::is_fu_end(fu_header) {
342             let mut payload = BytesMut::new();
343             payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE);
344             payload.put(self.fu_buffer.clone());
345             self.fu_buffer.clear();
346 
347             if let Some(f) = &self.on_frame_handler {
348                 f(FrameData::Video {
349                     timestamp: self.timestamp,
350                     data: payload,
351                 })?;
352             }
353         }
354 
355         Ok(())
356     }
357 }
358 
359 impl TRtpReceiverForRtcp for RtpH265UnPacker {
on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2)360     fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) {
361         self.on_packet_for_rtcp_handler = Some(f);
362     }
363 }
364