xref: /xiu/protocol/rtmp/src/cache/mod.rs (revision ccd9a1fa)
1 pub mod errors;
2 pub mod gop;
3 pub mod metadata;
4 
5 use {
6     self::gop::Gops,
7     super::statistics::avstatistics::AvStatistics,
8     crate::channels::define::ChannelData,
9     bytes::BytesMut,
10     errors::CacheError,
11     gop::Gop,
12     std::collections::VecDeque,
13     xflv::{define, demuxer_tag, mpeg4_aac::Mpeg4AacProcessor, mpeg4_avc::Mpeg4AvcProcessor},
14 };
15 
16 // #[derive(Clone)]
17 pub struct Cache {
18     metadata: metadata::MetaData,
19     metadata_timestamp: u32,
20     video_seq: BytesMut,
21     video_timestamp: u32,
22     audio_seq: BytesMut,
23     audio_timestamp: u32,
24     gops: Gops,
25     av_statistics: AvStatistics,
26 }
27 
28 impl Drop for Cache {
29     #[allow(unused_must_use)]
30     fn drop(&mut self) {
31         self.av_statistics.sender.send(true);
32     }
33 }
34 
35 impl Cache {
36     pub fn new(gop_num: usize) -> Self {
37         let mut cache = Cache {
38             metadata: metadata::MetaData::new(),
39             metadata_timestamp: 0,
40             video_seq: BytesMut::new(),
41             video_timestamp: 0,
42             audio_seq: BytesMut::new(),
43             audio_timestamp: 0,
44             gops: Gops::new(gop_num),
45             av_statistics: AvStatistics::new(),
46         };
47         cache.av_statistics.start();
48         cache
49     }
50 
51     //, values: Vec<Amf0ValueType>
52     pub fn save_metadata(&mut self, chunk_body: BytesMut, timestamp: u32) {
53         self.metadata.save(chunk_body);
54         self.metadata_timestamp = timestamp;
55     }
56 
57     pub fn get_metadata(&self) -> Option<ChannelData> {
58         let data = self.metadata.get_chunk_body();
59         if !data.is_empty() {
60             Some(ChannelData::MetaData {
61                 timestamp: self.metadata_timestamp,
62                 data,
63             })
64         } else {
65             None
66         }
67     }
68     //save audio gops and sequence header information
69     pub async fn save_audio_data(
70         &mut self,
71         chunk_body: BytesMut,
72         timestamp: u32,
73     ) -> Result<(), CacheError> {
74         let channel_data = ChannelData::Audio {
75             timestamp,
76             data: chunk_body.clone(),
77         };
78         self.gops.save_frame_data(channel_data, false);
79 
80         let mut parser = demuxer_tag::AudioTagHeaderDemuxer::new(chunk_body.clone());
81         let tag = parser.parse_tag_header()?;
82 
83         if tag.sound_format == define::SoundFormat::AAC as u8
84             && tag.aac_packet_type == define::aac_packet_type::AAC_SEQHDR
85         {
86             self.audio_seq = chunk_body.clone();
87             self.audio_timestamp = timestamp;
88 
89             let mut aac_processor = Mpeg4AacProcessor::default();
90             let aac = aac_processor
91                 .extend_data(parser.get_remaining_bytes())
92                 .audio_specific_config_load()?;
93             self.av_statistics
94                 .notify_audio_codec_info(&aac.mpeg4_aac)
95                 .await;
96         }
97 
98         self.av_statistics
99             .notify_audio_statistics_info(chunk_body.len(), tag.aac_packet_type)
100             .await;
101 
102         Ok(())
103     }
104 
105     pub fn get_audio_seq(&self) -> Option<ChannelData> {
106         if !self.audio_seq.is_empty() {
107             return Some(ChannelData::Audio {
108                 timestamp: self.audio_timestamp,
109                 data: self.audio_seq.clone(),
110             });
111         }
112         None
113     }
114 
115     pub fn get_video_seq(&self) -> Option<ChannelData> {
116         if !self.video_seq.is_empty() {
117             return Some(ChannelData::Video {
118                 timestamp: self.video_timestamp,
119                 data: self.video_seq.clone(),
120             });
121         }
122         None
123     }
124     //save video gops and sequence header information
125     pub async fn save_video_data(
126         &mut self,
127         chunk_body: BytesMut,
128         timestamp: u32,
129     ) -> Result<(), CacheError> {
130         let mut parser = demuxer_tag::VideoTagHeaderDemuxer::new(chunk_body.clone());
131         let tag = parser.parse_tag_header()?;
132 
133         let channel_data = ChannelData::Video {
134             timestamp,
135             data: chunk_body.clone(),
136         };
137         let is_key_frame = tag.frame_type == define::frame_type::KEY_FRAME;
138         self.gops.save_frame_data(channel_data, is_key_frame);
139 
140         if is_key_frame && tag.avc_packet_type == define::avc_packet_type::AVC_SEQHDR {
141             let mut avc_processor = Mpeg4AvcProcessor::default();
142             avc_processor
143                 .extend_data(parser.get_remaining_bytes())
144                 .decoder_configuration_record_load()?;
145 
146             self.av_statistics
147                 .notify_video_codec_info(&avc_processor.mpeg4_avc)
148                 .await;
149 
150             self.video_seq = chunk_body.clone();
151             self.video_timestamp = timestamp;
152         }
153 
154         self.av_statistics
155             .notify_video_statistics_info(chunk_body.len(), is_key_frame)
156             .await;
157 
158         Ok(())
159     }
160 
161     pub fn get_gops_data(&self) -> Option<VecDeque<Gop>> {
162         if self.gops.setted() {
163             Some(self.gops.get_gops())
164         } else {
165             None
166         }
167     }
168 }
169