xref: /xiu/protocol/rtsp/src/rtp/rtp_h264.rs (revision b36cf5da)
1 use super::define;
2 use super::errors::PackerError;
3 use super::errors::UnPackerError;
4 use super::utils;
5 use super::utils::OnFrameFn;
6 use super::utils::OnRtpPacketFn;
7 use super::utils::OnRtpPacketFn2;
8 use super::utils::TPacker;
9 use super::utils::TRtpReceiverForRtcp;
10 use super::utils::TUnPacker;
11 use super::utils::TVideoPacker;
12 use super::utils::Unmarshal;
13 use super::RtpHeader;
14 use super::RtpPacket;
15 use async_trait::async_trait;
16 use byteorder::BigEndian;
17 use bytes::{BufMut, BytesMut};
18 use bytesio::bytes_reader::BytesReader;
19 use bytesio::bytesio::TNetIO;
20 use std::sync::Arc;
21 use streamhub::define::FrameData;
22 use tokio::sync::Mutex;
23 
24 pub struct RtpH264Packer {
25     header: RtpHeader,
26     mtu: usize,
27     on_packet_handler: Option<OnRtpPacketFn>,
28     on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>,
29     io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
30 }
31 
32 impl RtpH264Packer {
new( payload_type: u8, ssrc: u32, init_seq: u16, mtu: usize, io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, ) -> Self33     pub fn new(
34         payload_type: u8,
35         ssrc: u32,
36         init_seq: u16,
37         mtu: usize,
38         io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
39     ) -> Self {
40         RtpH264Packer {
41             header: RtpHeader {
42                 payload_type,
43                 seq_number: init_seq,
44                 ssrc,
45                 version: 2,
46                 ..Default::default()
47             },
48             mtu,
49             io,
50             on_packet_handler: None,
51             on_packet_for_rtcp_handler: None,
52         }
53     }
54 
pack_fu_a(&mut self, nalu: BytesMut) -> Result<(), PackerError>55     pub async fn pack_fu_a(&mut self, nalu: BytesMut) -> Result<(), PackerError> {
56         let mut nalu_reader = BytesReader::new(nalu);
57         let byte_1st = nalu_reader.read_u8()?;
58 
59         let fu_indicator: u8 = (byte_1st & 0xE0) | define::FU_A;
60         let mut fu_header: u8 = (byte_1st & 0x1F) | define::FU_START;
61 
62         let mut left_nalu_bytes: usize = nalu_reader.len();
63         let mut fu_payload_len: usize;
64 
65         while left_nalu_bytes > 0 {
66             if left_nalu_bytes + define::RTP_FIXED_HEADER_LEN <= self.mtu - 2 {
67                 fu_header = (byte_1st & 0x1F) | define::FU_END;
68                 fu_payload_len = left_nalu_bytes;
69             } else {
70                 fu_payload_len = self.mtu - define::RTP_FIXED_HEADER_LEN - 2;
71             }
72 
73             let fu_payload = nalu_reader.read_bytes(fu_payload_len)?;
74 
75             let mut packet = RtpPacket::new(self.header.clone());
76             packet.payload.put_u8(fu_indicator);
77             packet.payload.put_u8(fu_header);
78 
79             if fu_header & define::FU_START > 0 {
80                 fu_header &= 0x7F
81             }
82 
83             packet.payload.put(fu_payload);
84             packet.header.marker = if fu_header & define::FU_END > 0 { 1 } else { 0 };
85 
86             if let Some(f) = &self.on_packet_for_rtcp_handler {
87                 f(packet.clone());
88             }
89 
90             if let Some(f) = &self.on_packet_handler {
91                 // log::info!("seq number: {}", packet.header.seq_number);
92                 f(self.io.clone(), packet).await?;
93             }
94 
95             left_nalu_bytes = nalu_reader.len();
96             self.header.seq_number += 1;
97         }
98 
99         Ok(())
100     }
pack_single(&mut self, nalu: BytesMut) -> Result<(), PackerError>101     pub async fn pack_single(&mut self, nalu: BytesMut) -> Result<(), PackerError> {
102         let mut packet = RtpPacket::new(self.header.clone());
103         packet.header.marker = 1;
104         packet.payload.put(nalu);
105 
106         // let packet_bytesmut = packet.marshal()?;
107         self.header.seq_number += 1;
108 
109         if let Some(f) = &self.on_packet_for_rtcp_handler {
110             f(packet.clone());
111         }
112 
113         if let Some(f) = &self.on_packet_handler {
114             return f(self.io.clone(), packet).await;
115         }
116 
117         Ok(())
118     }
119 }
120 
121 #[async_trait]
122 impl TPacker for RtpH264Packer {
123     //pack annexb h264 data
pack(&mut self, nalus: &mut BytesMut, timestamp: u32) -> Result<(), PackerError>124     async fn pack(&mut self, nalus: &mut BytesMut, timestamp: u32) -> Result<(), PackerError> {
125         self.header.timestamp = timestamp; // ((timestamp as u64 * self.clock_rate as u64) / 1000) as u32;
126         utils::split_annexb_and_process(nalus, self).await?;
127         Ok(())
128     }
129 
on_packet_handler(&mut self, f: OnRtpPacketFn)130     fn on_packet_handler(&mut self, f: OnRtpPacketFn) {
131         self.on_packet_handler = Some(f);
132     }
133 }
134 
135 impl TRtpReceiverForRtcp for RtpH264Packer {
on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2)136     fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) {
137         self.on_packet_for_rtcp_handler = Some(f);
138     }
139 }
140 
141 #[async_trait]
142 impl TVideoPacker for RtpH264Packer {
pack_nalu(&mut self, nalu: BytesMut) -> Result<(), PackerError>143     async fn pack_nalu(&mut self, nalu: BytesMut) -> Result<(), PackerError> {
144         if nalu.len() + define::RTP_FIXED_HEADER_LEN <= self.mtu {
145             self.pack_single(nalu).await?;
146         } else {
147             self.pack_fu_a(nalu).await?;
148         }
149         Ok(())
150     }
151 }
152 
153 #[derive(Default)]
154 pub struct RtpH264UnPacker {
155     sequence_number: u16,
156     timestamp: u32,
157     fu_buffer: BytesMut,
158     on_frame_handler: Option<OnFrameFn>,
159     on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>,
160 }
161 
162 impl TUnPacker for RtpH264UnPacker {
unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError>163     fn unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError> {
164         let rtp_packet = RtpPacket::unmarshal(reader)?;
165 
166         if let Some(f) = &self.on_packet_for_rtcp_handler {
167             f(rtp_packet.clone());
168         }
169 
170         self.timestamp = rtp_packet.header.timestamp;
171         self.sequence_number = rtp_packet.header.seq_number;
172 
173         if let Some(packet_type) = rtp_packet.payload.first() {
174             match *packet_type & 0x1F {
175                 1..=23 => {
176                     return self.unpack_single(rtp_packet.payload.clone(), *packet_type);
177                 }
178                 define::STAP_A | define::STAP_B => {
179                     return self.unpack_stap(rtp_packet.payload.clone(), *packet_type);
180                 }
181                 define::MTAP_16 | define::MTAP_24 => {
182                     return self.unpack_mtap(rtp_packet.payload.clone(), *packet_type);
183                 }
184                 define::FU_A | define::FU_B => {
185                     return self.unpack_fu(rtp_packet.payload.clone(), *packet_type);
186                 }
187                 _ => {}
188             }
189         }
190 
191         Ok(())
192     }
193 
on_frame_handler(&mut self, f: OnFrameFn)194     fn on_frame_handler(&mut self, f: OnFrameFn) {
195         self.on_frame_handler = Some(f);
196     }
197 }
198 
199 impl RtpH264UnPacker {
new() -> Self200     pub fn new() -> Self {
201         RtpH264UnPacker {
202             ..Default::default()
203         }
204     }
205 
unpack_single( &mut self, payload: BytesMut, _t: define::RtpNalType, ) -> Result<(), UnPackerError>206     fn unpack_single(
207         &mut self,
208         payload: BytesMut,
209         _t: define::RtpNalType,
210     ) -> Result<(), UnPackerError> {
211         if let Some(f) = &self.on_frame_handler {
212             let mut annexb_payload = BytesMut::new();
213             annexb_payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE);
214             annexb_payload.put(payload);
215 
216             f(FrameData::Video {
217                 timestamp: self.timestamp,
218                 data: annexb_payload,
219             })?;
220         }
221         Ok(())
222     }
223 
224     //  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
225     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
226     // | FU indicator  |   FU header   |                               |
227     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+                               |
228     // |                                                               |
229     // |                         FU payload                            |
230     // |                                                               |
231     // |                               +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
232     // |                               :...OPTIONAL RTP padding        |
233     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
234 
235     //   RTP payload format for FU-A
236 
237     //  0                   1                   2                   3
238     //  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
239     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
240     // | FU indicator  |   FU header   |               DON             |
241     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-|
242     // |                                                               |
243     // |                         FU payload                            |
244     // |                                                               |
245     // |                               +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
246     // |                               :...OPTIONAL RTP padding        |
247     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
248 
249     //   RTP payload format for FU-B
250 
251     // FU indicator
252     // +---------------+
253     // |0|1|2|3|4|5|6|7|
254     // +-+-+-+-+-+-+-+-+
255     // |F|NRI|  Type   |
256     // +---------------+
257 
258     // FU header
259     // +---------------+
260     // |0|1|2|3|4|5|6|7|
261     // +-+-+-+-+-+-+-+-+
262     // |S|E|R|  Type   |
263     // +---------------+
unpack_fu( &mut self, rtp_payload: BytesMut, t: define::RtpNalType, ) -> Result<(), UnPackerError>264     fn unpack_fu(
265         &mut self,
266         rtp_payload: BytesMut,
267         t: define::RtpNalType,
268     ) -> Result<(), UnPackerError> {
269         let mut payload_reader = BytesReader::new(rtp_payload);
270         let fu_indicator = payload_reader.read_u8()?;
271         let fu_header = payload_reader.read_u8()?;
272 
273         if t == define::FU_B {
274             //read DON
275             payload_reader.read_u16::<BigEndian>()?;
276         }
277 
278         if utils::is_fu_start(fu_header) {
279             self.fu_buffer
280                 .put_u8((fu_indicator & 0xE0) | (fu_header & 0x1F))
281         }
282 
283         self.fu_buffer.put(payload_reader.extract_remaining_bytes());
284 
285         if utils::is_fu_end(fu_header) {
286             let mut payload = BytesMut::new();
287             payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE);
288             payload.put(self.fu_buffer.clone());
289             self.fu_buffer.clear();
290             if let Some(f) = &self.on_frame_handler {
291                 f(FrameData::Video {
292                     timestamp: self.timestamp,
293                     data: payload,
294                 })?;
295             }
296         }
297 
298         Ok(())
299     }
300 
301     //  0                   1                   2                   3
302     //  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
303     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
304     // |                          RTP Header                           |
305     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
306     // |STAP-A NAL HDR |         NALU 1 Size           | NALU 1 HDR    |
307     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
308     // |                         NALU 1 Data                           |
309     // :                                                               :
310     // +               +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
311     // |               | NALU 2 Size                   | NALU 2 HDR    |
312     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
313     // |                         NALU 2 Data                           |
314     // :                                                               :
315     // |                               +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
316     // |                               :...OPTIONAL RTP padding        |
317     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
318 
319     //   An example of an RTP packet including an STAP-A
320     //   containing two single-time aggregation units
321 
322     //  0                   1                   2                   3
323     //  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
324     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
325     // |                          RTP Header                           |
326     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
327     // |STAP-B NAL HDR | DON                           | NALU 1 Size   |
328     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
329     // | NALU 1 Size   | NALU 1 HDR    | NALU 1 Data                   |
330     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+                               +
331     // :                                                               :
332     // +               +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
333     // |               | NALU 2 Size                   | NALU 2 HDR    |
334     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
335     // |                       NALU 2 Data                             |
336     // :                                                               :
337     // |                               +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
338     // |                               :...OPTIONAL RTP padding        |
339     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
340 
341     //   An example of an RTP packet including an STAP-B
342     //   containing two single-time aggregation units
343 
unpack_stap( &mut self, rtp_payload: BytesMut, t: define::RtpNalType, ) -> Result<(), UnPackerError>344     fn unpack_stap(
345         &mut self,
346         rtp_payload: BytesMut,
347         t: define::RtpNalType,
348     ) -> Result<(), UnPackerError> {
349         let mut payload_reader = BytesReader::new(rtp_payload);
350         //STAP-A / STAP-B HDR
351         payload_reader.read_u8()?;
352 
353         if t == define::STAP_B {
354             //read DON
355             payload_reader.read_u16::<BigEndian>()?;
356         }
357 
358         while !payload_reader.is_empty() {
359             let length = payload_reader.read_u16::<BigEndian>()? as usize;
360             let nalu = payload_reader.read_bytes(length)?;
361 
362             let mut payload = BytesMut::new();
363             payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE);
364             payload.put(nalu);
365             if let Some(f) = &self.on_frame_handler {
366                 f(FrameData::Video {
367                     timestamp: self.timestamp,
368                     data: payload,
369                 })?;
370             }
371         }
372         Ok(())
373     }
374 
375     //  0                   1                   2                   3
376     //  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
377     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
378     // |                          RTP Header                           |
379     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
380     // |MTAP16 NAL HDR |  decoding order number base   | NALU 1 Size   |
381     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
382     // |  NALU 1 Size  |  NALU 1 DOND  |       NALU 1 TS offset        |
383     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
384     // |  NALU 1 HDR   |  NALU 1 DATA                                  |
385     // +-+-+-+-+-+-+-+-+                                               +
386     // :                                                               :
387     // +               +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
388     // |               | NALU 2 SIZE                   |  NALU 2 DOND  |
389     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
390     // |       NALU 2 TS offset        |  NALU 2 HDR   |  NALU 2 DATA  |
391     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+               |
392     // :                                                               :
393     // |                               +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
394     // |                               :...OPTIONAL RTP padding        |
395     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
396 
397     //   An RTP packet including a multi-time aggregation
398     //   packet of type MTAP16 containing two multi-time
399     //   aggregation units
400 
401     //  0                   1                   2                   3
402     //  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
403     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
404     // |                          RTP Header                           |
405     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
406     // |MTAP24 NAL HDR |  decoding order number base   | NALU 1 Size   |
407     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
408     // |  NALU 1 Size  |  NALU 1 DOND  |       NALU 1 TS offs          |
409     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
410     // |NALU 1 TS offs |  NALU 1 HDR   |  NALU 1 DATA                  |
411     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+                               +
412     // :                                                               :
413     // +               +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
414     // |               | NALU 2 SIZE                   |  NALU 2 DOND  |
415     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
416     // |       NALU 2 TS offset                        |  NALU 2 HDR   |
417     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
418     // |  NALU 2 DATA                                                  |
419     // :                                                               :
420     // |                               +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
421     // |                               :...OPTIONAL RTP padding        |
422     // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
423 
424     //   An RTP packet including a multi-time aggregation
425     //   packet of type MTAP24 containing two multi-time
426     //   aggregation units
427 
unpack_mtap( &mut self, rtp_payload: BytesMut, t: define::RtpNalType, ) -> Result<(), UnPackerError>428     fn unpack_mtap(
429         &mut self,
430         rtp_payload: BytesMut,
431         t: define::RtpNalType,
432     ) -> Result<(), UnPackerError> {
433         let mut payload_reader = BytesReader::new(rtp_payload);
434         //read NAL HDR
435         payload_reader.read_u8()?;
436         //read decoding_order_number_base
437         payload_reader.read_u16::<BigEndian>()?;
438 
439         while !payload_reader.is_empty() {
440             //read nalu size
441             let nalu_size = payload_reader.read_u16::<BigEndian>()? as usize;
442             // read dond
443             payload_reader.read_u8()?;
444             // read TS offs
445             let (ts, ts_bytes) = if t == define::MTAP_16 {
446                 (payload_reader.read_u16::<BigEndian>()? as u32, 2_usize)
447             } else if t == define::MTAP_24 {
448                 (payload_reader.read_u24::<BigEndian>()?, 3_usize)
449             } else {
450                 log::warn!("should not be here!");
451                 (0, 0)
452             };
453             assert!(ts != 0);
454             let nalu = payload_reader.read_bytes(nalu_size - ts_bytes - 1)?;
455 
456             let mut payload = BytesMut::new();
457             payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE);
458             payload.put(nalu);
459             if let Some(f) = &self.on_frame_handler {
460                 f(FrameData::Video {
461                     timestamp: self.timestamp,
462                     data: payload,
463                 })?;
464             }
465         }
466 
467         Ok(())
468     }
469 }
470 
471 impl TRtpReceiverForRtcp for RtpH264UnPacker {
on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2)472     fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) {
473         self.on_packet_for_rtcp_handler = Some(f);
474     }
475 }
476