1 use crate::dtls_transport::RTCDtlsTransport;
2 use crate::error::{Error, Result};
3 use crate::rtp_transceiver::rtp_sender::RTPSenderInternal;
4 use crate::rtp_transceiver::SSRC;
5 
6 use srtp::session::Session;
7 use srtp::stream::Stream;
8 
9 use async_trait::async_trait;
10 use bytes::Bytes;
11 use interceptor::{Attributes, RTCPReader, RTPWriter};
12 use std::sync::atomic::{AtomicBool, Ordering};
13 use std::sync::{Arc, Weak};
14 use tokio::sync::Mutex;
15 use util;
16 
17 /// `RTP` packet sequence number manager.
18 ///
19 /// Used to override outgoing `RTP` packets' sequence numbers. On creating it is
20 /// unabled and can be enabled before sending data begining. Once data sending
21 /// began it can not be enabled any more.
22 pub(crate) struct SequenceTransformer(util::sync::Mutex<SequenceTransformerInner>);
23 
24 /// [`SequenceTransformer`] inner.
25 struct SequenceTransformerInner {
26     offset: u16,
27     last_sq: u16,
28     reset_needed: bool,
29     enabled: bool,
30     data_sent: bool,
31 }
32 
33 impl SequenceTransformer {
34     /// Creates a new [`SequenceTransformer`].
new() -> Self35     pub(crate) fn new() -> Self {
36         Self(util::sync::Mutex::new(SequenceTransformerInner {
37             offset: 0,
38             last_sq: rand::random(),
39             reset_needed: false,
40             enabled: false,
41             data_sent: false,
42         }))
43     }
44 
45     /// Enables this [`SequenceTransformer`].
46     ///
47     /// # Errors
48     ///
49     /// With [`Error::ErrRTPSenderSeqTransEnabled`] on trying to enable already
50     /// enabled [`SequenceTransformer`].
51     ///
52     /// With [`Error::ErrRTPSenderSeqTransEnabled`] on trying to enable
53     /// [`SequenceTransformer`] after data sending began.
enable(&self) -> Result<()>54     pub(crate) fn enable(&self) -> Result<()> {
55         let mut guard = self.0.lock();
56 
57         if guard.enabled {
58             return Err(Error::ErrRTPSenderSeqTransEnabled);
59         }
60 
61         (!guard.data_sent)
62             .then(|| {
63                 guard.enabled = true;
64             })
65             .ok_or(Error::ErrRTPSenderDataSent)
66     }
67 
68     /// Indicates [`SequenceTransformer`] about necessity of recalculating
69     /// `offset`.
reset_offset(&self)70     pub(crate) fn reset_offset(&self) {
71         self.0.lock().reset_needed = true;
72     }
73 
74     /// Gets [`Some`] consistent `sequence number` if this [`SequenceTransformer`] is
75     /// enabled or [`None`] if it is not.
76     ///
77     /// Once this method is called, considers data sending began.
seq_number(&self, raw_sn: u16) -> Option<u16>78     fn seq_number(&self, raw_sn: u16) -> Option<u16> {
79         let mut guard = self.0.lock();
80         guard.data_sent = true;
81 
82         if !guard.enabled {
83             return None;
84         }
85 
86         let offset = guard
87             .reset_needed
88             .then(|| {
89                 guard.reset_needed = false;
90                 let offset = guard.last_sq.overflowing_sub(raw_sn.overflowing_sub(1).0).0;
91                 guard.offset = offset;
92                 offset
93             })
94             .unwrap_or(guard.offset);
95         let next = raw_sn.overflowing_add(offset).0;
96         guard.last_sq = next;
97 
98         Some(next)
99     }
100 }
101 
102 /// SrtpWriterFuture blocks Read/Write calls until
103 /// the SRTP Session is available
104 pub(crate) struct SrtpWriterFuture {
105     pub(crate) closed: AtomicBool,
106     pub(crate) ssrc: SSRC,
107     pub(crate) rtp_sender: Weak<RTPSenderInternal>,
108     pub(crate) rtp_transport: Arc<RTCDtlsTransport>,
109     pub(crate) rtcp_read_stream: Mutex<Option<Arc<Stream>>>, // atomic.Value // *
110     pub(crate) rtp_write_session: Mutex<Option<Arc<Session>>>, // atomic.Value // *
111     pub(crate) seq_trans: Arc<SequenceTransformer>,
112 }
113 
114 impl SrtpWriterFuture {
init(&self, return_when_no_srtp: bool) -> Result<()>115     async fn init(&self, return_when_no_srtp: bool) -> Result<()> {
116         if return_when_no_srtp {
117             {
118                 if let Some(rtp_sender) = self.rtp_sender.upgrade() {
119                     if rtp_sender.stop_called_signal.load(Ordering::SeqCst) {
120                         return Err(Error::ErrClosedPipe);
121                     }
122                 } else {
123                     return Err(Error::ErrClosedPipe);
124                 }
125             }
126 
127             if !self.rtp_transport.srtp_ready_signal.load(Ordering::SeqCst) {
128                 return Ok(());
129             }
130         } else {
131             let mut rx = self.rtp_transport.srtp_ready_rx.lock().await;
132             if let Some(srtp_ready_rx) = &mut *rx {
133                 if let Some(rtp_sender) = self.rtp_sender.upgrade() {
134                     tokio::select! {
135                         _ = rtp_sender.stop_called_rx.notified()=> return Err(Error::ErrClosedPipe),
136                         _ = srtp_ready_rx.recv() =>{}
137                     }
138                 } else {
139                     return Err(Error::ErrClosedPipe);
140                 }
141             }
142         }
143 
144         if self.closed.load(Ordering::SeqCst) {
145             return Err(Error::ErrClosedPipe);
146         }
147 
148         if let Some(srtcp_session) = self.rtp_transport.get_srtcp_session().await {
149             let rtcp_read_stream = srtcp_session.open(self.ssrc).await;
150             let mut stream = self.rtcp_read_stream.lock().await;
151             *stream = Some(rtcp_read_stream);
152         }
153 
154         {
155             let srtp_session = self.rtp_transport.get_srtp_session().await;
156             let mut session = self.rtp_write_session.lock().await;
157             *session = srtp_session;
158         }
159 
160         Ok(())
161     }
162 
close(&self) -> Result<()>163     pub async fn close(&self) -> Result<()> {
164         if self.closed.load(Ordering::SeqCst) {
165             return Ok(());
166         }
167         self.closed.store(true, Ordering::SeqCst);
168 
169         let stream = {
170             let mut stream = self.rtcp_read_stream.lock().await;
171             stream.take()
172         };
173         if let Some(rtcp_read_stream) = stream {
174             Ok(rtcp_read_stream.close().await?)
175         } else {
176             Ok(())
177         }
178     }
179 
read(&self, b: &mut [u8]) -> Result<usize>180     pub async fn read(&self, b: &mut [u8]) -> Result<usize> {
181         {
182             let stream = {
183                 let stream = self.rtcp_read_stream.lock().await;
184                 stream.clone()
185             };
186             if let Some(rtcp_read_stream) = stream {
187                 return Ok(rtcp_read_stream.read(b).await?);
188             }
189         }
190 
191         self.init(false).await?;
192 
193         {
194             let stream = {
195                 let stream = self.rtcp_read_stream.lock().await;
196                 stream.clone()
197             };
198             if let Some(rtcp_read_stream) = stream {
199                 return Ok(rtcp_read_stream.read(b).await?);
200             }
201         }
202 
203         Ok(0)
204     }
205 
write_rtp(&self, pkt: &rtp::packet::Packet) -> Result<usize>206     pub async fn write_rtp(&self, pkt: &rtp::packet::Packet) -> Result<usize> {
207         {
208             let session = {
209                 let session = self.rtp_write_session.lock().await;
210                 session.clone()
211             };
212             if let Some(rtp_write_session) = session {
213                 return Ok(rtp_write_session.write_rtp(pkt).await?);
214             }
215         }
216 
217         self.init(true).await?;
218 
219         {
220             let session = {
221                 let session = self.rtp_write_session.lock().await;
222                 session.clone()
223             };
224             if let Some(rtp_write_session) = session {
225                 return Ok(rtp_write_session.write_rtp(pkt).await?);
226             }
227         }
228 
229         Ok(0)
230     }
231 
write(&self, b: &Bytes) -> Result<usize>232     pub async fn write(&self, b: &Bytes) -> Result<usize> {
233         {
234             let session = {
235                 let session = self.rtp_write_session.lock().await;
236                 session.clone()
237             };
238             if let Some(rtp_write_session) = session {
239                 return Ok(rtp_write_session.write(b, true).await?);
240             }
241         }
242 
243         self.init(true).await?;
244 
245         {
246             let session = {
247                 let session = self.rtp_write_session.lock().await;
248                 session.clone()
249             };
250             if let Some(rtp_write_session) = session {
251                 return Ok(rtp_write_session.write(b, true).await?);
252             }
253         }
254 
255         Ok(0)
256     }
257 }
258 
259 type IResult<T> = std::result::Result<T, interceptor::Error>;
260 
261 #[async_trait]
262 impl RTCPReader for SrtpWriterFuture {
read(&self, buf: &mut [u8], a: &Attributes) -> IResult<(usize, Attributes)>263     async fn read(&self, buf: &mut [u8], a: &Attributes) -> IResult<(usize, Attributes)> {
264         Ok((self.read(buf).await?, a.clone()))
265     }
266 }
267 
268 #[async_trait]
269 impl RTPWriter for SrtpWriterFuture {
write(&self, pkt: &rtp::packet::Packet, _a: &Attributes) -> IResult<usize>270     async fn write(&self, pkt: &rtp::packet::Packet, _a: &Attributes) -> IResult<usize> {
271         Ok(
272             match self.seq_trans.seq_number(pkt.header.sequence_number) {
273                 Some(seq_num) => {
274                     let mut new_pkt = pkt.clone();
275                     new_pkt.header.sequence_number = seq_num;
276                     self.write_rtp(&new_pkt).await?
277                 }
278                 None => self.write_rtp(pkt).await?,
279             },
280         )
281     }
282 }
283