1 use super::define; 2 use super::errors::PackerError; 3 use super::errors::UnPackerError; 4 use super::utils; 5 use super::utils::OnFrameFn; 6 use super::utils::OnRtpPacketFn; 7 use super::utils::OnRtpPacketFn2; 8 use super::utils::TPacker; 9 use super::utils::TRtpReceiverForRtcp; 10 use super::utils::TUnPacker; 11 use super::utils::TVideoPacker; 12 use super::utils::Unmarshal; 13 use super::RtpHeader; 14 use super::RtpPacket; 15 use async_trait::async_trait; 16 use byteorder::BigEndian; 17 use bytes::{BufMut, BytesMut}; 18 use bytesio::bytes_reader::BytesReader; 19 use bytesio::bytesio::TNetIO; 20 use std::sync::Arc; 21 use streamhub::define::FrameData; 22 use tokio::sync::Mutex; 23 24 pub struct RtpH265Packer { 25 header: RtpHeader, 26 mtu: usize, 27 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 28 on_packet_handler: Option<OnRtpPacketFn>, 29 on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>, 30 } 31 32 impl RtpH265Packer { new( payload_type: u8, ssrc: u32, init_seq: u16, mtu: usize, io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, ) -> Self33 pub fn new( 34 payload_type: u8, 35 ssrc: u32, 36 init_seq: u16, 37 mtu: usize, 38 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 39 ) -> Self { 40 RtpH265Packer { 41 header: RtpHeader { 42 payload_type, 43 seq_number: init_seq, 44 ssrc, 45 version: 2, 46 ..Default::default() 47 }, 48 mtu, 49 io, 50 on_packet_handler: None, 51 on_packet_for_rtcp_handler: None, 52 } 53 } 54 pack_fu(&mut self, nalu: BytesMut) -> Result<(), PackerError>55 pub async fn pack_fu(&mut self, nalu: BytesMut) -> Result<(), PackerError> { 56 let mut nalu_reader = BytesReader::new(nalu); 57 /* NALU header 58 0 1 59 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 60 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 61 |F| Type | LayerId | TID | 62 +-------------+-----------------+ 63 64 Forbidden zero(F) : 1 bit 65 NAL unit type(Type) : 6 bits 66 NUH layer ID(LayerId) : 6 bits 67 NUH temporal ID plus 1 (TID) : 3 bits 68 */ 69 let nalu_header_1st_byte = nalu_reader.read_u8()?; 70 let nalu_header_2nd_byte = nalu_reader.read_u8()?; 71 72 /* The PayloadHdr needs replace Type with the FU type value(49) */ 73 let payload_hdr: u16 = ((nalu_header_1st_byte as u16 & 0x81) | ((define::FU as u16) << 1)) 74 << 8 75 | nalu_header_2nd_byte as u16; 76 /* FU header 77 +---------------+ 78 |0|1|2|3|4|5|6|7| 79 +-+-+-+-+-+-+-+-+ 80 |S|E| FuType | 81 +---------------+ 82 */ 83 /*set FuType from NALU header's Type */ 84 let mut fu_header = (nalu_header_1st_byte >> 1) & 0x3F | define::FU_START; 85 86 let mut left_nalu_bytes: usize = nalu_reader.len(); 87 let mut fu_payload_len: usize; 88 89 while left_nalu_bytes > 0 { 90 /* 3 = PayloadHdr(2 bytes) + FU header(1 byte) */ 91 if left_nalu_bytes + define::RTP_FIXED_HEADER_LEN <= self.mtu - 3 { 92 fu_header = (nalu_header_1st_byte & 0x1F) | define::FU_END; 93 fu_payload_len = left_nalu_bytes; 94 } else { 95 fu_payload_len = self.mtu - define::RTP_FIXED_HEADER_LEN - 3; 96 } 97 98 let fu_payload = nalu_reader.read_bytes(fu_payload_len)?; 99 100 let mut packet = RtpPacket::new(self.header.clone()); 101 packet.payload.put_u16(payload_hdr); 102 packet.payload.put_u8(fu_header); 103 packet.payload.put(fu_payload); 104 packet.header.marker = if fu_header & define::FU_END > 0 { 1 } else { 0 }; 105 106 if fu_header & define::FU_START > 0 { 107 fu_header &= 0x7F 108 } 109 110 if let Some(f) = &self.on_packet_for_rtcp_handler { 111 f(packet.clone()); 112 } 113 114 if let Some(f) = &self.on_packet_handler { 115 f(self.io.clone(), packet).await?; 116 } 117 left_nalu_bytes = nalu_reader.len(); 118 self.header.seq_number += 1; 119 } 120 121 Ok(()) 122 } pack_single(&mut self, nalu: BytesMut) -> Result<(), PackerError>123 pub async fn pack_single(&mut self, nalu: BytesMut) -> Result<(), PackerError> { 124 let mut packet = RtpPacket::new(self.header.clone()); 125 packet.header.marker = 1; 126 packet.payload.put(nalu); 127 128 self.header.seq_number += 1; 129 130 if let Some(f) = &self.on_packet_for_rtcp_handler { 131 f(packet.clone()); 132 } 133 134 if let Some(f) = &self.on_packet_handler { 135 return f(self.io.clone(), packet).await; 136 } 137 Ok(()) 138 } 139 } 140 141 #[async_trait] 142 impl TPacker for RtpH265Packer { pack(&mut self, nalus: &mut BytesMut, timestamp: u32) -> Result<(), PackerError>143 async fn pack(&mut self, nalus: &mut BytesMut, timestamp: u32) -> Result<(), PackerError> { 144 self.header.timestamp = timestamp; 145 utils::split_annexb_and_process(nalus, self).await?; 146 Ok(()) 147 } on_packet_handler(&mut self, f: OnRtpPacketFn)148 fn on_packet_handler(&mut self, f: OnRtpPacketFn) { 149 self.on_packet_handler = Some(f); 150 } 151 } 152 153 impl TRtpReceiverForRtcp for RtpH265Packer { on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2)154 fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) { 155 self.on_packet_for_rtcp_handler = Some(f); 156 } 157 } 158 159 #[async_trait] 160 impl TVideoPacker for RtpH265Packer { pack_nalu(&mut self, nalu: BytesMut) -> Result<(), PackerError>161 async fn pack_nalu(&mut self, nalu: BytesMut) -> Result<(), PackerError> { 162 if nalu.len() + define::RTP_FIXED_HEADER_LEN <= self.mtu { 163 self.pack_single(nalu).await?; 164 } else { 165 self.pack_fu(nalu).await?; 166 } 167 Ok(()) 168 } 169 } 170 171 #[derive(Default)] 172 pub struct RtpH265UnPacker { 173 sequence_number: u16, 174 timestamp: u32, 175 fu_buffer: BytesMut, 176 using_donl_field: bool, 177 on_frame_handler: Option<OnFrameFn>, 178 on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>, 179 } 180 181 impl TUnPacker for RtpH265UnPacker { unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError>182 fn unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError> { 183 let rtp_packet = RtpPacket::unmarshal(reader)?; 184 185 if let Some(f) = &self.on_packet_for_rtcp_handler { 186 f(rtp_packet.clone()); 187 } 188 189 self.timestamp = rtp_packet.header.timestamp; 190 self.sequence_number = rtp_packet.header.seq_number; 191 192 if let Some(packet_type) = rtp_packet.payload.first() { 193 match *packet_type >> 1 & 0x3F { 194 define::FU => { 195 return self.unpack_fu(rtp_packet.payload.clone()); 196 } 197 define::AP => { 198 return self.unpack_ap(rtp_packet.payload); 199 } 200 define::PACI => return Ok(()), 201 202 _ => { 203 return self.unpack_single(rtp_packet.payload.clone()); 204 } 205 } 206 } 207 208 Ok(()) 209 } 210 on_frame_handler(&mut self, f: OnFrameFn)211 fn on_frame_handler(&mut self, f: OnFrameFn) { 212 self.on_frame_handler = Some(f); 213 } 214 } 215 216 impl RtpH265UnPacker { new() -> Self217 pub fn new() -> Self { 218 RtpH265UnPacker::default() 219 } 220 unpack_single(&mut self, payload: BytesMut) -> Result<(), UnPackerError>221 fn unpack_single(&mut self, payload: BytesMut) -> Result<(), UnPackerError> { 222 let mut annexb_payload = BytesMut::new(); 223 annexb_payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE); 224 annexb_payload.put(payload); 225 226 if let Some(f) = &self.on_frame_handler { 227 f(FrameData::Video { 228 timestamp: self.timestamp, 229 data: annexb_payload, 230 })?; 231 } 232 Ok(()) 233 } 234 235 /* 236 0 1 2 3 237 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 238 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 239 | RTP Header | 240 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 241 | PayloadHdr (Type=48) | NALU 1 DONL | 242 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 243 | NALU 1 Size | NALU 1 HDR | 244 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 245 | | 246 | NALU 1 Data . . . | 247 | | 248 + . . . +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 249 | | NALU 2 DOND | NALU 2 Size | 250 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 251 | NALU 2 HDR | | 252 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ NALU 2 Data | 253 | | 254 | . . . +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 255 | : ...OPTIONAL RTP padding | 256 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 257 */ 258 unpack_ap(&mut self, rtp_payload: BytesMut) -> Result<(), UnPackerError>259 fn unpack_ap(&mut self, rtp_payload: BytesMut) -> Result<(), UnPackerError> { 260 let mut payload_reader = BytesReader::new(rtp_payload); 261 /*read PayloadHdr*/ 262 payload_reader.read_bytes(2)?; 263 264 while !payload_reader.is_empty() { 265 if self.using_donl_field { 266 /*read DONL*/ 267 payload_reader.read_bytes(2)?; 268 } 269 /*read NALU Size*/ 270 let nalu_len = payload_reader.read_u16::<BigEndian>()? as usize; 271 /*read NALU HDR + Data */ 272 let nalu = payload_reader.read_bytes(nalu_len)?; 273 274 let mut payload = BytesMut::new(); 275 payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE); 276 payload.put(nalu); 277 278 if let Some(f) = &self.on_frame_handler { 279 f(FrameData::Video { 280 timestamp: self.timestamp, 281 data: payload, 282 })?; 283 } 284 } 285 286 Ok(()) 287 } 288 289 /* 290 0 1 291 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 292 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 293 |F| Type | LayerId | TID | 294 +-------------+-----------------+ 295 296 Forbidden zero(F) : 1 bit 297 NAL unit type(Type) : 6 bits 298 NUH layer ID(LayerId) : 6 bits 299 NUH temporal ID plus 1 (TID) : 3 bits 300 */ 301 302 /* 303 0 1 2 3 304 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 305 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 306 | PayloadHdr (Type=49) | FU header | DONL (cond) | 307 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| 308 | DONL (cond) | | 309 |-+-+-+-+-+-+-+-+ | 310 | FU payload | 311 | | 312 | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 313 | : ...OPTIONAL RTP padding | 314 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 315 /* FU header */ 316 +---------------+ 317 |0|1|2|3|4|5|6|7| 318 +-+-+-+-+-+-+-+-+ 319 |S|E| FuType | 320 +---------------+ 321 */ unpack_fu(&mut self, rtp_payload: BytesMut) -> Result<(), UnPackerError>322 fn unpack_fu(&mut self, rtp_payload: BytesMut) -> Result<(), UnPackerError> { 323 let mut payload_reader = BytesReader::new(rtp_payload); 324 let payload_header_1st_byte = payload_reader.read_u8()?; 325 let payload_header_2nd_byte = payload_reader.read_u8()?; 326 let fu_header = payload_reader.read_u8()?; 327 if self.using_donl_field { 328 payload_reader.read_bytes(2)?; 329 } 330 331 if utils::is_fu_start(fu_header) { 332 /*set NAL UNIT type 2 bytes */ 333 //replace Type of PayloadHdr with the FuType of FU header 334 let nal_1st_byte = (payload_header_1st_byte & 0x81) | ((fu_header & 0x3F) << 1); 335 self.fu_buffer.put_u8(nal_1st_byte); 336 self.fu_buffer.put_u8(payload_header_2nd_byte); 337 } 338 339 self.fu_buffer.put(payload_reader.extract_remaining_bytes()); 340 341 if utils::is_fu_end(fu_header) { 342 let mut payload = BytesMut::new(); 343 payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE); 344 payload.put(self.fu_buffer.clone()); 345 self.fu_buffer.clear(); 346 347 if let Some(f) = &self.on_frame_handler { 348 f(FrameData::Video { 349 timestamp: self.timestamp, 350 data: payload, 351 })?; 352 } 353 } 354 355 Ok(()) 356 } 357 } 358 359 impl TRtpReceiverForRtcp for RtpH265UnPacker { on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2)360 fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) { 361 self.on_packet_for_rtcp_handler = Some(f); 362 } 363 } 364