1 use crate::dtls_transport::RTCDtlsTransport; 2 use crate::error::{Error, Result}; 3 use crate::rtp_transceiver::rtp_sender::RTPSenderInternal; 4 use crate::rtp_transceiver::SSRC; 5 6 use srtp::session::Session; 7 use srtp::stream::Stream; 8 9 use async_trait::async_trait; 10 use bytes::Bytes; 11 use interceptor::{Attributes, RTCPReader, RTPWriter}; 12 use std::sync::atomic::{AtomicBool, Ordering}; 13 use std::sync::{Arc, Weak}; 14 use tokio::sync::Mutex; 15 use util; 16 17 /// `RTP` packet sequence number manager. 18 /// 19 /// Used to override outgoing `RTP` packets' sequence numbers. On creating it is 20 /// unabled and can be enabled before sending data begining. Once data sending 21 /// began it can not be enabled any more. 22 pub(crate) struct SequenceTransformer(util::sync::Mutex<SequenceTransformerInner>); 23 24 /// [`SequenceTransformer`] inner. 25 struct SequenceTransformerInner { 26 offset: u16, 27 last_sq: u16, 28 reset_needed: bool, 29 enabled: bool, 30 data_sent: bool, 31 } 32 33 impl SequenceTransformer { 34 /// Creates a new [`SequenceTransformer`]. new() -> Self35 pub(crate) fn new() -> Self { 36 Self(util::sync::Mutex::new(SequenceTransformerInner { 37 offset: 0, 38 last_sq: rand::random(), 39 reset_needed: false, 40 enabled: false, 41 data_sent: false, 42 })) 43 } 44 45 /// Enables this [`SequenceTransformer`]. 46 /// 47 /// # Errors 48 /// 49 /// With [`Error::ErrRTPSenderSeqTransEnabled`] on trying to enable already 50 /// enabled [`SequenceTransformer`]. 51 /// 52 /// With [`Error::ErrRTPSenderSeqTransEnabled`] on trying to enable 53 /// [`SequenceTransformer`] after data sending began. enable(&self) -> Result<()>54 pub(crate) fn enable(&self) -> Result<()> { 55 let mut guard = self.0.lock(); 56 57 if guard.enabled { 58 return Err(Error::ErrRTPSenderSeqTransEnabled); 59 } 60 61 (!guard.data_sent) 62 .then(|| { 63 guard.enabled = true; 64 }) 65 .ok_or(Error::ErrRTPSenderDataSent) 66 } 67 68 /// Indicates [`SequenceTransformer`] about necessity of recalculating 69 /// `offset`. reset_offset(&self)70 pub(crate) fn reset_offset(&self) { 71 self.0.lock().reset_needed = true; 72 } 73 74 /// Gets [`Some`] consistent `sequence number` if this [`SequenceTransformer`] is 75 /// enabled or [`None`] if it is not. 76 /// 77 /// Once this method is called, considers data sending began. seq_number(&self, raw_sn: u16) -> Option<u16>78 fn seq_number(&self, raw_sn: u16) -> Option<u16> { 79 let mut guard = self.0.lock(); 80 guard.data_sent = true; 81 82 if !guard.enabled { 83 return None; 84 } 85 86 let offset = guard 87 .reset_needed 88 .then(|| { 89 guard.reset_needed = false; 90 let offset = guard.last_sq.overflowing_sub(raw_sn.overflowing_sub(1).0).0; 91 guard.offset = offset; 92 offset 93 }) 94 .unwrap_or(guard.offset); 95 let next = raw_sn.overflowing_add(offset).0; 96 guard.last_sq = next; 97 98 Some(next) 99 } 100 } 101 102 /// SrtpWriterFuture blocks Read/Write calls until 103 /// the SRTP Session is available 104 pub(crate) struct SrtpWriterFuture { 105 pub(crate) closed: AtomicBool, 106 pub(crate) ssrc: SSRC, 107 pub(crate) rtp_sender: Weak<RTPSenderInternal>, 108 pub(crate) rtp_transport: Arc<RTCDtlsTransport>, 109 pub(crate) rtcp_read_stream: Mutex<Option<Arc<Stream>>>, // atomic.Value // * 110 pub(crate) rtp_write_session: Mutex<Option<Arc<Session>>>, // atomic.Value // * 111 pub(crate) seq_trans: Arc<SequenceTransformer>, 112 } 113 114 impl SrtpWriterFuture { init(&self, return_when_no_srtp: bool) -> Result<()>115 async fn init(&self, return_when_no_srtp: bool) -> Result<()> { 116 if return_when_no_srtp { 117 { 118 if let Some(rtp_sender) = self.rtp_sender.upgrade() { 119 if rtp_sender.stop_called_signal.load(Ordering::SeqCst) { 120 return Err(Error::ErrClosedPipe); 121 } 122 } else { 123 return Err(Error::ErrClosedPipe); 124 } 125 } 126 127 if !self.rtp_transport.srtp_ready_signal.load(Ordering::SeqCst) { 128 return Ok(()); 129 } 130 } else { 131 let mut rx = self.rtp_transport.srtp_ready_rx.lock().await; 132 if let Some(srtp_ready_rx) = &mut *rx { 133 if let Some(rtp_sender) = self.rtp_sender.upgrade() { 134 tokio::select! { 135 _ = rtp_sender.stop_called_rx.notified()=> return Err(Error::ErrClosedPipe), 136 _ = srtp_ready_rx.recv() =>{} 137 } 138 } else { 139 return Err(Error::ErrClosedPipe); 140 } 141 } 142 } 143 144 if self.closed.load(Ordering::SeqCst) { 145 return Err(Error::ErrClosedPipe); 146 } 147 148 if let Some(srtcp_session) = self.rtp_transport.get_srtcp_session().await { 149 let rtcp_read_stream = srtcp_session.open(self.ssrc).await; 150 let mut stream = self.rtcp_read_stream.lock().await; 151 *stream = Some(rtcp_read_stream); 152 } 153 154 { 155 let srtp_session = self.rtp_transport.get_srtp_session().await; 156 let mut session = self.rtp_write_session.lock().await; 157 *session = srtp_session; 158 } 159 160 Ok(()) 161 } 162 close(&self) -> Result<()>163 pub async fn close(&self) -> Result<()> { 164 if self.closed.load(Ordering::SeqCst) { 165 return Ok(()); 166 } 167 self.closed.store(true, Ordering::SeqCst); 168 169 let stream = { 170 let mut stream = self.rtcp_read_stream.lock().await; 171 stream.take() 172 }; 173 if let Some(rtcp_read_stream) = stream { 174 Ok(rtcp_read_stream.close().await?) 175 } else { 176 Ok(()) 177 } 178 } 179 read(&self, b: &mut [u8]) -> Result<usize>180 pub async fn read(&self, b: &mut [u8]) -> Result<usize> { 181 { 182 let stream = { 183 let stream = self.rtcp_read_stream.lock().await; 184 stream.clone() 185 }; 186 if let Some(rtcp_read_stream) = stream { 187 return Ok(rtcp_read_stream.read(b).await?); 188 } 189 } 190 191 self.init(false).await?; 192 193 { 194 let stream = { 195 let stream = self.rtcp_read_stream.lock().await; 196 stream.clone() 197 }; 198 if let Some(rtcp_read_stream) = stream { 199 return Ok(rtcp_read_stream.read(b).await?); 200 } 201 } 202 203 Ok(0) 204 } 205 write_rtp(&self, pkt: &rtp::packet::Packet) -> Result<usize>206 pub async fn write_rtp(&self, pkt: &rtp::packet::Packet) -> Result<usize> { 207 { 208 let session = { 209 let session = self.rtp_write_session.lock().await; 210 session.clone() 211 }; 212 if let Some(rtp_write_session) = session { 213 return Ok(rtp_write_session.write_rtp(pkt).await?); 214 } 215 } 216 217 self.init(true).await?; 218 219 { 220 let session = { 221 let session = self.rtp_write_session.lock().await; 222 session.clone() 223 }; 224 if let Some(rtp_write_session) = session { 225 return Ok(rtp_write_session.write_rtp(pkt).await?); 226 } 227 } 228 229 Ok(0) 230 } 231 write(&self, b: &Bytes) -> Result<usize>232 pub async fn write(&self, b: &Bytes) -> Result<usize> { 233 { 234 let session = { 235 let session = self.rtp_write_session.lock().await; 236 session.clone() 237 }; 238 if let Some(rtp_write_session) = session { 239 return Ok(rtp_write_session.write(b, true).await?); 240 } 241 } 242 243 self.init(true).await?; 244 245 { 246 let session = { 247 let session = self.rtp_write_session.lock().await; 248 session.clone() 249 }; 250 if let Some(rtp_write_session) = session { 251 return Ok(rtp_write_session.write(b, true).await?); 252 } 253 } 254 255 Ok(0) 256 } 257 } 258 259 type IResult<T> = std::result::Result<T, interceptor::Error>; 260 261 #[async_trait] 262 impl RTCPReader for SrtpWriterFuture { read(&self, buf: &mut [u8], a: &Attributes) -> IResult<(usize, Attributes)>263 async fn read(&self, buf: &mut [u8], a: &Attributes) -> IResult<(usize, Attributes)> { 264 Ok((self.read(buf).await?, a.clone())) 265 } 266 } 267 268 #[async_trait] 269 impl RTPWriter for SrtpWriterFuture { write(&self, pkt: &rtp::packet::Packet, _a: &Attributes) -> IResult<usize>270 async fn write(&self, pkt: &rtp::packet::Packet, _a: &Attributes) -> IResult<usize> { 271 Ok( 272 match self.seq_trans.seq_number(pkt.header.sequence_number) { 273 Some(seq_num) => { 274 let mut new_pkt = pkt.clone(); 275 new_pkt.header.sequence_number = seq_num; 276 self.write_rtp(&new_pkt).await? 277 } 278 None => self.write_rtp(pkt).await?, 279 }, 280 ) 281 } 282 } 283