1 use { 2 super::{ 3 define, 4 define::{epat_pid, epes_stream_id, ts}, 5 errors::{MpegTsError, MpegTsErrorValue}, 6 pat, pes, 7 pes::PesMuxer, 8 pmt, utils, 9 }, 10 bytes::{BufMut, BytesMut}, 11 bytesio::{bytes_reader::BytesReader, bytes_writer::BytesWriter}, 12 }; 13 14 pub struct TsMuxer { 15 pub bytes_writer: BytesWriter, 16 pat_continuity_counter: u8, 17 pmt_continuity_counter: u8, 18 h264_h265_with_aud: bool, 19 pid: u16, 20 pat_period: i64, 21 pcr_period: i64, 22 pcr_clock: i64, 23 pat: pat::Pat, 24 cur_pmt_index: usize, 25 cur_stream_index: usize, 26 27 packet_number: usize, 28 } 29 30 impl Default for TsMuxer { default() -> Self31 fn default() -> Self { 32 Self::new() 33 } 34 } 35 impl TsMuxer { new() -> Self36 pub fn new() -> Self { 37 Self { 38 bytes_writer: BytesWriter::new(), 39 pat_continuity_counter: 0, 40 pmt_continuity_counter: 0, 41 h264_h265_with_aud: false, 42 pid: 0x0100, 43 pat_period: 0, 44 pcr_period: 80 * 90, 45 pcr_clock: 0, 46 pat: pat::Pat::new(), 47 cur_pmt_index: 0, 48 cur_stream_index: 0, 49 packet_number: 0, 50 } 51 } 52 reset(&mut self)53 pub fn reset(&mut self) { 54 self.pat_period = 0; 55 self.pcr_period = 80 * 90; 56 self.pcr_clock = 0; 57 58 self.packet_number = 0; 59 } 60 get_data(&mut self) -> BytesMut61 pub fn get_data(&mut self) -> BytesMut { 62 self.bytes_writer.extract_current_bytes() 63 } 64 write( &mut self, pid: u16, pts: i64, dts: i64, flags: u16, payload: BytesMut, ) -> Result<(), MpegTsError>65 pub fn write( 66 &mut self, 67 pid: u16, 68 pts: i64, 69 dts: i64, 70 flags: u16, 71 payload: BytesMut, 72 ) -> Result<(), MpegTsError> { 73 self.h264_h265_with_aud = (flags & define::MPEG_FLAG_H264_H265_WITH_AUD) > 0; 74 75 //print!("pes payload length {}\n", payload.len()); 76 //self.packet_number += payload.len(); 77 //print!("pes payload sum length {}\n", self.payload_sum); 78 79 self.find_stream(pid)?; 80 81 let cur_pmt = self.pat.pmt.get_mut(self.cur_pmt_index).unwrap(); 82 let cur_stream = cur_pmt.streams.get_mut(self.cur_stream_index).unwrap(); 83 84 if 0x1FFF == cur_pmt.pcr_pid 85 || (define::epes_stream_id::PES_SID_VIDEO 86 == (cur_stream.stream_id & define::epes_stream_id::PES_SID_VIDEO) 87 && (cur_pmt.pcr_pid != cur_stream.pid)) 88 { 89 cur_pmt.pcr_pid = cur_stream.pid; 90 self.pat_period = 0; 91 } 92 93 if cur_pmt.pcr_pid == cur_stream.pid { 94 self.pcr_clock += 1; 95 } 96 97 cur_stream.pts = pts; 98 cur_stream.dts = dts; 99 100 if (flags & define::MPEG_FLAG_IDR_FRAME) > 0 { 101 cur_stream.data_alignment_indicator = 1; // idr frame 102 } else { 103 cur_stream.data_alignment_indicator = 0; // idr frame 104 } 105 106 if 0 == self.pat_period || (self.pat_period + define::PAT_PERIOD) <= dts { 107 self.pat_period = dts; 108 let pat_data = pat::PatMuxer::new().write(self.pat.clone())?; 109 110 self.write_ts_header_for_pat_pmt( 111 epat_pid::PAT_TID_PAS, 112 pat_data, 113 self.pat_continuity_counter, 114 )?; 115 self.pat_continuity_counter = (self.pat_continuity_counter + 1) % 16; 116 self.packet_number += 1; 117 118 for pmt_data in &mut self.pat.pmt.clone() { 119 let payload_data = pmt::PmtMuxer::new().write(pmt_data)?; 120 self.write_ts_header_for_pat_pmt( 121 pmt_data.pid, 122 payload_data, 123 self.pmt_continuity_counter, 124 )?; 125 self.pmt_continuity_counter = (self.pmt_continuity_counter + 1) % 16; 126 self.packet_number += 1; 127 } 128 } 129 130 self.write_pes(payload)?; 131 132 Ok(()) 133 } 134 write_ts_header_for_pat_pmt( &mut self, pid: u16, payload: BytesMut, continuity_counter: u8, ) -> Result<(), MpegTsError>135 pub fn write_ts_header_for_pat_pmt( 136 &mut self, 137 pid: u16, 138 payload: BytesMut, 139 continuity_counter: u8, 140 ) -> Result<(), MpegTsError> { 141 /*sync byte*/ 142 self.bytes_writer.write_u8(0x47)?; //0 143 /*PID 13 bits*/ 144 self.bytes_writer 145 .write_u8(0x40 | ((pid >> 8) as u8 & 0x1F))?; //1 146 147 self.bytes_writer.write_u8(pid as u8)?; //2 148 149 self.bytes_writer.write_u8(0x10 | continuity_counter)?; 150 151 /*adaption field control*/ 152 self.bytes_writer.write_u8(0x00)?; //4 153 154 /*payload data*/ 155 self.bytes_writer.write(&payload)?; 156 157 let left_size = ts::TS_PACKET_SIZE - payload.len() as u8 - 5; 158 for _ in 0..left_size { 159 self.bytes_writer.write_u8(0xFF)?; 160 } 161 Ok(()) 162 } 163 //2.4.3.6 PES packet P35 write_pes(&mut self, payload: BytesMut) -> Result<(), MpegTsError>164 pub fn write_pes(&mut self, payload: BytesMut) -> Result<(), MpegTsError> { 165 let mut is_start: bool = true; 166 let mut payload_reader = BytesReader::new(payload); 167 168 while !payload_reader.is_empty() { 169 //write pes header 170 let mut pes_muxer = PesMuxer::new(); 171 if is_start { 172 let cur_pmt = self.pat.pmt.get_mut(self.cur_pmt_index).unwrap(); 173 let stream_data = cur_pmt.streams.get_mut(self.cur_stream_index).unwrap(); 174 pes_muxer.write_pes_header( 175 payload_reader.len(), 176 stream_data, 177 self.h264_h265_with_aud, 178 )?; 179 } 180 181 let pes_header_length: usize = pes_muxer.len(); 182 let mut payload_length = payload_reader.len(); 183 184 //write ts header 185 let mut ts_header = BytesWriter::new(); 186 payload_length = self.write_ts_header_for_pes( 187 &mut ts_header, 188 pes_header_length, 189 payload_length, 190 is_start, 191 )?; 192 self.packet_number += 1; 193 194 is_start = false; 195 196 let data = payload_reader.read_bytes(payload_length)?; 197 198 self.bytes_writer.append(&mut ts_header); 199 self.bytes_writer.append(&mut pes_muxer.bytes_writer); 200 self.bytes_writer.write(&data[..])?; 201 } 202 Ok(()) 203 } write_ts_header_for_pes( &mut self, ts_header: &mut BytesWriter, pes_header_length: usize, payload_data_length: usize, is_start: bool, ) -> Result<usize, MpegTsError>204 pub fn write_ts_header_for_pes( 205 &mut self, 206 207 ts_header: &mut BytesWriter, 208 pes_header_length: usize, 209 payload_data_length: usize, 210 is_start: bool, 211 ) -> Result<usize, MpegTsError> { 212 let cur_pmt = self.pat.pmt.get_mut(self.cur_pmt_index).unwrap(); 213 let stream_data = cur_pmt.streams.get_mut(self.cur_stream_index).unwrap(); 214 215 let pcr_pid = cur_pmt.pcr_pid; 216 217 /****************************************************************/ 218 /* ts header 4 bytes without adaptation filed */ 219 /***************************************************************** 220 0 1 2 3 221 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 222 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 223 | sync byte | | | | PID | | | | 224 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 225 */ 226 227 /*sync byte*/ 228 ts_header.write_u8(0x47)?; //0 229 230 /*PID 13 bits*/ 231 ts_header.write_u8((stream_data.pid >> 8) as u8 & 0x1F)?; //1 232 ts_header.write_u8((stream_data.pid & 0xFF) as u8)?; //2 233 234 /*continuity counter 4 bits*/ 235 ts_header.write_u8(0x10 | (stream_data.continuity_counter & 0x0F))?; //3 236 stream_data.continuity_counter = (stream_data.continuity_counter + 1) % 16; 237 238 if is_start { 239 /*payload unit start indicator*/ 240 ts_header.or_u8_at(1, define::TS_PAYLOAD_UNIT_START_INDICATOR)?; 241 242 if (stream_data.pid == pcr_pid) 243 || ((stream_data.data_alignment_indicator > 0) 244 && define::PTS_NO_VALUE != stream_data.pts) 245 { 246 /*adaption field control*/ 247 ts_header.or_u8_at(3, 0x20)?; 248 249 /*adaption filed length -- set value to 1 for flags*/ 250 ts_header.write_u8(0x01)?; //4 251 252 /*will be used for adaptation field flags if have*/ 253 ts_header.write_u8(0x00)?; //5 254 255 if stream_data.pid == pcr_pid { 256 /*adaption field flags*/ 257 ts_header.or_u8_at(5, define::AF_FLAG_PCR)?; 258 259 let pcr = if define::PTS_NO_VALUE == stream_data.dts { 260 stream_data.pts 261 } else { 262 stream_data.dts 263 }; 264 let mut pcr_result: BytesWriter = BytesWriter::new(); 265 utils::pcr_write(&mut pcr_result, pcr * 300)?; 266 ts_header.write(&pcr_result.extract_current_bytes()[..])?; 267 /*adaption filed length -- add 6 for pcr length*/ 268 ts_header.add_u8_at(4, 6)?; 269 } 270 271 if (stream_data.data_alignment_indicator > 0) 272 && define::PTS_NO_VALUE != stream_data.pts 273 { 274 /*adaption field flags*/ 275 ts_header.or_u8_at(5, define::AF_FLAG_RANDOM_ACCESS_INDICATOR)?; 276 } 277 } 278 } 279 280 /* 281 +-------------------------------------------------------------------------+ 282 | ts header | PES data | 283 +-------------------------------------------------------------------------+ 284 | 4bytes fixed header | adaption field(stuffing)| pes header | pes payload| 285 +-------------------------------------------------------------------------+ 286 */ 287 // If payload data cannot fill up the 188 bytes packet, 288 // then stuffling bytes need to be filled in the adaptation field, 289 290 let ts_header_length = ts_header.len(); 291 let mut stuffing_length = define::TS_PACKET_SIZE as i32 292 - (ts_header_length + pes_header_length + payload_data_length) as i32; 293 294 if stuffing_length > 0 { 295 if (ts_header.get(3).unwrap() & 0x20) > 0 { 296 /*adaption filed length -- add 6 for pcr length*/ 297 ts_header.add_u8_at(4, stuffing_length as u8)?; 298 } else { 299 /*adaption field control*/ 300 ts_header.or_u8_at(3, 0x20)?; 301 /*AF length,because it occupys one byte,so here sub one.*/ 302 stuffing_length -= 1; 303 /*adaption filed length*/ 304 ts_header.write_u8(stuffing_length as u8)?; 305 /*add flag*/ 306 if stuffing_length >= 1 { 307 /*adaptation field flags flag occupies one byte, sub one.*/ 308 stuffing_length -= 1; 309 ts_header.write_u8(0x00)?; 310 } 311 } 312 for _ in 0..stuffing_length { 313 ts_header.write_u8(0xFF)?; 314 } 315 } else { 316 return Ok(define::TS_PACKET_SIZE - ts_header_length - pes_header_length); 317 } 318 319 Ok(payload_data_length) 320 } 321 find_stream(&mut self, pid: u16) -> Result<(), MpegTsError>322 pub fn find_stream(&mut self, pid: u16) -> Result<(), MpegTsError> { 323 // let mut pmt_index: usize = 0; 324 let mut stream_index: usize = 0; 325 326 for (pmt_index, pmt) in self.pat.pmt.iter_mut().enumerate() { 327 for stream in pmt.streams.iter_mut() { 328 if stream.pid == pid { 329 self.cur_pmt_index = pmt_index; 330 self.cur_stream_index = stream_index; 331 332 return Ok(()); 333 } 334 stream_index += 1; 335 } 336 } 337 338 // for pmt in self.pat.pmt.iter_mut() { 339 // for stream in pmt.streams.iter_mut() { 340 // if stream.pid == pid { 341 // self.cur_pmt_index = pmt_index; 342 // self.cur_stream_index = stream_index; 343 344 // return Ok(()); 345 // } 346 // stream_index += 1; 347 // } 348 // pmt_index += 1; 349 // } 350 351 Err(MpegTsError { 352 value: MpegTsErrorValue::StreamNotFound, 353 }) 354 } 355 add_stream(&mut self, codecid: u8, extra_data: BytesMut) -> Result<u16, MpegTsError>356 pub fn add_stream(&mut self, codecid: u8, extra_data: BytesMut) -> Result<u16, MpegTsError> { 357 if self.pat.pmt.is_empty() { 358 self.add_program(1, BytesMut::new())?; 359 } 360 361 self.pmt_add_stream(0, codecid, extra_data) 362 } 363 pmt_add_stream( &mut self, pmt_index: usize, codecid: u8, extra_data: BytesMut, ) -> Result<u16, MpegTsError>364 pub fn pmt_add_stream( 365 &mut self, 366 pmt_index: usize, 367 codecid: u8, 368 extra_data: BytesMut, 369 ) -> Result<u16, MpegTsError> { 370 let pmt = &mut self.pat.pmt[pmt_index]; 371 372 if pmt.streams.len() == 4 { 373 return Err(MpegTsError { 374 value: MpegTsErrorValue::StreamCountExeceed, 375 }); 376 } 377 378 let mut cur_stream = pes::Pes::new(); //&mut pmt.streams[pmt.stream_count]; 379 380 cur_stream.codec_id = codecid; 381 cur_stream.pid = self.pid; 382 self.pid += 1; 383 384 if utils::is_steam_type_video(codecid) { 385 cur_stream.stream_id = epes_stream_id::PES_SID_VIDEO; 386 } else if utils::is_steam_type_audio(codecid) { 387 cur_stream.stream_id = epes_stream_id::PES_SID_AUDIO; 388 } else { 389 cur_stream.stream_id = epes_stream_id::PES_SID_PRIVATE_1; 390 } 391 392 if !extra_data.is_empty() { 393 cur_stream.esinfo.put(extra_data); 394 } 395 396 pmt.streams.push(cur_stream); 397 pmt.version_number = (pmt.version_number + 1) % 32; 398 399 self.reset(); 400 401 Ok(self.pid - 1) 402 } 403 add_program(&mut self, program_number: u16, info: BytesMut) -> Result<(), MpegTsError>404 pub fn add_program(&mut self, program_number: u16, info: BytesMut) -> Result<(), MpegTsError> { 405 for cur_pmt in self.pat.pmt.iter() { 406 if cur_pmt.program_number == program_number { 407 return Err(MpegTsError { 408 value: MpegTsErrorValue::ProgramNumberExists, 409 }); 410 } 411 } 412 413 if self.pat.pmt.len() == 4 { 414 return Err(MpegTsError { 415 value: MpegTsErrorValue::PmtCountExeceed, 416 }); 417 } 418 let mut cur_pmt = pmt::Pmt::new(); //&mut self.pat.pmt[self.pat.pmt_count]; 419 420 cur_pmt.pid = self.pid; 421 self.pid += 1; 422 cur_pmt.program_number = program_number; 423 cur_pmt.version_number = 0x00; 424 cur_pmt.continuity_counter = 0; 425 cur_pmt.pcr_pid = 0x1FFF; 426 427 if !info.is_empty() { 428 cur_pmt.program_info.put(&info[..]); 429 } 430 431 self.pat.pmt.push(cur_pmt); 432 433 Ok(()) 434 } 435 } 436