xref: /xiu/protocol/rtmp/src/cache/mod.rs (revision cbe12ea9)
1 pub mod errors;
2 pub mod gop;
3 pub mod metadata;
4 
5 use {
6     self::gop::Gops,
7     bytes::BytesMut,
8     bytesio::bytes_reader::BytesReader,
9     errors::CacheError,
10     gop::Gop,
11     std::collections::VecDeque,
12     streamhub::define::FrameData,
13     streamhub::statistics::avstatistics::AvStatistics,
14     streamhub::stream::StreamIdentifier,
15     xflv::{
16         define,
17         flv_tag_header::{AudioTagHeader, VideoTagHeader},
18         mpeg4_aac::Mpeg4AacProcessor,
19         mpeg4_avc::Mpeg4AvcProcessor,
20         Unmarshal,
21     },
22 };
23 
24 // #[derive(Clone)]
25 pub struct Cache {
26     metadata: metadata::MetaData,
27     metadata_timestamp: u32,
28     video_seq: BytesMut,
29     video_timestamp: u32,
30     audio_seq: BytesMut,
31     audio_timestamp: u32,
32     gops: Gops,
33     pub av_statistics: AvStatistics,
34 }
35 
36 impl Drop for Cache {
37     #[allow(unused_must_use)]
38     fn drop(&mut self) {
39         self.av_statistics.sender.send(true);
40     }
41 }
42 
43 impl Cache {
44     pub fn new(app_name: String, stream_name: String, gop_num: usize) -> Self {
45         let identifier = StreamIdentifier::Rtmp {
46             app_name,
47             stream_name,
48         };
49         let mut cache = Cache {
50             metadata: metadata::MetaData::new(),
51             metadata_timestamp: 0,
52             video_seq: BytesMut::new(),
53             video_timestamp: 0,
54             audio_seq: BytesMut::new(),
55             audio_timestamp: 0,
56             gops: Gops::new(gop_num),
57             av_statistics: AvStatistics::new(identifier),
58         };
59         cache.av_statistics.start();
60         cache
61     }
62 
63     //, values: Vec<Amf0ValueType>
64     pub fn save_metadata(&mut self, chunk_body: &BytesMut, timestamp: u32) {
65         self.metadata.save(chunk_body);
66         self.metadata_timestamp = timestamp;
67     }
68 
69     pub fn get_metadata(&self) -> Option<FrameData> {
70         let data = self.metadata.get_chunk_body();
71         if !data.is_empty() {
72             Some(FrameData::MetaData {
73                 timestamp: self.metadata_timestamp,
74                 data,
75             })
76         } else {
77             None
78         }
79     }
80     //save audio gops and sequence header information
81     pub async fn save_audio_data(
82         &mut self,
83         chunk_body: &BytesMut,
84         timestamp: u32,
85     ) -> Result<(), CacheError> {
86         let channel_data = FrameData::Audio {
87             timestamp,
88             data: chunk_body.clone(),
89         };
90         self.gops.save_frame_data(channel_data, false);
91 
92         let mut reader = BytesReader::new(chunk_body.clone());
93         let tag_header = AudioTagHeader::unmarshal(&mut reader)?;
94 
95         if tag_header.sound_format == define::SoundFormat::AAC as u8
96             && tag_header.aac_packet_type == define::aac_packet_type::AAC_SEQHDR
97         {
98             self.audio_seq = chunk_body.clone();
99             self.audio_timestamp = timestamp;
100 
101             let mut aac_processor = Mpeg4AacProcessor::default();
102             let aac = aac_processor
103                 .extend_data(reader.extract_remaining_bytes())
104                 .audio_specific_config_load()?;
105             self.av_statistics
106                 .notify_audio_codec_info(&aac.mpeg4_aac)
107                 .await;
108         }
109 
110         self.av_statistics
111             .notify_audio_statistics_info(chunk_body.len(), tag_header.aac_packet_type)
112             .await;
113 
114         Ok(())
115     }
116 
117     pub fn get_audio_seq(&self) -> Option<FrameData> {
118         if !self.audio_seq.is_empty() {
119             return Some(FrameData::Audio {
120                 timestamp: self.audio_timestamp,
121                 data: self.audio_seq.clone(),
122             });
123         }
124         None
125     }
126 
127     pub fn get_video_seq(&self) -> Option<FrameData> {
128         if !self.video_seq.is_empty() {
129             return Some(FrameData::Video {
130                 timestamp: self.video_timestamp,
131                 data: self.video_seq.clone(),
132             });
133         }
134         None
135     }
136     //save video gops and sequence header information
137     pub async fn save_video_data(
138         &mut self,
139         chunk_body: &BytesMut,
140         timestamp: u32,
141     ) -> Result<(), CacheError> {
142         let channel_data = FrameData::Video {
143             timestamp,
144             data: chunk_body.clone(),
145         };
146 
147         let mut reader = BytesReader::new(chunk_body.clone());
148         let tag_header = VideoTagHeader::unmarshal(&mut reader)?;
149 
150         let is_key_frame = tag_header.frame_type == define::frame_type::KEY_FRAME;
151         self.gops.save_frame_data(channel_data, is_key_frame);
152 
153         if is_key_frame && tag_header.avc_packet_type == define::avc_packet_type::AVC_SEQHDR {
154             let mut avc_processor = Mpeg4AvcProcessor::default();
155             avc_processor.decoder_configuration_record_load(&mut reader)?;
156 
157             self.av_statistics
158                 .notify_video_codec_info(&avc_processor.mpeg4_avc)
159                 .await;
160 
161             self.video_seq = chunk_body.clone();
162             self.video_timestamp = timestamp;
163         }
164 
165         self.av_statistics
166             .notify_video_statistics_info(chunk_body.len(), is_key_frame)
167             .await;
168 
169         Ok(())
170     }
171 
172     pub fn get_gops_data(&self) -> Option<VecDeque<Gop>> {
173         if self.gops.setted() {
174             Some(self.gops.get_gops())
175         } else {
176             None
177         }
178     }
179 }
180