xref: /xiu/library/container/mpegts/src/ts.rs (revision 69de9bbd)
1 use {
2     super::{
3         define,
4         define::{epat_pid, epes_stream_id, ts},
5         errors::{MpegTsError, MpegTsErrorValue},
6         pat, pes,
7         pes::PesMuxer,
8         pmt, utils,
9     },
10     bytes::{BufMut, BytesMut},
11     bytesio::{bytes_reader::BytesReader, bytes_writer::BytesWriter},
12 };
13 
/// MPEG transport stream muxer.
///
/// Elementary-stream payloads handed to `write` are wrapped in PES packets,
/// split into TS packets and appended to `bytes_writer`; `get_data` drains
/// what has been produced so far.
pub struct TsMuxer {
    /// Output buffer that every generated TS packet is appended to.
    pub bytes_writer: BytesWriter,
    /// Continuity counter placed in PAT packets; advanced modulo 16 after each one.
    pat_continuity_counter: u8,
    /// Continuity counter placed in PMT packets; advanced modulo 16 after each one.
    pmt_continuity_counter: u8,
    /// Latched from the `MPEG_FLAG_H264_H265_WITH_AUD` bit of the most recent
    /// `write` call and forwarded to the PES header writer.
    h264_h265_with_aud: bool,
    /// Next PID to hand out to a new program or stream (starts at 0x0100).
    pid: u16,
    /// DTS at which PAT/PMT were last emitted; 0 forces them to be emitted
    /// again on the next `write`.
    pat_period: i64,
    /// Initialised to `80 * 90` and restored by `reset`; not read by the code
    /// visible here. Presumably 80 ms in 90 kHz ticks — TODO confirm.
    pcr_period: i64,
    /// Incremented on every `write` whose stream is the PCR stream; not read
    /// by the code visible here.
    pcr_clock: i64,
    /// Program association table, owning every PMT and its streams.
    pat: pat::Pat,
    /// Index into `pat.pmt` of the program selected by the last `find_stream`.
    cur_pmt_index: usize,
    /// Index into that program's `streams` selected by the last `find_stream`.
    cur_stream_index: usize,

    /// Number of TS packets emitted since the last `reset`.
    packet_number: usize,
}
29 
30 impl Default for TsMuxer {
default() -> Self31     fn default() -> Self {
32         Self::new()
33     }
34 }
35 impl TsMuxer {
new() -> Self36     pub fn new() -> Self {
37         Self {
38             bytes_writer: BytesWriter::new(),
39             pat_continuity_counter: 0,
40             pmt_continuity_counter: 0,
41             h264_h265_with_aud: false,
42             pid: 0x0100,
43             pat_period: 0,
44             pcr_period: 80 * 90,
45             pcr_clock: 0,
46             pat: pat::Pat::new(),
47             cur_pmt_index: 0,
48             cur_stream_index: 0,
49             packet_number: 0,
50         }
51     }
52 
reset(&mut self)53     pub fn reset(&mut self) {
54         self.pat_period = 0;
55         self.pcr_period = 80 * 90;
56         self.pcr_clock = 0;
57 
58         self.packet_number = 0;
59     }
60 
get_data(&mut self) -> BytesMut61     pub fn get_data(&mut self) -> BytesMut {
62         self.bytes_writer.extract_current_bytes()
63     }
64 
write( &mut self, pid: u16, pts: i64, dts: i64, flags: u16, payload: BytesMut, ) -> Result<(), MpegTsError>65     pub fn write(
66         &mut self,
67         pid: u16,
68         pts: i64,
69         dts: i64,
70         flags: u16,
71         payload: BytesMut,
72     ) -> Result<(), MpegTsError> {
73         self.h264_h265_with_aud = (flags & define::MPEG_FLAG_H264_H265_WITH_AUD) > 0;
74 
75         //print!("pes payload length {}\n", payload.len());
76         //self.packet_number += payload.len();
77         //print!("pes payload sum length {}\n", self.payload_sum);
78 
79         self.find_stream(pid)?;
80 
81         let cur_pmt = self.pat.pmt.get_mut(self.cur_pmt_index).unwrap();
82         let cur_stream = cur_pmt.streams.get_mut(self.cur_stream_index).unwrap();
83 
84         if 0x1FFF == cur_pmt.pcr_pid
85             || (define::epes_stream_id::PES_SID_VIDEO
86                 == (cur_stream.stream_id & define::epes_stream_id::PES_SID_VIDEO)
87                 && (cur_pmt.pcr_pid != cur_stream.pid))
88         {
89             cur_pmt.pcr_pid = cur_stream.pid;
90             self.pat_period = 0;
91         }
92 
93         if cur_pmt.pcr_pid == cur_stream.pid {
94             self.pcr_clock += 1;
95         }
96 
97         cur_stream.pts = pts;
98         cur_stream.dts = dts;
99 
100         if (flags & define::MPEG_FLAG_IDR_FRAME) > 0 {
101             cur_stream.data_alignment_indicator = 1; // idr frame
102         } else {
103             cur_stream.data_alignment_indicator = 0; // idr frame
104         }
105 
106         if 0 == self.pat_period || (self.pat_period + define::PAT_PERIOD) <= dts {
107             self.pat_period = dts;
108             let pat_data = pat::PatMuxer::new().write(self.pat.clone())?;
109 
110             self.write_ts_header_for_pat_pmt(
111                 epat_pid::PAT_TID_PAS,
112                 pat_data,
113                 self.pat_continuity_counter,
114             )?;
115             self.pat_continuity_counter = (self.pat_continuity_counter + 1) % 16;
116             self.packet_number += 1;
117 
118             for pmt_data in &mut self.pat.pmt.clone() {
119                 let payload_data = pmt::PmtMuxer::new().write(pmt_data)?;
120                 self.write_ts_header_for_pat_pmt(
121                     pmt_data.pid,
122                     payload_data,
123                     self.pmt_continuity_counter,
124                 )?;
125                 self.pmt_continuity_counter = (self.pmt_continuity_counter + 1) % 16;
126                 self.packet_number += 1;
127             }
128         }
129 
130         self.write_pes(payload)?;
131 
132         Ok(())
133     }
134 
write_ts_header_for_pat_pmt( &mut self, pid: u16, payload: BytesMut, continuity_counter: u8, ) -> Result<(), MpegTsError>135     pub fn write_ts_header_for_pat_pmt(
136         &mut self,
137         pid: u16,
138         payload: BytesMut,
139         continuity_counter: u8,
140     ) -> Result<(), MpegTsError> {
141         /*sync byte*/
142         self.bytes_writer.write_u8(0x47)?; //0
143                                            /*PID 13 bits*/
144         self.bytes_writer
145             .write_u8(0x40 | ((pid >> 8) as u8 & 0x1F))?; //1
146 
147         self.bytes_writer.write_u8(pid as u8)?; //2
148 
149         self.bytes_writer.write_u8(0x10 | continuity_counter)?;
150 
151         /*adaption field control*/
152         self.bytes_writer.write_u8(0x00)?; //4
153 
154         /*payload data*/
155         self.bytes_writer.write(&payload)?;
156 
157         let left_size = ts::TS_PACKET_SIZE - payload.len() as u8 - 5;
158         for _ in 0..left_size {
159             self.bytes_writer.write_u8(0xFF)?;
160         }
161         Ok(())
162     }
163     //2.4.3.6 PES packet P35
write_pes(&mut self, payload: BytesMut) -> Result<(), MpegTsError>164     pub fn write_pes(&mut self, payload: BytesMut) -> Result<(), MpegTsError> {
165         let mut is_start: bool = true;
166         let mut payload_reader = BytesReader::new(payload);
167 
168         while !payload_reader.is_empty() {
169             //write pes header
170             let mut pes_muxer = PesMuxer::new();
171             if is_start {
172                 let cur_pmt = self.pat.pmt.get_mut(self.cur_pmt_index).unwrap();
173                 let stream_data = cur_pmt.streams.get_mut(self.cur_stream_index).unwrap();
174                 pes_muxer.write_pes_header(
175                     payload_reader.len(),
176                     stream_data,
177                     self.h264_h265_with_aud,
178                 )?;
179             }
180 
181             let pes_header_length: usize = pes_muxer.len();
182             let mut payload_length = payload_reader.len();
183 
184             //write ts header
185             let mut ts_header = BytesWriter::new();
186             payload_length = self.write_ts_header_for_pes(
187                 &mut ts_header,
188                 pes_header_length,
189                 payload_length,
190                 is_start,
191             )?;
192             self.packet_number += 1;
193 
194             is_start = false;
195 
196             let data = payload_reader.read_bytes(payload_length)?;
197 
198             self.bytes_writer.append(&mut ts_header);
199             self.bytes_writer.append(&mut pes_muxer.bytes_writer);
200             self.bytes_writer.write(&data[..])?;
201         }
202         Ok(())
203     }
write_ts_header_for_pes( &mut self, ts_header: &mut BytesWriter, pes_header_length: usize, payload_data_length: usize, is_start: bool, ) -> Result<usize, MpegTsError>204     pub fn write_ts_header_for_pes(
205         &mut self,
206 
207         ts_header: &mut BytesWriter,
208         pes_header_length: usize,
209         payload_data_length: usize,
210         is_start: bool,
211     ) -> Result<usize, MpegTsError> {
212         let cur_pmt = self.pat.pmt.get_mut(self.cur_pmt_index).unwrap();
213         let stream_data = cur_pmt.streams.get_mut(self.cur_stream_index).unwrap();
214 
215         let pcr_pid = cur_pmt.pcr_pid;
216 
217         /****************************************************************/
218         /*        ts header 4 bytes without adaptation filed            */
219         /*****************************************************************
220          0                   1                   2                   3
221          0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
222         +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
223         |   sync byte   | | | |          PID            |   |   |       |
224         +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
225         */
226 
227         /*sync byte*/
228         ts_header.write_u8(0x47)?; //0
229 
230         /*PID 13 bits*/
231         ts_header.write_u8((stream_data.pid >> 8) as u8 & 0x1F)?; //1
232         ts_header.write_u8((stream_data.pid & 0xFF) as u8)?; //2
233 
234         /*continuity counter 4 bits*/
235         ts_header.write_u8(0x10 | (stream_data.continuity_counter & 0x0F))?; //3
236         stream_data.continuity_counter = (stream_data.continuity_counter + 1) % 16;
237 
238         if is_start {
239             /*payload unit start indicator*/
240             ts_header.or_u8_at(1, define::TS_PAYLOAD_UNIT_START_INDICATOR)?;
241 
242             if (stream_data.pid == pcr_pid)
243                 || ((stream_data.data_alignment_indicator > 0)
244                     && define::PTS_NO_VALUE != stream_data.pts)
245             {
246                 /*adaption field control*/
247                 ts_header.or_u8_at(3, 0x20)?;
248 
249                 /*adaption filed length -- set value to 1 for flags*/
250                 ts_header.write_u8(0x01)?; //4
251 
252                 /*will be used for adaptation field flags if have*/
253                 ts_header.write_u8(0x00)?; //5
254 
255                 if stream_data.pid == pcr_pid {
256                     /*adaption field flags*/
257                     ts_header.or_u8_at(5, define::AF_FLAG_PCR)?;
258 
259                     let pcr = if define::PTS_NO_VALUE == stream_data.dts {
260                         stream_data.pts
261                     } else {
262                         stream_data.dts
263                     };
264                     let mut pcr_result: BytesWriter = BytesWriter::new();
265                     utils::pcr_write(&mut pcr_result, pcr * 300)?;
266                     ts_header.write(&pcr_result.extract_current_bytes()[..])?;
267                     /*adaption filed length -- add 6 for pcr length*/
268                     ts_header.add_u8_at(4, 6)?;
269                 }
270 
271                 if (stream_data.data_alignment_indicator > 0)
272                     && define::PTS_NO_VALUE != stream_data.pts
273                 {
274                     /*adaption field flags*/
275                     ts_header.or_u8_at(5, define::AF_FLAG_RANDOM_ACCESS_INDICATOR)?;
276                 }
277             }
278         }
279 
280         /*
281         +-------------------------------------------------------------------------+
282         |        ts header                              | PES data                |
283         +-------------------------------------------------------------------------+
284         | 4bytes fixed header | adaption field(stuffing)| pes header | pes payload|
285         +-------------------------------------------------------------------------+
286         */
287         // If payload data cannot fill up the 188 bytes packet,
288         // then stuffling bytes need to be filled in the adaptation field,
289 
290         let ts_header_length = ts_header.len();
291         let mut stuffing_length = define::TS_PACKET_SIZE as i32
292             - (ts_header_length + pes_header_length + payload_data_length) as i32;
293 
294         if stuffing_length > 0 {
295             if (ts_header.get(3).unwrap() & 0x20) > 0 {
296                 /*adaption filed length -- add 6 for pcr length*/
297                 ts_header.add_u8_at(4, stuffing_length as u8)?;
298             } else {
299                 /*adaption field control*/
300                 ts_header.or_u8_at(3, 0x20)?;
301                 /*AF length,because it occupys one byte,so here sub one.*/
302                 stuffing_length -= 1;
303                 /*adaption filed length*/
304                 ts_header.write_u8(stuffing_length as u8)?;
305                 /*add flag*/
306                 if stuffing_length >= 1 {
307                     /*adaptation field flags flag occupies one byte, sub one.*/
308                     stuffing_length -= 1;
309                     ts_header.write_u8(0x00)?;
310                 }
311             }
312             for _ in 0..stuffing_length {
313                 ts_header.write_u8(0xFF)?;
314             }
315         } else {
316             return Ok(define::TS_PACKET_SIZE - ts_header_length - pes_header_length);
317         }
318 
319         Ok(payload_data_length)
320     }
321 
find_stream(&mut self, pid: u16) -> Result<(), MpegTsError>322     pub fn find_stream(&mut self, pid: u16) -> Result<(), MpegTsError> {
323         // let mut pmt_index: usize = 0;
324         let mut stream_index: usize = 0;
325 
326         for (pmt_index, pmt) in self.pat.pmt.iter_mut().enumerate() {
327             for stream in pmt.streams.iter_mut() {
328                 if stream.pid == pid {
329                     self.cur_pmt_index = pmt_index;
330                     self.cur_stream_index = stream_index;
331 
332                     return Ok(());
333                 }
334                 stream_index += 1;
335             }
336         }
337 
338         // for pmt in self.pat.pmt.iter_mut() {
339         //     for stream in pmt.streams.iter_mut() {
340         //         if stream.pid == pid {
341         //             self.cur_pmt_index = pmt_index;
342         //             self.cur_stream_index = stream_index;
343 
344         //             return Ok(());
345         //         }
346         //         stream_index += 1;
347         //     }
348         //     pmt_index += 1;
349         // }
350 
351         Err(MpegTsError {
352             value: MpegTsErrorValue::StreamNotFound,
353         })
354     }
355 
add_stream(&mut self, codecid: u8, extra_data: BytesMut) -> Result<u16, MpegTsError>356     pub fn add_stream(&mut self, codecid: u8, extra_data: BytesMut) -> Result<u16, MpegTsError> {
357         if self.pat.pmt.is_empty() {
358             self.add_program(1, BytesMut::new())?;
359         }
360 
361         self.pmt_add_stream(0, codecid, extra_data)
362     }
363 
pmt_add_stream( &mut self, pmt_index: usize, codecid: u8, extra_data: BytesMut, ) -> Result<u16, MpegTsError>364     pub fn pmt_add_stream(
365         &mut self,
366         pmt_index: usize,
367         codecid: u8,
368         extra_data: BytesMut,
369     ) -> Result<u16, MpegTsError> {
370         let pmt = &mut self.pat.pmt[pmt_index];
371 
372         if pmt.streams.len() == 4 {
373             return Err(MpegTsError {
374                 value: MpegTsErrorValue::StreamCountExeceed,
375             });
376         }
377 
378         let mut cur_stream = pes::Pes::new(); //&mut pmt.streams[pmt.stream_count];
379 
380         cur_stream.codec_id = codecid;
381         cur_stream.pid = self.pid;
382         self.pid += 1;
383 
384         if utils::is_steam_type_video(codecid) {
385             cur_stream.stream_id = epes_stream_id::PES_SID_VIDEO;
386         } else if utils::is_steam_type_audio(codecid) {
387             cur_stream.stream_id = epes_stream_id::PES_SID_AUDIO;
388         } else {
389             cur_stream.stream_id = epes_stream_id::PES_SID_PRIVATE_1;
390         }
391 
392         if !extra_data.is_empty() {
393             cur_stream.esinfo.put(extra_data);
394         }
395 
396         pmt.streams.push(cur_stream);
397         pmt.version_number = (pmt.version_number + 1) % 32;
398 
399         self.reset();
400 
401         Ok(self.pid - 1)
402     }
403 
add_program(&mut self, program_number: u16, info: BytesMut) -> Result<(), MpegTsError>404     pub fn add_program(&mut self, program_number: u16, info: BytesMut) -> Result<(), MpegTsError> {
405         for cur_pmt in self.pat.pmt.iter() {
406             if cur_pmt.program_number == program_number {
407                 return Err(MpegTsError {
408                     value: MpegTsErrorValue::ProgramNumberExists,
409                 });
410             }
411         }
412 
413         if self.pat.pmt.len() == 4 {
414             return Err(MpegTsError {
415                 value: MpegTsErrorValue::PmtCountExeceed,
416             });
417         }
418         let mut cur_pmt = pmt::Pmt::new(); //&mut self.pat.pmt[self.pat.pmt_count];
419 
420         cur_pmt.pid = self.pid;
421         self.pid += 1;
422         cur_pmt.program_number = program_number;
423         cur_pmt.version_number = 0x00;
424         cur_pmt.continuity_counter = 0;
425         cur_pmt.pcr_pid = 0x1FFF;
426 
427         if !info.is_empty() {
428             cur_pmt.program_info.put(&info[..]);
429         }
430 
431         self.pat.pmt.push(cur_pmt);
432 
433         Ok(())
434     }
435 }
436