1 use crate::stream::StreamIdentifier;
2 
3 use {
4     super::StreamStatistics,
5     std::{sync::Arc, time::Duration},
6     tokio::{
7         sync::{
8             mpsc,
9             mpsc::{Receiver, Sender},
10             Mutex,
11         },
12         time,
13     },
14     xflv::{
15         define,
16         define::{aac_packet_type, AvcCodecId, SoundFormat},
17         mpeg4_aac::Mpeg4Aac,
18         mpeg4_avc::Mpeg4Avc,
19     },
20 };
21 
22 pub struct AvStatistics {
23     /*used to calculate video bitrate */
24     video_bytes: Arc<Mutex<f32>>,
25     /*used to calculate audio bitrate */
26     audio_bytes: Arc<Mutex<f32>>,
27     //used to calculate frame rate
28     frame_count: Arc<Mutex<usize>>,
29     //used to calculate GOP
30     gop_frame_count: Arc<Mutex<usize>>,
31     stream_statistics: Arc<Mutex<StreamStatistics>>,
32     pub sender: Sender<bool>,
33 }
34 
35 impl AvStatistics {
new(identifier: StreamIdentifier) -> Self36     pub fn new(identifier: StreamIdentifier) -> Self {
37         let (s, _): (Sender<bool>, Receiver<bool>) = mpsc::channel(1);
38         Self {
39             video_bytes: Arc::new(Mutex::new(0.0)),
40             audio_bytes: Arc::new(Mutex::new(0.0)),
41             frame_count: Arc::new(Mutex::new(0)),
42             gop_frame_count: Arc::new(Mutex::new(0)),
43             stream_statistics: Arc::new(Mutex::new(StreamStatistics::new(identifier))),
44             sender: s,
45         }
46     }
47 
notify_audio_codec_info(&mut self, codec_info: &Mpeg4Aac)48     pub async fn notify_audio_codec_info(&mut self, codec_info: &Mpeg4Aac) {
49         let audio_info = &mut self.stream_statistics.lock().await.audio;
50         audio_info.profile = define::u8_2_aac_profile(codec_info.profile);
51         audio_info.samplerate = codec_info.sampling_frequency;
52         audio_info.sound_format = SoundFormat::AAC;
53         audio_info.channels = codec_info.channels;
54     }
55 
notify_video_codec_info(&mut self, codec_info: &Mpeg4Avc)56     pub async fn notify_video_codec_info(&mut self, codec_info: &Mpeg4Avc) {
57         let video_info = &mut self.stream_statistics.lock().await.video;
58         video_info.codec = AvcCodecId::H264;
59         video_info.profile = define::u8_2_avc_profile(codec_info.profile);
60         video_info.level = define::u8_2_avc_level(codec_info.level);
61         video_info.height = codec_info.height;
62         video_info.width = codec_info.width;
63     }
64 
notify_audio_statistics_info(&mut self, data_size: usize, aac_packet_type: u8)65     pub async fn notify_audio_statistics_info(&mut self, data_size: usize, aac_packet_type: u8) {
66         match aac_packet_type {
67             aac_packet_type::AAC_RAW => {
68                 *self.audio_bytes.lock().await += data_size as f32;
69             }
70             aac_packet_type::AAC_SEQHDR => {}
71             _ => {}
72         }
73     }
74 
notify_video_statistics_info(&mut self, data_size: usize, is_key_frame: bool)75     pub async fn notify_video_statistics_info(&mut self, data_size: usize, is_key_frame: bool) {
76         *self.video_bytes.lock().await += data_size as f32;
77         *self.frame_count.lock().await += 1;
78         if is_key_frame {
79             let video_info = &mut self.stream_statistics.lock().await.video;
80             video_info.gop = *self.gop_frame_count.lock().await;
81             *self.gop_frame_count.lock().await = 0;
82         } else {
83             *self.gop_frame_count.lock().await += 1;
84         }
85     }
86 
start(&mut self)87     pub fn start(&mut self) {
88         let mut interval = time::interval(Duration::from_secs(1));
89 
90         let video_bytes_clone = self.video_bytes.clone();
91         let audio_bytes_clone = self.audio_bytes.clone();
92         let frame_count_clone = self.frame_count.clone();
93         let stream_statistics_clone = self.stream_statistics.clone();
94 
95         let (s, mut r): (Sender<bool>, Receiver<bool>) = mpsc::channel(1);
96         self.sender = s;
97 
98         tokio::spawn(async move {
99             loop {
100                 tokio::select! {
101                    _ = interval.tick() => {
102                     {
103                         let stream_statistics = &mut stream_statistics_clone.lock().await;
104                         let audio_info = &mut stream_statistics.audio;
105                         audio_info.bitrate = *audio_bytes_clone.lock().await * 8.0/1000.0;
106 
107                         let video_info = &mut stream_statistics.video;
108                         video_info.bitrate = *video_bytes_clone.lock().await * 8.0/1000.0;
109                         video_info.frame_rate = *frame_count_clone.lock().await;
110                     }
111                     *video_bytes_clone.lock().await = 0.0;
112                     *audio_bytes_clone.lock().await = 0.0;
113                     *frame_count_clone.lock().await = 0;
114                     // if let Ok(strinfo) = serde_json::to_string(&*stream_statistics_clone.lock().await) {
115                     //    // log::info!("stream_info: {strinfo}");
116                     // }
117                 },
118                    _ = r.recv() =>{
119                         log::info!("avstatistics shutting down");
120                         return
121                    },
122                 }
123             }
124         });
125     }
126 
get_avstatistic_data(&self) -> StreamStatistics127     pub async fn get_avstatistic_data(&self) -> StreamStatistics {
128         self.stream_statistics.lock().await.clone()
129     }
130 }
131