xref: /xiu/protocol/rtmp/src/cache/mod.rs (revision 13bac29a)
17adc8486SHarlanC pub mod errors;
2d20ba44cSwawacry pub mod gop;
3d20ba44cSwawacry pub mod metadata;
40ca99c20SHarlan 
50ca99c20SHarlan use {
68baa1d3cSHarlan     self::gop::Gops,
70ca99c20SHarlan     bytes::BytesMut,
8*13bac29aSHarlan     bytesio::bytes_reader::BytesReader,
90ca99c20SHarlan     errors::CacheError,
100ca99c20SHarlan     gop::Gop,
118baa1d3cSHarlan     std::collections::VecDeque,
128e71d710SHarlan     streamhub::define::FrameData,
138e71d710SHarlan     streamhub::statistics::avstatistics::AvStatistics,
148e71d710SHarlan     streamhub::stream::StreamIdentifier,
15*13bac29aSHarlan     xflv::{
16*13bac29aSHarlan         define,
17*13bac29aSHarlan         flv_tag_header::{AudioTagHeader, VideoTagHeader},
18*13bac29aSHarlan         mpeg4_aac::Mpeg4AacProcessor,
19*13bac29aSHarlan         mpeg4_avc::Mpeg4AvcProcessor,
20*13bac29aSHarlan         Unmarshal,
21*13bac29aSHarlan     },
220ca99c20SHarlan };
23ccd9a1faSHarlan 
24ccd9a1faSHarlan // #[derive(Clone)]
250ca99c20SHarlan pub struct Cache {
260ca99c20SHarlan     metadata: metadata::MetaData,
270ca99c20SHarlan     metadata_timestamp: u32,
280ca99c20SHarlan     video_seq: BytesMut,
290ca99c20SHarlan     video_timestamp: u32,
300ca99c20SHarlan     audio_seq: BytesMut,
310ca99c20SHarlan     audio_timestamp: u32,
328baa1d3cSHarlan     gops: Gops,
33f59e6ff6SHarlan     pub av_statistics: AvStatistics,
340ca99c20SHarlan }
350ca99c20SHarlan 
36ccd9a1faSHarlan impl Drop for Cache {
37ccd9a1faSHarlan     #[allow(unused_must_use)]
drop(&mut self)38ccd9a1faSHarlan     fn drop(&mut self) {
39ccd9a1faSHarlan         self.av_statistics.sender.send(true);
400ca99c20SHarlan     }
410ca99c20SHarlan }
420ca99c20SHarlan 
430ca99c20SHarlan impl Cache {
new(app_name: String, stream_name: String, gop_num: usize) -> Self44e9407847SHarlan     pub fn new(app_name: String, stream_name: String, gop_num: usize) -> Self {
458e71d710SHarlan         let identifier = StreamIdentifier::Rtmp {
468e71d710SHarlan             app_name,
478e71d710SHarlan             stream_name,
488e71d710SHarlan         };
49ccd9a1faSHarlan         let mut cache = Cache {
500ca99c20SHarlan             metadata: metadata::MetaData::new(),
510ca99c20SHarlan             metadata_timestamp: 0,
520ca99c20SHarlan             video_seq: BytesMut::new(),
530ca99c20SHarlan             video_timestamp: 0,
540ca99c20SHarlan             audio_seq: BytesMut::new(),
550ca99c20SHarlan             audio_timestamp: 0,
568baa1d3cSHarlan             gops: Gops::new(gop_num),
578e71d710SHarlan             av_statistics: AvStatistics::new(identifier),
58ccd9a1faSHarlan         };
59ccd9a1faSHarlan         cache.av_statistics.start();
60ccd9a1faSHarlan         cache
610ca99c20SHarlan     }
620ca99c20SHarlan 
630ca99c20SHarlan     //, values: Vec<Amf0ValueType>
save_metadata(&mut self, chunk_body: &BytesMut, timestamp: u32)648e71d710SHarlan     pub fn save_metadata(&mut self, chunk_body: &BytesMut, timestamp: u32) {
650ca99c20SHarlan         self.metadata.save(chunk_body);
660ca99c20SHarlan         self.metadata_timestamp = timestamp;
670ca99c20SHarlan     }
680ca99c20SHarlan 
get_metadata(&self) -> Option<FrameData>698e71d710SHarlan     pub fn get_metadata(&self) -> Option<FrameData> {
700ca99c20SHarlan         let data = self.metadata.get_chunk_body();
710ca99c20SHarlan         if !data.is_empty() {
728e71d710SHarlan             Some(FrameData::MetaData {
730ca99c20SHarlan                 timestamp: self.metadata_timestamp,
740ca99c20SHarlan                 data,
750ca99c20SHarlan             })
760ca99c20SHarlan         } else {
770ca99c20SHarlan             None
780ca99c20SHarlan         }
790ca99c20SHarlan     }
80ccd9a1faSHarlan     //save audio gops and sequence header information
save_audio_data( &mut self, chunk_body: &BytesMut, timestamp: u32, ) -> Result<(), CacheError>81ccd9a1faSHarlan     pub async fn save_audio_data(
820ca99c20SHarlan         &mut self,
838e71d710SHarlan         chunk_body: &BytesMut,
840ca99c20SHarlan         timestamp: u32,
850ca99c20SHarlan     ) -> Result<(), CacheError> {
868e71d710SHarlan         let channel_data = FrameData::Audio {
870ca99c20SHarlan             timestamp,
880ca99c20SHarlan             data: chunk_body.clone(),
890ca99c20SHarlan         };
908baa1d3cSHarlan         self.gops.save_frame_data(channel_data, false);
910ca99c20SHarlan 
92*13bac29aSHarlan         let mut reader = BytesReader::new(chunk_body.clone());
93*13bac29aSHarlan         let tag_header = AudioTagHeader::unmarshal(&mut reader)?;
94ccd9a1faSHarlan 
95*13bac29aSHarlan         if tag_header.sound_format == define::SoundFormat::AAC as u8
96*13bac29aSHarlan             && tag_header.aac_packet_type == define::aac_packet_type::AAC_SEQHDR
970ca99c20SHarlan         {
98ccd9a1faSHarlan             self.audio_seq = chunk_body.clone();
990ca99c20SHarlan             self.audio_timestamp = timestamp;
100ccd9a1faSHarlan 
101ccd9a1faSHarlan             let mut aac_processor = Mpeg4AacProcessor::default();
102ccd9a1faSHarlan             let aac = aac_processor
103*13bac29aSHarlan                 .extend_data(reader.extract_remaining_bytes())
104ccd9a1faSHarlan                 .audio_specific_config_load()?;
105ccd9a1faSHarlan             self.av_statistics
106ccd9a1faSHarlan                 .notify_audio_codec_info(&aac.mpeg4_aac)
107ccd9a1faSHarlan                 .await;
1080ca99c20SHarlan         }
1090ca99c20SHarlan 
110ccd9a1faSHarlan         self.av_statistics
111*13bac29aSHarlan             .notify_audio_statistics_info(chunk_body.len(), tag_header.aac_packet_type)
112ccd9a1faSHarlan             .await;
113ccd9a1faSHarlan 
1140ca99c20SHarlan         Ok(())
1150ca99c20SHarlan     }
1160ca99c20SHarlan 
get_audio_seq(&self) -> Option<FrameData>1178e71d710SHarlan     pub fn get_audio_seq(&self) -> Option<FrameData> {
1180ca99c20SHarlan         if !self.audio_seq.is_empty() {
1198e71d710SHarlan             return Some(FrameData::Audio {
1200ca99c20SHarlan                 timestamp: self.audio_timestamp,
1210ca99c20SHarlan                 data: self.audio_seq.clone(),
1220ca99c20SHarlan             });
1230ca99c20SHarlan         }
1240ca99c20SHarlan         None
1250ca99c20SHarlan     }
1260ca99c20SHarlan 
get_video_seq(&self) -> Option<FrameData>1278e71d710SHarlan     pub fn get_video_seq(&self) -> Option<FrameData> {
1280ca99c20SHarlan         if !self.video_seq.is_empty() {
1298e71d710SHarlan             return Some(FrameData::Video {
1300ca99c20SHarlan                 timestamp: self.video_timestamp,
1310ca99c20SHarlan                 data: self.video_seq.clone(),
1320ca99c20SHarlan             });
1330ca99c20SHarlan         }
1340ca99c20SHarlan         None
1350ca99c20SHarlan     }
136ccd9a1faSHarlan     //save video gops and sequence header information
save_video_data( &mut self, chunk_body: &BytesMut, timestamp: u32, ) -> Result<(), CacheError>137ccd9a1faSHarlan     pub async fn save_video_data(
1380ca99c20SHarlan         &mut self,
1398e71d710SHarlan         chunk_body: &BytesMut,
1400ca99c20SHarlan         timestamp: u32,
1410ca99c20SHarlan     ) -> Result<(), CacheError> {
1428e71d710SHarlan         let channel_data = FrameData::Video {
1430ca99c20SHarlan             timestamp,
1440ca99c20SHarlan             data: chunk_body.clone(),
1450ca99c20SHarlan         };
146*13bac29aSHarlan 
147*13bac29aSHarlan         let mut reader = BytesReader::new(chunk_body.clone());
148*13bac29aSHarlan         let tag_header = VideoTagHeader::unmarshal(&mut reader)?;
149*13bac29aSHarlan 
150*13bac29aSHarlan         let is_key_frame = tag_header.frame_type == define::frame_type::KEY_FRAME;
1518baa1d3cSHarlan         self.gops.save_frame_data(channel_data, is_key_frame);
1520ca99c20SHarlan 
153*13bac29aSHarlan         if is_key_frame && tag_header.avc_packet_type == define::avc_packet_type::AVC_SEQHDR {
154ccd9a1faSHarlan             let mut avc_processor = Mpeg4AvcProcessor::default();
155*13bac29aSHarlan             avc_processor.decoder_configuration_record_load(&mut reader)?;
156ccd9a1faSHarlan 
157ccd9a1faSHarlan             self.av_statistics
158ccd9a1faSHarlan                 .notify_video_codec_info(&avc_processor.mpeg4_avc)
159ccd9a1faSHarlan                 .await;
160ccd9a1faSHarlan 
161ccd9a1faSHarlan             self.video_seq = chunk_body.clone();
1620ca99c20SHarlan             self.video_timestamp = timestamp;
1630ca99c20SHarlan         }
1640ca99c20SHarlan 
165ccd9a1faSHarlan         self.av_statistics
166ccd9a1faSHarlan             .notify_video_statistics_info(chunk_body.len(), is_key_frame)
167ccd9a1faSHarlan             .await;
168ccd9a1faSHarlan 
1690ca99c20SHarlan         Ok(())
1700ca99c20SHarlan     }
1710ca99c20SHarlan 
get_gops_data(&self) -> Option<VecDeque<Gop>>172ccd9a1faSHarlan     pub fn get_gops_data(&self) -> Option<VecDeque<Gop>> {
1738baa1d3cSHarlan         if self.gops.setted() {
1748baa1d3cSHarlan             Some(self.gops.get_gops())
1750ca99c20SHarlan         } else {
1760ca99c20SHarlan             None
1770ca99c20SHarlan         }
1780ca99c20SHarlan     }
1790ca99c20SHarlan }
180