xref: /xiu/protocol/rtsp/src/rtp/rtp_aac.rs (revision b36cf5da)
1 use super::errors::PackerError;
2 use super::errors::UnPackerError;
3 use super::utils::OnFrameFn;
4 use super::utils::OnRtpPacketFn;
5 use super::utils::OnRtpPacketFn2;
6 use super::utils::TPacker;
7 
8 use super::utils::TRtpReceiverForRtcp;
9 use super::utils::TUnPacker;
10 use super::utils::Unmarshal;
11 use super::RtpHeader;
12 use super::RtpPacket;
13 use async_trait::async_trait;
14 use byteorder::BigEndian;
15 use bytes::{BufMut, BytesMut};
16 
17 use bytesio::bytes_reader::BytesReader;
18 use bytesio::bytesio::TNetIO;
19 use std::sync::Arc;
20 use streamhub::define::FrameData;
21 use tokio::sync::Mutex;
22 
23 // pub type OnPacketFn = fn(BytesMut) -> Result<(), PackerError>;
24 
25 pub struct RtpAacPacker {
26     header: RtpHeader,
27     on_packet_handler: Option<OnRtpPacketFn>,
28     on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>,
29     io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
30 }
31 
32 impl RtpAacPacker {
new( payload_type: u8, ssrc: u32, init_seq: u16, io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, ) -> Self33     pub fn new(
34         payload_type: u8,
35         ssrc: u32,
36         init_seq: u16,
37         io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
38     ) -> Self {
39         RtpAacPacker {
40             header: RtpHeader {
41                 payload_type,
42                 seq_number: init_seq,
43                 ssrc,
44                 version: 2,
45                 marker: 1,
46                 ..Default::default()
47             },
48             io,
49             on_packet_handler: None,
50             on_packet_for_rtcp_handler: None,
51         }
52     }
53 }
54 #[async_trait]
55 impl TPacker for RtpAacPacker {
pack(&mut self, data: &mut BytesMut, timestamp: u32) -> Result<(), PackerError>56     async fn pack(&mut self, data: &mut BytesMut, timestamp: u32) -> Result<(), PackerError> {
57         self.header.timestamp = timestamp;
58 
59         let data_len = data.len();
60         let mut packet = RtpPacket::new(self.header.clone());
61         packet.payload.put_u16(16);
62         packet.payload.put_u8((data_len >> 5) as u8);
63         packet.payload.put_u8(((data_len & 0x1F) << 3) as u8);
64         packet.payload.put(data);
65 
66         if let Some(f) = &self.on_packet_for_rtcp_handler {
67             f(packet.clone());
68         }
69 
70         if let Some(f) = &self.on_packet_handler {
71             f(self.io.clone(), packet).await?;
72         }
73 
74         self.header.seq_number += 1;
75 
76         Ok(())
77     }
78 
on_packet_handler(&mut self, f: OnRtpPacketFn)79     fn on_packet_handler(&mut self, f: OnRtpPacketFn) {
80         self.on_packet_handler = Some(f);
81     }
82 }
83 
84 impl TRtpReceiverForRtcp for RtpAacPacker {
on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2)85     fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) {
86         self.on_packet_for_rtcp_handler = Some(f);
87     }
88 }
89 
90 #[derive(Default)]
91 pub struct RtpAacUnPacker {
92     on_frame_handler: Option<OnFrameFn>,
93     on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>,
94 }
95 
96 // +---------+-----------+-----------+---------------+
97 // | RTP     | AU Header | Auxiliary | Access Unit   |
98 // | Header  | Section   | Section   | Data Section  |
99 // +---------+-----------+-----------+---------------+
100 // 	<----------RTP Packet Payload----------->
101 //
102 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- .. -+-+-+-+-+-+-+-+-+-+
103 // |AU-headers-length|AU-header|AU-header|      |AU-header|padding|
104 // |                 |   (1)   |   (2)   |      |   (n)   | bits  |
105 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- .. -+-+-+-+-+-+-+-+-+-+
106 
107 // Au-headers-length 2 bytes
108 
109 impl RtpAacUnPacker {
new() -> Self110     pub fn new() -> Self {
111         Self {
112             ..Default::default()
113         }
114     }
115 }
116 
117 impl TUnPacker for RtpAacUnPacker {
unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError>118     fn unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError> {
119         let rtp_packet = RtpPacket::unmarshal(reader)?;
120 
121         if let Some(f) = &self.on_packet_for_rtcp_handler {
122             f(rtp_packet.clone());
123         }
124 
125         let mut reader_payload = BytesReader::new(rtp_packet.payload);
126 
127         let au_headers_length = (reader_payload.read_u16::<BigEndian>()? + 7) / 8;
128         let au_header_length = 2;
129         let aus_number = au_headers_length / au_header_length;
130 
131         let mut au_lengths = Vec::new();
132         for _ in 0..aus_number {
133             let au_length = (((reader_payload.read_u8()? as u16) << 8)
134                 | ((reader_payload.read_u8()? as u16) & 0xF8)) as usize;
135             au_lengths.push(au_length / 8);
136         }
137 
138         log::debug!(
139             "send audio : au_headers_length :{}, aus_number: {}, au_lengths: {:?}",
140             au_headers_length,
141             aus_number,
142             au_lengths,
143         );
144 
145         for (i, item) in au_lengths.iter().enumerate() {
146             let au_data = reader_payload.read_bytes(*item)?;
147             if let Some(f) = &self.on_frame_handler {
148                 f(FrameData::Audio {
149                     timestamp: rtp_packet.header.timestamp + i as u32 * 1024,
150                     data: au_data,
151                 })?;
152             }
153         }
154 
155         Ok(())
156     }
on_frame_handler(&mut self, f: OnFrameFn)157     fn on_frame_handler(&mut self, f: OnFrameFn) {
158         self.on_frame_handler = Some(f);
159     }
160 }
161 
162 impl TRtpReceiverForRtcp for RtpAacUnPacker {
on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2)163     fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) {
164         self.on_packet_for_rtcp_handler = Some(f);
165     }
166 }
167