1 pub mod errors; 2 pub mod gop; 3 pub mod metadata; 4 5 use { 6 self::gop::Gops, 7 bytes::BytesMut, 8 errors::CacheError, 9 gop::Gop, 10 std::collections::VecDeque, 11 streamhub::define::FrameData, 12 streamhub::statistics::avstatistics::AvStatistics, 13 streamhub::stream::StreamIdentifier, 14 xflv::{define, demuxer_tag, mpeg4_aac::Mpeg4AacProcessor, mpeg4_avc::Mpeg4AvcProcessor}, 15 }; 16 17 // #[derive(Clone)] 18 pub struct Cache { 19 metadata: metadata::MetaData, 20 metadata_timestamp: u32, 21 video_seq: BytesMut, 22 video_timestamp: u32, 23 audio_seq: BytesMut, 24 audio_timestamp: u32, 25 gops: Gops, 26 pub av_statistics: AvStatistics, 27 } 28 29 impl Drop for Cache { 30 #[allow(unused_must_use)] 31 fn drop(&mut self) { 32 self.av_statistics.sender.send(true); 33 } 34 } 35 36 impl Cache { 37 pub fn new(app_name: String, stream_name: String, gop_num: usize) -> Self { 38 let identifier = StreamIdentifier::Rtmp { 39 app_name, 40 stream_name, 41 }; 42 let mut cache = Cache { 43 metadata: metadata::MetaData::new(), 44 metadata_timestamp: 0, 45 video_seq: BytesMut::new(), 46 video_timestamp: 0, 47 audio_seq: BytesMut::new(), 48 audio_timestamp: 0, 49 gops: Gops::new(gop_num), 50 av_statistics: AvStatistics::new(identifier), 51 }; 52 cache.av_statistics.start(); 53 cache 54 } 55 56 //, values: Vec<Amf0ValueType> 57 pub fn save_metadata(&mut self, chunk_body: &BytesMut, timestamp: u32) { 58 self.metadata.save(chunk_body); 59 self.metadata_timestamp = timestamp; 60 } 61 62 pub fn get_metadata(&self) -> Option<FrameData> { 63 let data = self.metadata.get_chunk_body(); 64 if !data.is_empty() { 65 Some(FrameData::MetaData { 66 timestamp: self.metadata_timestamp, 67 data, 68 }) 69 } else { 70 None 71 } 72 } 73 //save audio gops and sequence header information 74 pub async fn save_audio_data( 75 &mut self, 76 chunk_body: &BytesMut, 77 timestamp: u32, 78 ) -> Result<(), CacheError> { 79 let channel_data = FrameData::Audio { 80 timestamp, 81 data: chunk_body.clone(), 82 }; 83 self.gops.save_frame_data(channel_data, false); 84 85 let mut parser = demuxer_tag::AudioTagHeaderDemuxer::new(chunk_body.clone()); 86 let tag = parser.parse_tag_header()?; 87 88 if tag.sound_format == define::SoundFormat::AAC as u8 89 && tag.aac_packet_type == define::aac_packet_type::AAC_SEQHDR 90 { 91 self.audio_seq = chunk_body.clone(); 92 self.audio_timestamp = timestamp; 93 94 let mut aac_processor = Mpeg4AacProcessor::default(); 95 let aac = aac_processor 96 .extend_data(parser.get_remaining_bytes()) 97 .audio_specific_config_load()?; 98 self.av_statistics 99 .notify_audio_codec_info(&aac.mpeg4_aac) 100 .await; 101 } 102 103 self.av_statistics 104 .notify_audio_statistics_info(chunk_body.len(), tag.aac_packet_type) 105 .await; 106 107 Ok(()) 108 } 109 110 pub fn get_audio_seq(&self) -> Option<FrameData> { 111 if !self.audio_seq.is_empty() { 112 return Some(FrameData::Audio { 113 timestamp: self.audio_timestamp, 114 data: self.audio_seq.clone(), 115 }); 116 } 117 None 118 } 119 120 pub fn get_video_seq(&self) -> Option<FrameData> { 121 if !self.video_seq.is_empty() { 122 return Some(FrameData::Video { 123 timestamp: self.video_timestamp, 124 data: self.video_seq.clone(), 125 }); 126 } 127 None 128 } 129 //save video gops and sequence header information 130 pub async fn save_video_data( 131 &mut self, 132 chunk_body: &BytesMut, 133 timestamp: u32, 134 ) -> Result<(), CacheError> { 135 let mut parser = demuxer_tag::VideoTagHeaderDemuxer::new(chunk_body.clone()); 136 let tag = parser.parse_tag_header()?; 137 138 let channel_data = FrameData::Video { 139 timestamp, 140 data: chunk_body.clone(), 141 }; 142 let is_key_frame = tag.frame_type == define::frame_type::KEY_FRAME; 143 self.gops.save_frame_data(channel_data, is_key_frame); 144 145 if is_key_frame && tag.avc_packet_type == define::avc_packet_type::AVC_SEQHDR { 146 let mut avc_processor = Mpeg4AvcProcessor::default(); 147 avc_processor 148 .extend_data(parser.get_remaining_bytes()) 149 .decoder_configuration_record_load()?; 150 151 self.av_statistics 152 .notify_video_codec_info(&avc_processor.mpeg4_avc) 153 .await; 154 155 self.video_seq = chunk_body.clone(); 156 self.video_timestamp = timestamp; 157 } 158 159 self.av_statistics 160 .notify_video_statistics_info(chunk_body.len(), is_key_frame) 161 .await; 162 163 Ok(()) 164 } 165 166 pub fn get_gops_data(&self) -> Option<VecDeque<Gop>> { 167 if self.gops.setted() { 168 Some(self.gops.get_gops()) 169 } else { 170 None 171 } 172 } 173 } 174