17adc8486SHarlanC pub mod errors; 2d20ba44cSwawacry pub mod gop; 3d20ba44cSwawacry pub mod metadata; 40ca99c20SHarlan 50ca99c20SHarlan use { 68baa1d3cSHarlan self::gop::Gops, 70ca99c20SHarlan bytes::BytesMut, 8*13bac29aSHarlan bytesio::bytes_reader::BytesReader, 90ca99c20SHarlan errors::CacheError, 100ca99c20SHarlan gop::Gop, 118baa1d3cSHarlan std::collections::VecDeque, 128e71d710SHarlan streamhub::define::FrameData, 138e71d710SHarlan streamhub::statistics::avstatistics::AvStatistics, 148e71d710SHarlan streamhub::stream::StreamIdentifier, 15*13bac29aSHarlan xflv::{ 16*13bac29aSHarlan define, 17*13bac29aSHarlan flv_tag_header::{AudioTagHeader, VideoTagHeader}, 18*13bac29aSHarlan mpeg4_aac::Mpeg4AacProcessor, 19*13bac29aSHarlan mpeg4_avc::Mpeg4AvcProcessor, 20*13bac29aSHarlan Unmarshal, 21*13bac29aSHarlan }, 220ca99c20SHarlan }; 23ccd9a1faSHarlan 24ccd9a1faSHarlan // #[derive(Clone)] 250ca99c20SHarlan pub struct Cache { 260ca99c20SHarlan metadata: metadata::MetaData, 270ca99c20SHarlan metadata_timestamp: u32, 280ca99c20SHarlan video_seq: BytesMut, 290ca99c20SHarlan video_timestamp: u32, 300ca99c20SHarlan audio_seq: BytesMut, 310ca99c20SHarlan audio_timestamp: u32, 328baa1d3cSHarlan gops: Gops, 33f59e6ff6SHarlan pub av_statistics: AvStatistics, 340ca99c20SHarlan } 350ca99c20SHarlan 36ccd9a1faSHarlan impl Drop for Cache { 37ccd9a1faSHarlan #[allow(unused_must_use)] drop(&mut self)38ccd9a1faSHarlan fn drop(&mut self) { 39ccd9a1faSHarlan self.av_statistics.sender.send(true); 400ca99c20SHarlan } 410ca99c20SHarlan } 420ca99c20SHarlan 430ca99c20SHarlan impl Cache { new(app_name: String, stream_name: String, gop_num: usize) -> Self44e9407847SHarlan pub fn new(app_name: String, stream_name: String, gop_num: usize) -> Self { 458e71d710SHarlan let identifier = StreamIdentifier::Rtmp { 468e71d710SHarlan app_name, 478e71d710SHarlan stream_name, 488e71d710SHarlan }; 49ccd9a1faSHarlan let mut cache = Cache { 500ca99c20SHarlan metadata: metadata::MetaData::new(), 510ca99c20SHarlan metadata_timestamp: 0, 520ca99c20SHarlan video_seq: BytesMut::new(), 530ca99c20SHarlan video_timestamp: 0, 540ca99c20SHarlan audio_seq: BytesMut::new(), 550ca99c20SHarlan audio_timestamp: 0, 568baa1d3cSHarlan gops: Gops::new(gop_num), 578e71d710SHarlan av_statistics: AvStatistics::new(identifier), 58ccd9a1faSHarlan }; 59ccd9a1faSHarlan cache.av_statistics.start(); 60ccd9a1faSHarlan cache 610ca99c20SHarlan } 620ca99c20SHarlan 630ca99c20SHarlan //, values: Vec<Amf0ValueType> save_metadata(&mut self, chunk_body: &BytesMut, timestamp: u32)648e71d710SHarlan pub fn save_metadata(&mut self, chunk_body: &BytesMut, timestamp: u32) { 650ca99c20SHarlan self.metadata.save(chunk_body); 660ca99c20SHarlan self.metadata_timestamp = timestamp; 670ca99c20SHarlan } 680ca99c20SHarlan get_metadata(&self) -> Option<FrameData>698e71d710SHarlan pub fn get_metadata(&self) -> Option<FrameData> { 700ca99c20SHarlan let data = self.metadata.get_chunk_body(); 710ca99c20SHarlan if !data.is_empty() { 728e71d710SHarlan Some(FrameData::MetaData { 730ca99c20SHarlan timestamp: self.metadata_timestamp, 740ca99c20SHarlan data, 750ca99c20SHarlan }) 760ca99c20SHarlan } else { 770ca99c20SHarlan None 780ca99c20SHarlan } 790ca99c20SHarlan } 80ccd9a1faSHarlan //save audio gops and sequence header information save_audio_data( &mut self, chunk_body: &BytesMut, timestamp: u32, ) -> Result<(), CacheError>81ccd9a1faSHarlan pub async fn save_audio_data( 820ca99c20SHarlan &mut self, 838e71d710SHarlan chunk_body: &BytesMut, 840ca99c20SHarlan timestamp: u32, 850ca99c20SHarlan ) -> Result<(), CacheError> { 868e71d710SHarlan let channel_data = FrameData::Audio { 870ca99c20SHarlan timestamp, 880ca99c20SHarlan data: chunk_body.clone(), 890ca99c20SHarlan }; 908baa1d3cSHarlan self.gops.save_frame_data(channel_data, false); 910ca99c20SHarlan 92*13bac29aSHarlan let mut reader = BytesReader::new(chunk_body.clone()); 93*13bac29aSHarlan let tag_header = AudioTagHeader::unmarshal(&mut reader)?; 94ccd9a1faSHarlan 95*13bac29aSHarlan if tag_header.sound_format == define::SoundFormat::AAC as u8 96*13bac29aSHarlan && tag_header.aac_packet_type == define::aac_packet_type::AAC_SEQHDR 970ca99c20SHarlan { 98ccd9a1faSHarlan self.audio_seq = chunk_body.clone(); 990ca99c20SHarlan self.audio_timestamp = timestamp; 100ccd9a1faSHarlan 101ccd9a1faSHarlan let mut aac_processor = Mpeg4AacProcessor::default(); 102ccd9a1faSHarlan let aac = aac_processor 103*13bac29aSHarlan .extend_data(reader.extract_remaining_bytes()) 104ccd9a1faSHarlan .audio_specific_config_load()?; 105ccd9a1faSHarlan self.av_statistics 106ccd9a1faSHarlan .notify_audio_codec_info(&aac.mpeg4_aac) 107ccd9a1faSHarlan .await; 1080ca99c20SHarlan } 1090ca99c20SHarlan 110ccd9a1faSHarlan self.av_statistics 111*13bac29aSHarlan .notify_audio_statistics_info(chunk_body.len(), tag_header.aac_packet_type) 112ccd9a1faSHarlan .await; 113ccd9a1faSHarlan 1140ca99c20SHarlan Ok(()) 1150ca99c20SHarlan } 1160ca99c20SHarlan get_audio_seq(&self) -> Option<FrameData>1178e71d710SHarlan pub fn get_audio_seq(&self) -> Option<FrameData> { 1180ca99c20SHarlan if !self.audio_seq.is_empty() { 1198e71d710SHarlan return Some(FrameData::Audio { 1200ca99c20SHarlan timestamp: self.audio_timestamp, 1210ca99c20SHarlan data: self.audio_seq.clone(), 1220ca99c20SHarlan }); 1230ca99c20SHarlan } 1240ca99c20SHarlan None 1250ca99c20SHarlan } 1260ca99c20SHarlan get_video_seq(&self) -> Option<FrameData>1278e71d710SHarlan pub fn get_video_seq(&self) -> Option<FrameData> { 1280ca99c20SHarlan if !self.video_seq.is_empty() { 1298e71d710SHarlan return Some(FrameData::Video { 1300ca99c20SHarlan timestamp: self.video_timestamp, 1310ca99c20SHarlan data: self.video_seq.clone(), 1320ca99c20SHarlan }); 1330ca99c20SHarlan } 1340ca99c20SHarlan None 1350ca99c20SHarlan } 136ccd9a1faSHarlan //save video gops and sequence header information save_video_data( &mut self, chunk_body: &BytesMut, timestamp: u32, ) -> Result<(), CacheError>137ccd9a1faSHarlan pub async fn save_video_data( 1380ca99c20SHarlan &mut self, 1398e71d710SHarlan chunk_body: &BytesMut, 1400ca99c20SHarlan timestamp: u32, 1410ca99c20SHarlan ) -> Result<(), CacheError> { 1428e71d710SHarlan let channel_data = FrameData::Video { 1430ca99c20SHarlan timestamp, 1440ca99c20SHarlan data: chunk_body.clone(), 1450ca99c20SHarlan }; 146*13bac29aSHarlan 147*13bac29aSHarlan let mut reader = BytesReader::new(chunk_body.clone()); 148*13bac29aSHarlan let tag_header = VideoTagHeader::unmarshal(&mut reader)?; 149*13bac29aSHarlan 150*13bac29aSHarlan let is_key_frame = tag_header.frame_type == define::frame_type::KEY_FRAME; 1518baa1d3cSHarlan self.gops.save_frame_data(channel_data, is_key_frame); 1520ca99c20SHarlan 153*13bac29aSHarlan if is_key_frame && tag_header.avc_packet_type == define::avc_packet_type::AVC_SEQHDR { 154ccd9a1faSHarlan let mut avc_processor = Mpeg4AvcProcessor::default(); 155*13bac29aSHarlan avc_processor.decoder_configuration_record_load(&mut reader)?; 156ccd9a1faSHarlan 157ccd9a1faSHarlan self.av_statistics 158ccd9a1faSHarlan .notify_video_codec_info(&avc_processor.mpeg4_avc) 159ccd9a1faSHarlan .await; 160ccd9a1faSHarlan 161ccd9a1faSHarlan self.video_seq = chunk_body.clone(); 1620ca99c20SHarlan self.video_timestamp = timestamp; 1630ca99c20SHarlan } 1640ca99c20SHarlan 165ccd9a1faSHarlan self.av_statistics 166ccd9a1faSHarlan .notify_video_statistics_info(chunk_body.len(), is_key_frame) 167ccd9a1faSHarlan .await; 168ccd9a1faSHarlan 1690ca99c20SHarlan Ok(()) 1700ca99c20SHarlan } 1710ca99c20SHarlan get_gops_data(&self) -> Option<VecDeque<Gop>>172ccd9a1faSHarlan pub fn get_gops_data(&self) -> Option<VecDeque<Gop>> { 1738baa1d3cSHarlan if self.gops.setted() { 1748baa1d3cSHarlan Some(self.gops.get_gops()) 1750ca99c20SHarlan } else { 1760ca99c20SHarlan None 1770ca99c20SHarlan } 1780ca99c20SHarlan } 1790ca99c20SHarlan } 180