1 use super::errors::PackerError; 2 use super::errors::UnPackerError; 3 use super::utils::OnFrameFn; 4 use super::utils::OnRtpPacketFn; 5 use super::utils::OnRtpPacketFn2; 6 use super::utils::TPacker; 7 8 use super::utils::TRtpReceiverForRtcp; 9 use super::utils::TUnPacker; 10 use super::utils::Unmarshal; 11 use super::RtpHeader; 12 use super::RtpPacket; 13 use async_trait::async_trait; 14 use byteorder::BigEndian; 15 use bytes::{BufMut, BytesMut}; 16 17 use bytesio::bytes_reader::BytesReader; 18 use bytesio::bytesio::TNetIO; 19 use std::sync::Arc; 20 use streamhub::define::FrameData; 21 use tokio::sync::Mutex; 22 23 // pub type OnPacketFn = fn(BytesMut) -> Result<(), PackerError>; 24 25 pub struct RtpAacPacker { 26 header: RtpHeader, 27 on_packet_handler: Option<OnRtpPacketFn>, 28 on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>, 29 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 30 } 31 32 impl RtpAacPacker { new( payload_type: u8, ssrc: u32, init_seq: u16, io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, ) -> Self33 pub fn new( 34 payload_type: u8, 35 ssrc: u32, 36 init_seq: u16, 37 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 38 ) -> Self { 39 RtpAacPacker { 40 header: RtpHeader { 41 payload_type, 42 seq_number: init_seq, 43 ssrc, 44 version: 2, 45 marker: 1, 46 ..Default::default() 47 }, 48 io, 49 on_packet_handler: None, 50 on_packet_for_rtcp_handler: None, 51 } 52 } 53 } 54 #[async_trait] 55 impl TPacker for RtpAacPacker { pack(&mut self, data: &mut BytesMut, timestamp: u32) -> Result<(), PackerError>56 async fn pack(&mut self, data: &mut BytesMut, timestamp: u32) -> Result<(), PackerError> { 57 self.header.timestamp = timestamp; 58 59 let data_len = data.len(); 60 let mut packet = RtpPacket::new(self.header.clone()); 61 packet.payload.put_u16(16); 62 packet.payload.put_u8((data_len >> 5) as u8); 63 packet.payload.put_u8(((data_len & 0x1F) << 3) as u8); 64 packet.payload.put(data); 65 66 if let Some(f) = &self.on_packet_for_rtcp_handler { 67 f(packet.clone()); 68 } 69 70 if let Some(f) = &self.on_packet_handler { 71 f(self.io.clone(), packet).await?; 72 } 73 74 self.header.seq_number += 1; 75 76 Ok(()) 77 } 78 on_packet_handler(&mut self, f: OnRtpPacketFn)79 fn on_packet_handler(&mut self, f: OnRtpPacketFn) { 80 self.on_packet_handler = Some(f); 81 } 82 } 83 84 impl TRtpReceiverForRtcp for RtpAacPacker { on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2)85 fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) { 86 self.on_packet_for_rtcp_handler = Some(f); 87 } 88 } 89 90 #[derive(Default)] 91 pub struct RtpAacUnPacker { 92 on_frame_handler: Option<OnFrameFn>, 93 on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>, 94 } 95 96 // +---------+-----------+-----------+---------------+ 97 // | RTP | AU Header | Auxiliary | Access Unit | 98 // | Header | Section | Section | Data Section | 99 // +---------+-----------+-----------+---------------+ 100 // <----------RTP Packet Payload-----------> 101 // 102 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- .. -+-+-+-+-+-+-+-+-+-+ 103 // |AU-headers-length|AU-header|AU-header| |AU-header|padding| 104 // | | (1) | (2) | | (n) | bits | 105 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- .. -+-+-+-+-+-+-+-+-+-+ 106 107 // Au-headers-length 2 bytes 108 109 impl RtpAacUnPacker { new() -> Self110 pub fn new() -> Self { 111 Self { 112 ..Default::default() 113 } 114 } 115 } 116 117 impl TUnPacker for RtpAacUnPacker { unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError>118 fn unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError> { 119 let rtp_packet = RtpPacket::unmarshal(reader)?; 120 121 if let Some(f) = &self.on_packet_for_rtcp_handler { 122 f(rtp_packet.clone()); 123 } 124 125 let mut reader_payload = BytesReader::new(rtp_packet.payload); 126 127 let au_headers_length = (reader_payload.read_u16::<BigEndian>()? + 7) / 8; 128 let au_header_length = 2; 129 let aus_number = au_headers_length / au_header_length; 130 131 let mut au_lengths = Vec::new(); 132 for _ in 0..aus_number { 133 let au_length = (((reader_payload.read_u8()? as u16) << 8) 134 | ((reader_payload.read_u8()? as u16) & 0xF8)) as usize; 135 au_lengths.push(au_length / 8); 136 } 137 138 log::debug!( 139 "send audio : au_headers_length :{}, aus_number: {}, au_lengths: {:?}", 140 au_headers_length, 141 aus_number, 142 au_lengths, 143 ); 144 145 for (i, item) in au_lengths.iter().enumerate() { 146 let au_data = reader_payload.read_bytes(*item)?; 147 if let Some(f) = &self.on_frame_handler { 148 f(FrameData::Audio { 149 timestamp: rtp_packet.header.timestamp + i as u32 * 1024, 150 data: au_data, 151 })?; 152 } 153 } 154 155 Ok(()) 156 } on_frame_handler(&mut self, f: OnFrameFn)157 fn on_frame_handler(&mut self, f: OnFrameFn) { 158 self.on_frame_handler = Some(f); 159 } 160 } 161 162 impl TRtpReceiverForRtcp for RtpAacUnPacker { on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2)163 fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) { 164 self.on_packet_for_rtcp_handler = Some(f); 165 } 166 } 167