1 use crate::stream::StreamIdentifier; 2 3 use { 4 super::StreamStatistics, 5 std::{sync::Arc, time::Duration}, 6 tokio::{ 7 sync::{ 8 mpsc, 9 mpsc::{Receiver, Sender}, 10 Mutex, 11 }, 12 time, 13 }, 14 xflv::{ 15 define, 16 define::{aac_packet_type, AvcCodecId, SoundFormat}, 17 mpeg4_aac::Mpeg4Aac, 18 mpeg4_avc::Mpeg4Avc, 19 }, 20 }; 21 22 pub struct AvStatistics { 23 /*used to calculate video bitrate */ 24 video_bytes: Arc<Mutex<f32>>, 25 /*used to calculate audio bitrate */ 26 audio_bytes: Arc<Mutex<f32>>, 27 //used to calculate frame rate 28 frame_count: Arc<Mutex<usize>>, 29 //used to calculate GOP 30 gop_frame_count: Arc<Mutex<usize>>, 31 stream_statistics: Arc<Mutex<StreamStatistics>>, 32 pub sender: Sender<bool>, 33 } 34 35 impl AvStatistics { new(identifier: StreamIdentifier) -> Self36 pub fn new(identifier: StreamIdentifier) -> Self { 37 let (s, _): (Sender<bool>, Receiver<bool>) = mpsc::channel(1); 38 Self { 39 video_bytes: Arc::new(Mutex::new(0.0)), 40 audio_bytes: Arc::new(Mutex::new(0.0)), 41 frame_count: Arc::new(Mutex::new(0)), 42 gop_frame_count: Arc::new(Mutex::new(0)), 43 stream_statistics: Arc::new(Mutex::new(StreamStatistics::new(identifier))), 44 sender: s, 45 } 46 } 47 notify_audio_codec_info(&mut self, codec_info: &Mpeg4Aac)48 pub async fn notify_audio_codec_info(&mut self, codec_info: &Mpeg4Aac) { 49 let audio_info = &mut self.stream_statistics.lock().await.audio; 50 audio_info.profile = define::u8_2_aac_profile(codec_info.profile); 51 audio_info.samplerate = codec_info.sampling_frequency; 52 audio_info.sound_format = SoundFormat::AAC; 53 audio_info.channels = codec_info.channels; 54 } 55 notify_video_codec_info(&mut self, codec_info: &Mpeg4Avc)56 pub async fn notify_video_codec_info(&mut self, codec_info: &Mpeg4Avc) { 57 let video_info = &mut self.stream_statistics.lock().await.video; 58 video_info.codec = AvcCodecId::H264; 59 video_info.profile = define::u8_2_avc_profile(codec_info.profile); 60 video_info.level = define::u8_2_avc_level(codec_info.level); 61 video_info.height = codec_info.height; 62 video_info.width = codec_info.width; 63 } 64 notify_audio_statistics_info(&mut self, data_size: usize, aac_packet_type: u8)65 pub async fn notify_audio_statistics_info(&mut self, data_size: usize, aac_packet_type: u8) { 66 match aac_packet_type { 67 aac_packet_type::AAC_RAW => { 68 *self.audio_bytes.lock().await += data_size as f32; 69 } 70 aac_packet_type::AAC_SEQHDR => {} 71 _ => {} 72 } 73 } 74 notify_video_statistics_info(&mut self, data_size: usize, is_key_frame: bool)75 pub async fn notify_video_statistics_info(&mut self, data_size: usize, is_key_frame: bool) { 76 *self.video_bytes.lock().await += data_size as f32; 77 *self.frame_count.lock().await += 1; 78 if is_key_frame { 79 let video_info = &mut self.stream_statistics.lock().await.video; 80 video_info.gop = *self.gop_frame_count.lock().await; 81 *self.gop_frame_count.lock().await = 0; 82 } else { 83 *self.gop_frame_count.lock().await += 1; 84 } 85 } 86 start(&mut self)87 pub fn start(&mut self) { 88 let mut interval = time::interval(Duration::from_secs(1)); 89 90 let video_bytes_clone = self.video_bytes.clone(); 91 let audio_bytes_clone = self.audio_bytes.clone(); 92 let frame_count_clone = self.frame_count.clone(); 93 let stream_statistics_clone = self.stream_statistics.clone(); 94 95 let (s, mut r): (Sender<bool>, Receiver<bool>) = mpsc::channel(1); 96 self.sender = s; 97 98 tokio::spawn(async move { 99 loop { 100 tokio::select! { 101 _ = interval.tick() => { 102 { 103 let stream_statistics = &mut stream_statistics_clone.lock().await; 104 let audio_info = &mut stream_statistics.audio; 105 audio_info.bitrate = *audio_bytes_clone.lock().await * 8.0/1000.0; 106 107 let video_info = &mut stream_statistics.video; 108 video_info.bitrate = *video_bytes_clone.lock().await * 8.0/1000.0; 109 video_info.frame_rate = *frame_count_clone.lock().await; 110 } 111 *video_bytes_clone.lock().await = 0.0; 112 *audio_bytes_clone.lock().await = 0.0; 113 *frame_count_clone.lock().await = 0; 114 // if let Ok(strinfo) = serde_json::to_string(&*stream_statistics_clone.lock().await) { 115 // // log::info!("stream_info: {strinfo}"); 116 // } 117 }, 118 _ = r.recv() =>{ 119 log::info!("avstatistics shutting down"); 120 return 121 }, 122 } 123 } 124 }); 125 } 126 get_avstatistic_data(&self) -> StreamStatistics127 pub async fn get_avstatistic_data(&self) -> StreamStatistics { 128 self.stream_statistics.lock().await.clone() 129 } 130 } 131