xref: /xiu/protocol/rtmp/src/cache/mod.rs (revision 8e71d710)
1 pub mod errors;
2 pub mod gop;
3 pub mod metadata;
4 
5 use {
6     self::gop::Gops,
7     bytes::BytesMut,
8     errors::CacheError,
9     gop::Gop,
10     std::collections::VecDeque,
11     streamhub::define::FrameData,
12     streamhub::statistics::avstatistics::AvStatistics,
13     streamhub::stream::StreamIdentifier,
14     xflv::{define, demuxer_tag, mpeg4_aac::Mpeg4AacProcessor, mpeg4_avc::Mpeg4AvcProcessor},
15 };
16 
17 // #[derive(Clone)]
18 pub struct Cache {
19     metadata: metadata::MetaData,
20     metadata_timestamp: u32,
21     video_seq: BytesMut,
22     video_timestamp: u32,
23     audio_seq: BytesMut,
24     audio_timestamp: u32,
25     gops: Gops,
26     pub av_statistics: AvStatistics,
27 }
28 
29 impl Drop for Cache {
30     #[allow(unused_must_use)]
31     fn drop(&mut self) {
32         self.av_statistics.sender.send(true);
33     }
34 }
35 
36 impl Cache {
37     pub fn new(app_name: String, stream_name: String, gop_num: usize) -> Self {
38         let identifier = StreamIdentifier::Rtmp {
39             app_name,
40             stream_name,
41         };
42         let mut cache = Cache {
43             metadata: metadata::MetaData::new(),
44             metadata_timestamp: 0,
45             video_seq: BytesMut::new(),
46             video_timestamp: 0,
47             audio_seq: BytesMut::new(),
48             audio_timestamp: 0,
49             gops: Gops::new(gop_num),
50             av_statistics: AvStatistics::new(identifier),
51         };
52         cache.av_statistics.start();
53         cache
54     }
55 
56     //, values: Vec<Amf0ValueType>
57     pub fn save_metadata(&mut self, chunk_body: &BytesMut, timestamp: u32) {
58         self.metadata.save(chunk_body);
59         self.metadata_timestamp = timestamp;
60     }
61 
62     pub fn get_metadata(&self) -> Option<FrameData> {
63         let data = self.metadata.get_chunk_body();
64         if !data.is_empty() {
65             Some(FrameData::MetaData {
66                 timestamp: self.metadata_timestamp,
67                 data,
68             })
69         } else {
70             None
71         }
72     }
73     //save audio gops and sequence header information
74     pub async fn save_audio_data(
75         &mut self,
76         chunk_body: &BytesMut,
77         timestamp: u32,
78     ) -> Result<(), CacheError> {
79         let channel_data = FrameData::Audio {
80             timestamp,
81             data: chunk_body.clone(),
82         };
83         self.gops.save_frame_data(channel_data, false);
84 
85         let mut parser = demuxer_tag::AudioTagHeaderDemuxer::new(chunk_body.clone());
86         let tag = parser.parse_tag_header()?;
87 
88         if tag.sound_format == define::SoundFormat::AAC as u8
89             && tag.aac_packet_type == define::aac_packet_type::AAC_SEQHDR
90         {
91             self.audio_seq = chunk_body.clone();
92             self.audio_timestamp = timestamp;
93 
94             let mut aac_processor = Mpeg4AacProcessor::default();
95             let aac = aac_processor
96                 .extend_data(parser.get_remaining_bytes())
97                 .audio_specific_config_load()?;
98             self.av_statistics
99                 .notify_audio_codec_info(&aac.mpeg4_aac)
100                 .await;
101         }
102 
103         self.av_statistics
104             .notify_audio_statistics_info(chunk_body.len(), tag.aac_packet_type)
105             .await;
106 
107         Ok(())
108     }
109 
110     pub fn get_audio_seq(&self) -> Option<FrameData> {
111         if !self.audio_seq.is_empty() {
112             return Some(FrameData::Audio {
113                 timestamp: self.audio_timestamp,
114                 data: self.audio_seq.clone(),
115             });
116         }
117         None
118     }
119 
120     pub fn get_video_seq(&self) -> Option<FrameData> {
121         if !self.video_seq.is_empty() {
122             return Some(FrameData::Video {
123                 timestamp: self.video_timestamp,
124                 data: self.video_seq.clone(),
125             });
126         }
127         None
128     }
129     //save video gops and sequence header information
130     pub async fn save_video_data(
131         &mut self,
132         chunk_body: &BytesMut,
133         timestamp: u32,
134     ) -> Result<(), CacheError> {
135         let mut parser = demuxer_tag::VideoTagHeaderDemuxer::new(chunk_body.clone());
136         let tag = parser.parse_tag_header()?;
137 
138         let channel_data = FrameData::Video {
139             timestamp,
140             data: chunk_body.clone(),
141         };
142         let is_key_frame = tag.frame_type == define::frame_type::KEY_FRAME;
143         self.gops.save_frame_data(channel_data, is_key_frame);
144 
145         if is_key_frame && tag.avc_packet_type == define::avc_packet_type::AVC_SEQHDR {
146             let mut avc_processor = Mpeg4AvcProcessor::default();
147             avc_processor
148                 .extend_data(parser.get_remaining_bytes())
149                 .decoder_configuration_record_load()?;
150 
151             self.av_statistics
152                 .notify_video_codec_info(&avc_processor.mpeg4_avc)
153                 .await;
154 
155             self.video_seq = chunk_body.clone();
156             self.video_timestamp = timestamp;
157         }
158 
159         self.av_statistics
160             .notify_video_statistics_info(chunk_body.len(), is_key_frame)
161             .await;
162 
163         Ok(())
164     }
165 
166     pub fn get_gops_data(&self) -> Option<VecDeque<Gop>> {
167         if self.gops.setted() {
168             Some(self.gops.get_gops())
169         } else {
170             None
171         }
172     }
173 }
174