1 use super::define; 2 use super::errors::PackerError; 3 use super::errors::UnPackerError; 4 use super::utils; 5 use super::utils::OnFrameFn; 6 use super::utils::OnRtpPacketFn; 7 use super::utils::OnRtpPacketFn2; 8 use super::utils::TPacker; 9 use super::utils::TRtpReceiverForRtcp; 10 use super::utils::TUnPacker; 11 use super::utils::TVideoPacker; 12 use super::utils::Unmarshal; 13 use super::RtpHeader; 14 use super::RtpPacket; 15 use async_trait::async_trait; 16 use byteorder::BigEndian; 17 use bytes::{BufMut, BytesMut}; 18 use bytesio::bytes_reader::BytesReader; 19 use bytesio::bytesio::TNetIO; 20 use std::sync::Arc; 21 use streamhub::define::FrameData; 22 use tokio::sync::Mutex; 23 24 pub struct RtpH264Packer { 25 header: RtpHeader, 26 mtu: usize, 27 on_packet_handler: Option<OnRtpPacketFn>, 28 on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>, 29 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 30 } 31 32 impl RtpH264Packer { new( payload_type: u8, ssrc: u32, init_seq: u16, mtu: usize, io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, ) -> Self33 pub fn new( 34 payload_type: u8, 35 ssrc: u32, 36 init_seq: u16, 37 mtu: usize, 38 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 39 ) -> Self { 40 RtpH264Packer { 41 header: RtpHeader { 42 payload_type, 43 seq_number: init_seq, 44 ssrc, 45 version: 2, 46 ..Default::default() 47 }, 48 mtu, 49 io, 50 on_packet_handler: None, 51 on_packet_for_rtcp_handler: None, 52 } 53 } 54 pack_fu_a(&mut self, nalu: BytesMut) -> Result<(), PackerError>55 pub async fn pack_fu_a(&mut self, nalu: BytesMut) -> Result<(), PackerError> { 56 let mut nalu_reader = BytesReader::new(nalu); 57 let byte_1st = nalu_reader.read_u8()?; 58 59 let fu_indicator: u8 = (byte_1st & 0xE0) | define::FU_A; 60 let mut fu_header: u8 = (byte_1st & 0x1F) | define::FU_START; 61 62 let mut left_nalu_bytes: usize = nalu_reader.len(); 63 let mut fu_payload_len: usize; 64 65 while left_nalu_bytes > 0 { 66 if left_nalu_bytes + define::RTP_FIXED_HEADER_LEN <= self.mtu - 2 { 67 fu_header = (byte_1st & 0x1F) | define::FU_END; 68 fu_payload_len = left_nalu_bytes; 69 } else { 70 fu_payload_len = self.mtu - define::RTP_FIXED_HEADER_LEN - 2; 71 } 72 73 let fu_payload = nalu_reader.read_bytes(fu_payload_len)?; 74 75 let mut packet = RtpPacket::new(self.header.clone()); 76 packet.payload.put_u8(fu_indicator); 77 packet.payload.put_u8(fu_header); 78 79 if fu_header & define::FU_START > 0 { 80 fu_header &= 0x7F 81 } 82 83 packet.payload.put(fu_payload); 84 packet.header.marker = if fu_header & define::FU_END > 0 { 1 } else { 0 }; 85 86 if let Some(f) = &self.on_packet_for_rtcp_handler { 87 f(packet.clone()); 88 } 89 90 if let Some(f) = &self.on_packet_handler { 91 // log::info!("seq number: {}", packet.header.seq_number); 92 f(self.io.clone(), packet).await?; 93 } 94 95 left_nalu_bytes = nalu_reader.len(); 96 self.header.seq_number += 1; 97 } 98 99 Ok(()) 100 } pack_single(&mut self, nalu: BytesMut) -> Result<(), PackerError>101 pub async fn pack_single(&mut self, nalu: BytesMut) -> Result<(), PackerError> { 102 let mut packet = RtpPacket::new(self.header.clone()); 103 packet.header.marker = 1; 104 packet.payload.put(nalu); 105 106 // let packet_bytesmut = packet.marshal()?; 107 self.header.seq_number += 1; 108 109 if let Some(f) = &self.on_packet_for_rtcp_handler { 110 f(packet.clone()); 111 } 112 113 if let Some(f) = &self.on_packet_handler { 114 return f(self.io.clone(), packet).await; 115 } 116 117 Ok(()) 118 } 119 } 120 121 #[async_trait] 122 impl TPacker for RtpH264Packer { 123 //pack annexb h264 data pack(&mut self, nalus: &mut BytesMut, timestamp: u32) -> Result<(), PackerError>124 async fn pack(&mut self, nalus: &mut BytesMut, timestamp: u32) -> Result<(), PackerError> { 125 self.header.timestamp = timestamp; // ((timestamp as u64 * self.clock_rate as u64) / 1000) as u32; 126 utils::split_annexb_and_process(nalus, self).await?; 127 Ok(()) 128 } 129 on_packet_handler(&mut self, f: OnRtpPacketFn)130 fn on_packet_handler(&mut self, f: OnRtpPacketFn) { 131 self.on_packet_handler = Some(f); 132 } 133 } 134 135 impl TRtpReceiverForRtcp for RtpH264Packer { on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2)136 fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) { 137 self.on_packet_for_rtcp_handler = Some(f); 138 } 139 } 140 141 #[async_trait] 142 impl TVideoPacker for RtpH264Packer { pack_nalu(&mut self, nalu: BytesMut) -> Result<(), PackerError>143 async fn pack_nalu(&mut self, nalu: BytesMut) -> Result<(), PackerError> { 144 if nalu.len() + define::RTP_FIXED_HEADER_LEN <= self.mtu { 145 self.pack_single(nalu).await?; 146 } else { 147 self.pack_fu_a(nalu).await?; 148 } 149 Ok(()) 150 } 151 } 152 153 #[derive(Default)] 154 pub struct RtpH264UnPacker { 155 sequence_number: u16, 156 timestamp: u32, 157 fu_buffer: BytesMut, 158 on_frame_handler: Option<OnFrameFn>, 159 on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>, 160 } 161 162 impl TUnPacker for RtpH264UnPacker { unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError>163 fn unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError> { 164 let rtp_packet = RtpPacket::unmarshal(reader)?; 165 166 if let Some(f) = &self.on_packet_for_rtcp_handler { 167 f(rtp_packet.clone()); 168 } 169 170 self.timestamp = rtp_packet.header.timestamp; 171 self.sequence_number = rtp_packet.header.seq_number; 172 173 if let Some(packet_type) = rtp_packet.payload.first() { 174 match *packet_type & 0x1F { 175 1..=23 => { 176 return self.unpack_single(rtp_packet.payload.clone(), *packet_type); 177 } 178 define::STAP_A | define::STAP_B => { 179 return self.unpack_stap(rtp_packet.payload.clone(), *packet_type); 180 } 181 define::MTAP_16 | define::MTAP_24 => { 182 return self.unpack_mtap(rtp_packet.payload.clone(), *packet_type); 183 } 184 define::FU_A | define::FU_B => { 185 return self.unpack_fu(rtp_packet.payload.clone(), *packet_type); 186 } 187 _ => {} 188 } 189 } 190 191 Ok(()) 192 } 193 on_frame_handler(&mut self, f: OnFrameFn)194 fn on_frame_handler(&mut self, f: OnFrameFn) { 195 self.on_frame_handler = Some(f); 196 } 197 } 198 199 impl RtpH264UnPacker { new() -> Self200 pub fn new() -> Self { 201 RtpH264UnPacker { 202 ..Default::default() 203 } 204 } 205 unpack_single( &mut self, payload: BytesMut, _t: define::RtpNalType, ) -> Result<(), UnPackerError>206 fn unpack_single( 207 &mut self, 208 payload: BytesMut, 209 _t: define::RtpNalType, 210 ) -> Result<(), UnPackerError> { 211 if let Some(f) = &self.on_frame_handler { 212 let mut annexb_payload = BytesMut::new(); 213 annexb_payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE); 214 annexb_payload.put(payload); 215 216 f(FrameData::Video { 217 timestamp: self.timestamp, 218 data: annexb_payload, 219 })?; 220 } 221 Ok(()) 222 } 223 224 // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 225 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 226 // | FU indicator | FU header | | 227 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | 228 // | | 229 // | FU payload | 230 // | | 231 // | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 232 // | :...OPTIONAL RTP padding | 233 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 234 235 // RTP payload format for FU-A 236 237 // 0 1 2 3 238 // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 239 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 240 // | FU indicator | FU header | DON | 241 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| 242 // | | 243 // | FU payload | 244 // | | 245 // | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 246 // | :...OPTIONAL RTP padding | 247 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 248 249 // RTP payload format for FU-B 250 251 // FU indicator 252 // +---------------+ 253 // |0|1|2|3|4|5|6|7| 254 // +-+-+-+-+-+-+-+-+ 255 // |F|NRI| Type | 256 // +---------------+ 257 258 // FU header 259 // +---------------+ 260 // |0|1|2|3|4|5|6|7| 261 // +-+-+-+-+-+-+-+-+ 262 // |S|E|R| Type | 263 // +---------------+ unpack_fu( &mut self, rtp_payload: BytesMut, t: define::RtpNalType, ) -> Result<(), UnPackerError>264 fn unpack_fu( 265 &mut self, 266 rtp_payload: BytesMut, 267 t: define::RtpNalType, 268 ) -> Result<(), UnPackerError> { 269 let mut payload_reader = BytesReader::new(rtp_payload); 270 let fu_indicator = payload_reader.read_u8()?; 271 let fu_header = payload_reader.read_u8()?; 272 273 if t == define::FU_B { 274 //read DON 275 payload_reader.read_u16::<BigEndian>()?; 276 } 277 278 if utils::is_fu_start(fu_header) { 279 self.fu_buffer 280 .put_u8((fu_indicator & 0xE0) | (fu_header & 0x1F)) 281 } 282 283 self.fu_buffer.put(payload_reader.extract_remaining_bytes()); 284 285 if utils::is_fu_end(fu_header) { 286 let mut payload = BytesMut::new(); 287 payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE); 288 payload.put(self.fu_buffer.clone()); 289 self.fu_buffer.clear(); 290 if let Some(f) = &self.on_frame_handler { 291 f(FrameData::Video { 292 timestamp: self.timestamp, 293 data: payload, 294 })?; 295 } 296 } 297 298 Ok(()) 299 } 300 301 // 0 1 2 3 302 // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 303 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 304 // | RTP Header | 305 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 306 // |STAP-A NAL HDR | NALU 1 Size | NALU 1 HDR | 307 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 308 // | NALU 1 Data | 309 // : : 310 // + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 311 // | | NALU 2 Size | NALU 2 HDR | 312 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 313 // | NALU 2 Data | 314 // : : 315 // | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 316 // | :...OPTIONAL RTP padding | 317 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 318 319 // An example of an RTP packet including an STAP-A 320 // containing two single-time aggregation units 321 322 // 0 1 2 3 323 // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 324 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 325 // | RTP Header | 326 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 327 // |STAP-B NAL HDR | DON | NALU 1 Size | 328 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 329 // | NALU 1 Size | NALU 1 HDR | NALU 1 Data | 330 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + 331 // : : 332 // + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 333 // | | NALU 2 Size | NALU 2 HDR | 334 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 335 // | NALU 2 Data | 336 // : : 337 // | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 338 // | :...OPTIONAL RTP padding | 339 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 340 341 // An example of an RTP packet including an STAP-B 342 // containing two single-time aggregation units 343 unpack_stap( &mut self, rtp_payload: BytesMut, t: define::RtpNalType, ) -> Result<(), UnPackerError>344 fn unpack_stap( 345 &mut self, 346 rtp_payload: BytesMut, 347 t: define::RtpNalType, 348 ) -> Result<(), UnPackerError> { 349 let mut payload_reader = BytesReader::new(rtp_payload); 350 //STAP-A / STAP-B HDR 351 payload_reader.read_u8()?; 352 353 if t == define::STAP_B { 354 //read DON 355 payload_reader.read_u16::<BigEndian>()?; 356 } 357 358 while !payload_reader.is_empty() { 359 let length = payload_reader.read_u16::<BigEndian>()? as usize; 360 let nalu = payload_reader.read_bytes(length)?; 361 362 let mut payload = BytesMut::new(); 363 payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE); 364 payload.put(nalu); 365 if let Some(f) = &self.on_frame_handler { 366 f(FrameData::Video { 367 timestamp: self.timestamp, 368 data: payload, 369 })?; 370 } 371 } 372 Ok(()) 373 } 374 375 // 0 1 2 3 376 // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 377 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 378 // | RTP Header | 379 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 380 // |MTAP16 NAL HDR | decoding order number base | NALU 1 Size | 381 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 382 // | NALU 1 Size | NALU 1 DOND | NALU 1 TS offset | 383 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 384 // | NALU 1 HDR | NALU 1 DATA | 385 // +-+-+-+-+-+-+-+-+ + 386 // : : 387 // + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 388 // | | NALU 2 SIZE | NALU 2 DOND | 389 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 390 // | NALU 2 TS offset | NALU 2 HDR | NALU 2 DATA | 391 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | 392 // : : 393 // | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 394 // | :...OPTIONAL RTP padding | 395 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 396 397 // An RTP packet including a multi-time aggregation 398 // packet of type MTAP16 containing two multi-time 399 // aggregation units 400 401 // 0 1 2 3 402 // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 403 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 404 // | RTP Header | 405 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 406 // |MTAP24 NAL HDR | decoding order number base | NALU 1 Size | 407 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 408 // | NALU 1 Size | NALU 1 DOND | NALU 1 TS offs | 409 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 410 // |NALU 1 TS offs | NALU 1 HDR | NALU 1 DATA | 411 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + 412 // : : 413 // + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 414 // | | NALU 2 SIZE | NALU 2 DOND | 415 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 416 // | NALU 2 TS offset | NALU 2 HDR | 417 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 418 // | NALU 2 DATA | 419 // : : 420 // | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 421 // | :...OPTIONAL RTP padding | 422 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 423 424 // An RTP packet including a multi-time aggregation 425 // packet of type MTAP24 containing two multi-time 426 // aggregation units 427 unpack_mtap( &mut self, rtp_payload: BytesMut, t: define::RtpNalType, ) -> Result<(), UnPackerError>428 fn unpack_mtap( 429 &mut self, 430 rtp_payload: BytesMut, 431 t: define::RtpNalType, 432 ) -> Result<(), UnPackerError> { 433 let mut payload_reader = BytesReader::new(rtp_payload); 434 //read NAL HDR 435 payload_reader.read_u8()?; 436 //read decoding_order_number_base 437 payload_reader.read_u16::<BigEndian>()?; 438 439 while !payload_reader.is_empty() { 440 //read nalu size 441 let nalu_size = payload_reader.read_u16::<BigEndian>()? as usize; 442 // read dond 443 payload_reader.read_u8()?; 444 // read TS offs 445 let (ts, ts_bytes) = if t == define::MTAP_16 { 446 (payload_reader.read_u16::<BigEndian>()? as u32, 2_usize) 447 } else if t == define::MTAP_24 { 448 (payload_reader.read_u24::<BigEndian>()?, 3_usize) 449 } else { 450 log::warn!("should not be here!"); 451 (0, 0) 452 }; 453 assert!(ts != 0); 454 let nalu = payload_reader.read_bytes(nalu_size - ts_bytes - 1)?; 455 456 let mut payload = BytesMut::new(); 457 payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE); 458 payload.put(nalu); 459 if let Some(f) = &self.on_frame_handler { 460 f(FrameData::Video { 461 timestamp: self.timestamp, 462 data: payload, 463 })?; 464 } 465 } 466 467 Ok(()) 468 } 469 } 470 471 impl TRtpReceiverForRtcp for RtpH264UnPacker { on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2)472 fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) { 473 self.on_packet_for_rtcp_handler = Some(f); 474 } 475 } 476