1 use { 2 super::{define::FlvDemuxerData, errors::MediaError, m3u8::M3u8}, 3 bytes::BytesMut, 4 xflv::{ 5 define::{frame_type, FlvData}, 6 demuxer::{FlvAudioTagDemuxer, FlvVideoTagDemuxer}, 7 }, 8 xmpegts::{ 9 define::{epsi_stream_type, MPEG_FLAG_IDR_FRAME}, 10 ts::TsMuxer, 11 }, 12 }; 13 14 pub struct Flv2HlsRemuxer { 15 video_demuxer: FlvVideoTagDemuxer, 16 audio_demuxer: FlvAudioTagDemuxer, 17 18 ts_muxer: TsMuxer, 19 20 last_ts_dts: i64, 21 last_ts_pts: i64, 22 23 last_dts: i64, 24 last_pts: i64, 25 26 duration: i64, 27 need_new_segment: bool, 28 29 video_pid: u16, 30 audio_pid: u16, 31 32 m3u8_handler: M3u8, 33 } 34 35 impl Flv2HlsRemuxer { new(duration: i64, app_name: String, stream_name: String, need_record: bool) -> Self36 pub fn new(duration: i64, app_name: String, stream_name: String, need_record: bool) -> Self { 37 let mut ts_muxer = TsMuxer::new(); 38 let audio_pid = ts_muxer 39 .add_stream(epsi_stream_type::PSI_STREAM_AAC, BytesMut::new()) 40 .unwrap(); 41 let video_pid = ts_muxer 42 .add_stream(epsi_stream_type::PSI_STREAM_H264, BytesMut::new()) 43 .unwrap(); 44 45 Self { 46 video_demuxer: FlvVideoTagDemuxer::new(), 47 audio_demuxer: FlvAudioTagDemuxer::new(), 48 49 ts_muxer, 50 51 last_ts_dts: 0, 52 last_ts_pts: 0, 53 54 last_dts: 0, 55 last_pts: 0, 56 57 duration, 58 need_new_segment: false, 59 60 video_pid, 61 audio_pid, 62 63 m3u8_handler: M3u8::new(duration, 6, app_name, stream_name, need_record), 64 } 65 } 66 process_flv_data(&mut self, data: FlvData) -> Result<(), MediaError>67 pub fn process_flv_data(&mut self, data: FlvData) -> Result<(), MediaError> { 68 let flv_demux_data: FlvDemuxerData = match data { 69 FlvData::Audio { timestamp, data } => { 70 let audio_data = self.audio_demuxer.demux(timestamp, data)?; 71 FlvDemuxerData::Audio { data: audio_data } 72 } 73 FlvData::Video { timestamp, data } => { 74 if let Some(video_data) = self.video_demuxer.demux(timestamp, data)? { 75 FlvDemuxerData::Video { data: video_data } 76 } else { 77 return Ok(()); 78 } 79 } 80 _ => return Ok(()), 81 }; 82 83 self.process_demux_data(&flv_demux_data)?; 84 85 Ok(()) 86 } 87 flush_remaining_data(&mut self) -> Result<(), MediaError>88 pub fn flush_remaining_data(&mut self) -> Result<(), MediaError> { 89 let data = self.ts_muxer.get_data(); 90 let mut discontinuity: bool = false; 91 if self.last_dts > self.last_ts_dts + 15 * 1000 { 92 discontinuity = true; 93 } 94 self.m3u8_handler.add_segment( 95 self.last_dts - self.last_ts_dts, 96 discontinuity, 97 true, 98 data, 99 )?; 100 self.m3u8_handler.refresh_playlist()?; 101 102 Ok(()) 103 } 104 process_demux_data( &mut self, flv_demux_data: &FlvDemuxerData, ) -> Result<(), MediaError>105 pub fn process_demux_data( 106 &mut self, 107 flv_demux_data: &FlvDemuxerData, 108 ) -> Result<(), MediaError> { 109 self.need_new_segment = false; 110 111 let pid: u16; 112 let pts: i64; 113 let dts: i64; 114 let mut flags: u16 = 0; 115 let mut payload: BytesMut = BytesMut::new(); 116 117 match flv_demux_data { 118 FlvDemuxerData::Video { data } => { 119 pts = data.pts; 120 dts = data.dts; 121 pid = self.video_pid; 122 payload.extend_from_slice(&data.data[..]); 123 124 if data.frame_type == frame_type::KEY_FRAME { 125 flags = MPEG_FLAG_IDR_FRAME; 126 if dts - self.last_ts_dts >= self.duration * 1000 { 127 self.need_new_segment = true; 128 } 129 } 130 } 131 FlvDemuxerData::Audio { data } => { 132 if !data.has_data { 133 return Ok(()); 134 } 135 136 pts = data.pts; 137 dts = data.dts; 138 pid = self.audio_pid; 139 payload.extend_from_slice(&data.data[..]); 140 } 141 _ => return Ok(()), 142 } 143 144 if self.need_new_segment { 145 let mut discontinuity: bool = false; 146 if dts > self.last_ts_dts + 15 * 1000 { 147 discontinuity = true; 148 } 149 let data = self.ts_muxer.get_data(); 150 151 self.m3u8_handler 152 .add_segment(dts - self.last_ts_dts, discontinuity, false, data)?; 153 self.m3u8_handler.refresh_playlist()?; 154 155 self.ts_muxer.reset(); 156 self.last_ts_dts = dts; 157 self.last_ts_pts = pts; 158 self.need_new_segment = false; 159 } 160 161 self.last_dts = dts; 162 self.last_pts = pts; 163 164 self.ts_muxer 165 .write(pid, pts * 90, dts * 90, flags, payload)?; 166 167 Ok(()) 168 } 169 clear_files(&mut self) -> Result<(), MediaError>170 pub fn clear_files(&mut self) -> Result<(), MediaError> { 171 self.m3u8_handler.clear() 172 } 173 } 174 #[cfg(test)] 175 mod tests { 176 // use std::{ 177 // env, 178 // fs::{self}, 179 // }; 180 181 // #[test] 182 // fn test_new_path() { 183 // if let Ok(current_dir) = env::current_dir() { 184 // println!("Current directory: {:?}", current_dir); 185 // } else { 186 // eprintln!("Failed to get the current directory"); 187 // } 188 // let directory = "test"; 189 190 // if !fs::metadata(directory).is_ok() { 191 // match fs::create_dir(directory) { 192 // Ok(_) => println!("目录已创建"), 193 // Err(err) => println!("创建目录时出错:{:?}", err), 194 // } 195 // } else { 196 // println!("目录已存在"); 197 // } 198 // } 199 // #[test] 200 // fn test_copy() { 201 // let path = "./aa.txt"; 202 // if let Err(err) = fs::copy(path, "./test/") { 203 // println!("copy err: {err}"); 204 // } else { 205 // println!("copy success"); 206 // } 207 // } 208 } 209