1 pub mod errors; 2 pub mod gop; 3 pub mod metadata; 4 5 use { 6 self::gop::Gops, 7 bytes::BytesMut, 8 bytesio::bytes_reader::BytesReader, 9 errors::CacheError, 10 gop::Gop, 11 std::collections::VecDeque, 12 streamhub::define::FrameData, 13 streamhub::statistics::avstatistics::AvStatistics, 14 streamhub::stream::StreamIdentifier, 15 xflv::{ 16 define, 17 flv_tag_header::{AudioTagHeader, VideoTagHeader}, 18 mpeg4_aac::Mpeg4AacProcessor, 19 mpeg4_avc::Mpeg4AvcProcessor, 20 Unmarshal, 21 }, 22 }; 23 24 // #[derive(Clone)] 25 pub struct Cache { 26 metadata: metadata::MetaData, 27 metadata_timestamp: u32, 28 video_seq: BytesMut, 29 video_timestamp: u32, 30 audio_seq: BytesMut, 31 audio_timestamp: u32, 32 gops: Gops, 33 pub av_statistics: AvStatistics, 34 } 35 36 impl Drop for Cache { 37 #[allow(unused_must_use)] drop(&mut self)38 fn drop(&mut self) { 39 self.av_statistics.sender.send(true); 40 } 41 } 42 43 impl Cache { new(app_name: String, stream_name: String, gop_num: usize) -> Self44 pub fn new(app_name: String, stream_name: String, gop_num: usize) -> Self { 45 let identifier = StreamIdentifier::Rtmp { 46 app_name, 47 stream_name, 48 }; 49 let mut cache = Cache { 50 metadata: metadata::MetaData::new(), 51 metadata_timestamp: 0, 52 video_seq: BytesMut::new(), 53 video_timestamp: 0, 54 audio_seq: BytesMut::new(), 55 audio_timestamp: 0, 56 gops: Gops::new(gop_num), 57 av_statistics: AvStatistics::new(identifier), 58 }; 59 cache.av_statistics.start(); 60 cache 61 } 62 63 //, values: Vec<Amf0ValueType> save_metadata(&mut self, chunk_body: &BytesMut, timestamp: u32)64 pub fn save_metadata(&mut self, chunk_body: &BytesMut, timestamp: u32) { 65 self.metadata.save(chunk_body); 66 self.metadata_timestamp = timestamp; 67 } 68 get_metadata(&self) -> Option<FrameData>69 pub fn get_metadata(&self) -> Option<FrameData> { 70 let data = self.metadata.get_chunk_body(); 71 if !data.is_empty() { 72 Some(FrameData::MetaData { 73 timestamp: self.metadata_timestamp, 74 data, 75 }) 76 } else { 77 None 78 } 79 } 80 //save audio gops and sequence header information save_audio_data( &mut self, chunk_body: &BytesMut, timestamp: u32, ) -> Result<(), CacheError>81 pub async fn save_audio_data( 82 &mut self, 83 chunk_body: &BytesMut, 84 timestamp: u32, 85 ) -> Result<(), CacheError> { 86 let channel_data = FrameData::Audio { 87 timestamp, 88 data: chunk_body.clone(), 89 }; 90 self.gops.save_frame_data(channel_data, false); 91 92 let mut reader = BytesReader::new(chunk_body.clone()); 93 let tag_header = AudioTagHeader::unmarshal(&mut reader)?; 94 95 if tag_header.sound_format == define::SoundFormat::AAC as u8 96 && tag_header.aac_packet_type == define::aac_packet_type::AAC_SEQHDR 97 { 98 self.audio_seq = chunk_body.clone(); 99 self.audio_timestamp = timestamp; 100 101 let mut aac_processor = Mpeg4AacProcessor::default(); 102 let aac = aac_processor 103 .extend_data(reader.extract_remaining_bytes()) 104 .audio_specific_config_load()?; 105 self.av_statistics 106 .notify_audio_codec_info(&aac.mpeg4_aac) 107 .await; 108 } 109 110 self.av_statistics 111 .notify_audio_statistics_info(chunk_body.len(), tag_header.aac_packet_type) 112 .await; 113 114 Ok(()) 115 } 116 get_audio_seq(&self) -> Option<FrameData>117 pub fn get_audio_seq(&self) -> Option<FrameData> { 118 if !self.audio_seq.is_empty() { 119 return Some(FrameData::Audio { 120 timestamp: self.audio_timestamp, 121 data: self.audio_seq.clone(), 122 }); 123 } 124 None 125 } 126 get_video_seq(&self) -> Option<FrameData>127 pub fn get_video_seq(&self) -> Option<FrameData> { 128 if !self.video_seq.is_empty() { 129 return Some(FrameData::Video { 130 timestamp: self.video_timestamp, 131 data: self.video_seq.clone(), 132 }); 133 } 134 None 135 } 136 //save video gops and sequence header information save_video_data( &mut self, chunk_body: &BytesMut, timestamp: u32, ) -> Result<(), CacheError>137 pub async fn save_video_data( 138 &mut self, 139 chunk_body: &BytesMut, 140 timestamp: u32, 141 ) -> Result<(), CacheError> { 142 let channel_data = FrameData::Video { 143 timestamp, 144 data: chunk_body.clone(), 145 }; 146 147 let mut reader = BytesReader::new(chunk_body.clone()); 148 let tag_header = VideoTagHeader::unmarshal(&mut reader)?; 149 150 let is_key_frame = tag_header.frame_type == define::frame_type::KEY_FRAME; 151 self.gops.save_frame_data(channel_data, is_key_frame); 152 153 if is_key_frame && tag_header.avc_packet_type == define::avc_packet_type::AVC_SEQHDR { 154 let mut avc_processor = Mpeg4AvcProcessor::default(); 155 avc_processor.decoder_configuration_record_load(&mut reader)?; 156 157 self.av_statistics 158 .notify_video_codec_info(&avc_processor.mpeg4_avc) 159 .await; 160 161 self.video_seq = chunk_body.clone(); 162 self.video_timestamp = timestamp; 163 } 164 165 self.av_statistics 166 .notify_video_statistics_info(chunk_body.len(), is_key_frame) 167 .await; 168 169 Ok(()) 170 } 171 get_gops_data(&self) -> Option<VecDeque<Gop>>172 pub fn get_gops_data(&self) -> Option<VecDeque<Gop>> { 173 if self.gops.setted() { 174 Some(self.gops.get_gops()) 175 } else { 176 None 177 } 178 } 179 } 180