1 #[cfg(test)]
2 mod rtp_sender_test;
3 
4 use crate::api::media_engine::MediaEngine;
5 use crate::dtls_transport::RTCDtlsTransport;
6 use crate::error::{Error, Result};
7 use crate::rtp_transceiver::rtp_codec::{RTCRtpCodecParameters, RTPCodecType};
8 use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
9 use crate::rtp_transceiver::srtp_writer_future::SrtpWriterFuture;
10 use crate::rtp_transceiver::{
11     create_stream_info, PayloadType, RTCRtpEncodingParameters, RTCRtpSendParameters,
12     RTCRtpTransceiver, SSRC,
13 };
14 use crate::track::track_local::{
15     InterceptorToTrackLocalWriter, TrackLocal, TrackLocalContext, TrackLocalWriter,
16 };
17 
18 use ice::rand::generate_crypto_random_string;
19 use interceptor::stream_info::StreamInfo;
20 use interceptor::{Attributes, Interceptor, RTCPReader, RTPWriter};
21 use std::sync::atomic::{AtomicBool, Ordering};
22 use std::sync::{Arc, Weak};
23 use tokio::sync::{mpsc, Mutex, Notify};
24 use util::sync::Mutex as SyncMutex;
25 
26 use super::srtp_writer_future::SequenceTransformer;
27 
/// State that the RTCP read path of a sender needs, kept separately so it can be
/// shared behind an `Arc` (see `RTCRtpSender::new`, which also hands a `Weak`
/// reference to the `SrtpWriterFuture`).
pub(crate) struct RTPSenderInternal {
    /// Receiving half of the "send called" channel created in `RTCRtpSender::new`.
    /// `read` waits on it before it touches the RTCP interceptor.
    pub(crate) send_called_rx: Mutex<mpsc::Receiver<()>>,
    /// Notifier shared with the owning sender (`stop_called_tx` is the same `Arc`).
    /// `read` returns `Error::ErrClosedPipe` once it is notified.
    pub(crate) stop_called_rx: Arc<Notify>,
    /// Boolean stop flag shared with the owning sender; `RTCRtpSender::stop`
    /// sets it to `true`.
    pub(crate) stop_called_signal: Arc<AtomicBool>,
    /// RTCP reader produced by `Interceptor::bind_rtcp_reader` in
    /// `RTCRtpSender::new`. While it is `None`, `read` fails with
    /// `Error::ErrInterceptorNotBind`.
    pub(crate) rtcp_interceptor: Mutex<Option<Arc<dyn RTCPReader + Send + Sync>>>,
}
34 
35 impl RTPSenderInternal {
36     /// read reads incoming RTCP for this RTPReceiver
read(&self, b: &mut [u8]) -> Result<(usize, Attributes)>37     async fn read(&self, b: &mut [u8]) -> Result<(usize, Attributes)> {
38         let mut send_called_rx = self.send_called_rx.lock().await;
39 
40         tokio::select! {
41             _ = send_called_rx.recv() =>{
42                 let rtcp_interceptor = {
43                     let rtcp_interceptor = self.rtcp_interceptor.lock().await;
44                     rtcp_interceptor.clone()
45                 };
46                 if let Some(rtcp_interceptor) = rtcp_interceptor{
47                     let a = Attributes::new();
48                     tokio::select! {
49                         _ = self.stop_called_rx.notified() => {
50                             Err(Error::ErrClosedPipe)
51                         }
52                         result = rtcp_interceptor.read(b, &a) => {
53                             Ok(result?)
54                         }
55                     }
56                 }else{
57                     Err(Error::ErrInterceptorNotBind)
58                 }
59             }
60             _ = self.stop_called_rx.notified() =>{
61                 Err(Error::ErrClosedPipe)
62             }
63         }
64     }
65 
66     /// read_rtcp is a convenience method that wraps Read and unmarshals for you.
read_rtcp( &self, receive_mtu: usize, ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>67     async fn read_rtcp(
68         &self,
69         receive_mtu: usize,
70     ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
71         let mut b = vec![0u8; receive_mtu];
72         let (n, attributes) = self.read(&mut b).await?;
73 
74         let mut buf = &b[..n];
75         let pkts = rtcp::packet::unmarshal(&mut buf)?;
76 
77         Ok((pkts, attributes))
78     }
79 }
80 
/// RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer
pub struct RTCRtpSender {
    /// Track currently used as the sender's source, if any; swapped by `replace_track`.
    pub(crate) track: Mutex<Option<Arc<dyn TrackLocal + Send + Sync>>>,

    /// SRTP stream that `new` binds as the RTCP reader and `send` binds as the
    /// RTP writer; closed by `stop`.
    pub(crate) srtp_stream: Arc<SrtpWriterFuture>,
    /// Stream description created in `send` and used by `stop` to unbind the
    /// local stream from the interceptor.
    pub(crate) stream_info: Mutex<StreamInfo>,
    /// Sequence transformer shared with `srtp_stream`; see `enable_seq_transformer`.
    seq_trans: Arc<SequenceTransformer>,

    /// Context the current track was bound with; populated by `send`.
    pub(crate) context: Mutex<TrackLocalContext>,

    /// DTLS transport this sender was created with.
    pub(crate) transport: Arc<RTCDtlsTransport>,

    /// Payload type reported in `get_parameters`; initialised to `0` in `new`.
    pub(crate) payload_type: PayloadType,
    /// SSRC reported in `get_parameters`; randomly generated in `new`.
    pub(crate) ssrc: SSRC,
    /// Size in bytes of the buffer `read_rtcp` allocates for each read.
    receive_mtu: usize,

    /// Negotiation flag: starts as `false`, is set by `set_negotiated` and
    /// read by `is_negotiated`.
    /// NOTE(review): the previous comment suggested this flag is a workaround
    /// that could be replaced by checking the transceiver's negotiation
    /// status; confirm before relying on that.
    pub(crate) negotiated: AtomicBool,

    /// Media engine used to look up RTP parameters and codecs by kind.
    pub(crate) media_engine: Arc<MediaEngine>,
    /// Interceptor chain the RTCP reader and the local RTP stream are bound to.
    pub(crate) interceptor: Arc<dyn Interceptor + Send + Sync>,

    /// Random 32-letter identifier generated in `new`.
    pub(crate) id: String,

    /// The id of the initial track. Even if the sender later switches to a
    /// different track, this id should be used when negotiating.
    pub(crate) initial_track_id: std::sync::Mutex<Option<String>>,
    /// AssociatedMediaStreamIds from the WebRTC specifications
    pub(crate) associated_media_stream_ids: std::sync::Mutex<Vec<String>>,

    /// Weak back-reference to the owning transceiver, set via `set_rtp_transceiver`.
    rtp_transceiver: SyncMutex<Option<Weak<RTCRtpTransceiver>>>,

    /// Sending half of the "send called" channel. It is `Some` until `send`
    /// takes and drops it, which is what `has_sent` checks for.
    send_called_tx: SyncMutex<Option<mpsc::Sender<()>>>,
    /// Notifier signalled by `stop`; the same `Arc` is stored in `internal`.
    stop_called_tx: Arc<Notify>,
    /// Set to `true` by `stop`; the same `Arc` is stored in `internal`.
    stop_called_signal: Arc<AtomicBool>,

    /// Pause flag shared with the track context and the track-local writer.
    pub(crate) paused: Arc<AtomicBool>,

    /// Shared state backing `read` / `read_rtcp`.
    internal: Arc<RTPSenderInternal>,
}
122 
123 impl std::fmt::Debug for RTCRtpSender {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result124     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125         f.debug_struct("RTCRtpSender")
126             .field("id", &self.id)
127             .finish()
128     }
129 }
130 
131 impl RTCRtpSender {
new( receive_mtu: usize, track: Option<Arc<dyn TrackLocal + Send + Sync>>, transport: Arc<RTCDtlsTransport>, media_engine: Arc<MediaEngine>, interceptor: Arc<dyn Interceptor + Send + Sync>, start_paused: bool, ) -> Self132     pub async fn new(
133         receive_mtu: usize,
134         track: Option<Arc<dyn TrackLocal + Send + Sync>>,
135         transport: Arc<RTCDtlsTransport>,
136         media_engine: Arc<MediaEngine>,
137         interceptor: Arc<dyn Interceptor + Send + Sync>,
138         start_paused: bool,
139     ) -> Self {
140         let id = generate_crypto_random_string(
141             32,
142             b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ",
143         );
144         let (send_called_tx, send_called_rx) = mpsc::channel(1);
145         let stop_called_tx = Arc::new(Notify::new());
146         let stop_called_rx = stop_called_tx.clone();
147         let ssrc = rand::random::<u32>();
148         let stop_called_signal = Arc::new(AtomicBool::new(false));
149 
150         let internal = Arc::new(RTPSenderInternal {
151             send_called_rx: Mutex::new(send_called_rx),
152             stop_called_rx,
153             stop_called_signal: Arc::clone(&stop_called_signal),
154             rtcp_interceptor: Mutex::new(None),
155         });
156 
157         let seq_trans = Arc::new(SequenceTransformer::new());
158         let srtp_stream = Arc::new(SrtpWriterFuture {
159             closed: AtomicBool::new(false),
160             ssrc,
161             rtp_sender: Arc::downgrade(&internal),
162             rtp_transport: Arc::clone(&transport),
163             rtcp_read_stream: Mutex::new(None),
164             rtp_write_session: Mutex::new(None),
165             seq_trans: Arc::clone(&seq_trans),
166         });
167 
168         let srtp_rtcp_reader = Arc::clone(&srtp_stream) as Arc<dyn RTCPReader + Send + Sync>;
169         let rtcp_interceptor = interceptor.bind_rtcp_reader(srtp_rtcp_reader).await;
170         {
171             let mut internal_rtcp_interceptor = internal.rtcp_interceptor.lock().await;
172             *internal_rtcp_interceptor = Some(rtcp_interceptor);
173         }
174 
175         let stream_ids = track
176             .as_ref()
177             .map(|track| vec![track.stream_id().to_string()])
178             .unwrap_or_default();
179         Self {
180             track: Mutex::new(track),
181 
182             srtp_stream,
183             stream_info: Mutex::new(StreamInfo::default()),
184             seq_trans,
185 
186             context: Mutex::new(TrackLocalContext::default()),
187             transport,
188 
189             payload_type: 0,
190             ssrc,
191             receive_mtu,
192 
193             negotiated: AtomicBool::new(false),
194 
195             media_engine,
196             interceptor,
197 
198             id,
199             initial_track_id: std::sync::Mutex::new(None),
200             associated_media_stream_ids: std::sync::Mutex::new(stream_ids),
201 
202             rtp_transceiver: SyncMutex::new(None),
203 
204             send_called_tx: SyncMutex::new(Some(send_called_tx)),
205             stop_called_tx,
206             stop_called_signal,
207 
208             paused: Arc::new(AtomicBool::new(start_paused)),
209 
210             internal,
211         }
212     }
213 
is_negotiated(&self) -> bool214     pub(crate) fn is_negotiated(&self) -> bool {
215         self.negotiated.load(Ordering::SeqCst)
216     }
217 
set_negotiated(&self)218     pub(crate) fn set_negotiated(&self) {
219         self.negotiated.store(true, Ordering::SeqCst);
220     }
221 
set_rtp_transceiver(&self, rtp_transceiver: Option<Weak<RTCRtpTransceiver>>)222     pub(crate) fn set_rtp_transceiver(&self, rtp_transceiver: Option<Weak<RTCRtpTransceiver>>) {
223         if let Some(t) = rtp_transceiver.as_ref().and_then(|t| t.upgrade()) {
224             self.set_paused(!t.direction().has_send());
225         }
226         let mut tr = self.rtp_transceiver.lock();
227         *tr = rtp_transceiver;
228     }
229 
set_paused(&self, paused: bool)230     pub(crate) fn set_paused(&self, paused: bool) {
231         self.paused.store(paused, Ordering::SeqCst);
232     }
233 
234     /// transport returns the currently-configured DTLSTransport
235     /// if one has not yet been configured
transport(&self) -> Arc<RTCDtlsTransport>236     pub fn transport(&self) -> Arc<RTCDtlsTransport> {
237         Arc::clone(&self.transport)
238     }
239 
240     /// get_parameters describes the current configuration for the encoding and
241     /// transmission of media on the sender's track.
get_parameters(&self) -> RTCRtpSendParameters242     pub async fn get_parameters(&self) -> RTCRtpSendParameters {
243         let kind = {
244             let track = self.track.lock().await;
245             if let Some(t) = &*track {
246                 t.kind()
247             } else {
248                 RTPCodecType::default()
249             }
250         };
251 
252         let mut send_parameters = {
253             RTCRtpSendParameters {
254                 rtp_parameters: self
255                     .media_engine
256                     .get_rtp_parameters_by_kind(kind, RTCRtpTransceiverDirection::Sendonly),
257                 encodings: vec![RTCRtpEncodingParameters {
258                     ssrc: self.ssrc,
259                     payload_type: self.payload_type,
260                     ..Default::default()
261                 }],
262             }
263         };
264 
265         let codecs = {
266             let tr = self.rtp_transceiver.lock().clone();
267             if let Some(t) = &tr {
268                 if let Some(t) = t.upgrade() {
269                     t.get_codecs().await
270                 } else {
271                     self.media_engine.get_codecs_by_kind(kind)
272                 }
273             } else {
274                 self.media_engine.get_codecs_by_kind(kind)
275             }
276         };
277         send_parameters.rtp_parameters.codecs = codecs;
278 
279         send_parameters
280     }
281 
282     /// track returns the RTCRtpTransceiver track, or nil
track(&self) -> Option<Arc<dyn TrackLocal + Send + Sync>>283     pub async fn track(&self) -> Option<Arc<dyn TrackLocal + Send + Sync>> {
284         let track = self.track.lock().await;
285         track.clone()
286     }
287 
288     /// replace_track replaces the track currently being used as the sender's source with a new TrackLocal.
289     /// The new track must be of the same media kind (audio, video, etc) and switching the track should not
290     /// require negotiation.
replace_track( &self, track: Option<Arc<dyn TrackLocal + Send + Sync>>, ) -> Result<()>291     pub async fn replace_track(
292         &self,
293         track: Option<Arc<dyn TrackLocal + Send + Sync>>,
294     ) -> Result<()> {
295         if let Some(t) = &track {
296             let tr = self.rtp_transceiver.lock();
297             if let Some(r) = &*tr {
298                 if let Some(r) = r.upgrade() {
299                     if r.kind != t.kind() {
300                         return Err(Error::ErrRTPSenderNewTrackHasIncorrectKind);
301                     }
302                 } else {
303                     //TODO: what about None arc?
304                 }
305             } else {
306                 //TODO: what about None tr?
307             }
308         }
309 
310         if self.has_sent() {
311             let t = {
312                 let t = self.track.lock().await;
313                 t.clone()
314             };
315             if let Some(t) = t {
316                 let context = self.context.lock().await;
317                 t.unbind(&context).await?;
318             }
319         }
320 
321         if !self.has_sent() || track.is_none() {
322             let mut t = self.track.lock().await;
323             *t = track;
324             return Ok(());
325         }
326 
327         let context = {
328             let context = self.context.lock().await;
329             context.clone()
330         };
331 
332         let result = if let Some(t) = &track {
333             self.seq_trans.reset_offset();
334 
335             let new_context = TrackLocalContext {
336                 id: context.id.clone(),
337                 params: self
338                     .media_engine
339                     .get_rtp_parameters_by_kind(t.kind(), RTCRtpTransceiverDirection::Sendonly),
340                 ssrc: context.ssrc,
341                 write_stream: context.write_stream.clone(),
342                 paused: self.paused.clone(),
343             };
344 
345             t.bind(&new_context).await
346         } else {
347             Err(Error::ErrRTPSenderTrackNil)
348         };
349 
350         match result {
351             Err(err) => {
352                 // Re-bind the original track
353                 let track = self.track.lock().await;
354                 if let Some(t) = &*track {
355                     t.bind(&context).await?;
356                 }
357 
358                 Err(err)
359             }
360             Ok(codec) => {
361                 // Codec has changed
362                 if self.payload_type != codec.payload_type {
363                     let mut context = self.context.lock().await;
364                     context.params.codecs = vec![codec];
365                 }
366 
367                 {
368                     let mut t = self.track.lock().await;
369                     *t = track;
370                 }
371 
372                 Ok(())
373             }
374         }
375     }
376 
377     /// send Attempts to set the parameters controlling the sending of media.
send(&self, parameters: &RTCRtpSendParameters) -> Result<()>378     pub async fn send(&self, parameters: &RTCRtpSendParameters) -> Result<()> {
379         if self.has_sent() {
380             return Err(Error::ErrRTPSenderSendAlreadyCalled);
381         }
382 
383         let write_stream = Arc::new(InterceptorToTrackLocalWriter::new(self.paused.clone()));
384         let (context, stream_info) = {
385             let track = self.track.lock().await;
386             let mut context = TrackLocalContext {
387                 id: self.id.clone(),
388                 params: self.media_engine.get_rtp_parameters_by_kind(
389                     if let Some(t) = &*track {
390                         t.kind()
391                     } else {
392                         RTPCodecType::default()
393                     },
394                     RTCRtpTransceiverDirection::Sendonly,
395                 ),
396                 ssrc: parameters.encodings[0].ssrc,
397                 write_stream: Some(
398                     Arc::clone(&write_stream) as Arc<dyn TrackLocalWriter + Send + Sync>
399                 ),
400                 paused: self.paused.clone(),
401             };
402 
403             let codec = if let Some(t) = &*track {
404                 t.bind(&context).await?
405             } else {
406                 RTCRtpCodecParameters::default()
407             };
408             let payload_type = codec.payload_type;
409             let capability = codec.capability.clone();
410             context.params.codecs = vec![codec];
411             let stream_info = create_stream_info(
412                 self.id.clone(),
413                 parameters.encodings[0].ssrc,
414                 payload_type,
415                 capability,
416                 &parameters.rtp_parameters.header_extensions,
417             );
418 
419             (context, stream_info)
420         };
421 
422         let srtp_rtp_writer = Arc::clone(&self.srtp_stream) as Arc<dyn RTPWriter + Send + Sync>;
423         let rtp_interceptor = self
424             .interceptor
425             .bind_local_stream(&stream_info, srtp_rtp_writer)
426             .await;
427         {
428             let mut interceptor_rtp_writer = write_stream.interceptor_rtp_writer.lock().await;
429             *interceptor_rtp_writer = Some(rtp_interceptor);
430         }
431 
432         {
433             let mut ctx = self.context.lock().await;
434             *ctx = context;
435         }
436         {
437             let mut si = self.stream_info.lock().await;
438             *si = stream_info;
439         }
440 
441         {
442             let mut send_called_tx = self.send_called_tx.lock();
443             send_called_tx.take();
444         }
445 
446         Ok(())
447     }
448 
449     /// stop irreversibly stops the RTPSender
stop(&self) -> Result<()>450     pub async fn stop(&self) -> Result<()> {
451         if self.stop_called_signal.load(Ordering::SeqCst) {
452             return Ok(());
453         }
454         self.stop_called_signal.store(true, Ordering::SeqCst);
455         self.stop_called_tx.notify_waiters();
456 
457         if !self.has_sent() {
458             return Ok(());
459         }
460 
461         self.replace_track(None).await?;
462 
463         {
464             let stream_info = self.stream_info.lock().await;
465             self.interceptor.unbind_local_stream(&stream_info).await;
466         }
467 
468         self.srtp_stream.close().await
469     }
470 
471     /// read reads incoming RTCP for this RTPReceiver
read(&self, b: &mut [u8]) -> Result<(usize, Attributes)>472     pub async fn read(&self, b: &mut [u8]) -> Result<(usize, Attributes)> {
473         self.internal.read(b).await
474     }
475 
476     /// read_rtcp is a convenience method that wraps Read and unmarshals for you.
read_rtcp( &self, ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>477     pub async fn read_rtcp(
478         &self,
479     ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
480         self.internal.read_rtcp(self.receive_mtu).await
481     }
482 
483     /// Enables overriding outgoing `RTP` packets' `sequence number`s.
484     ///
485     /// Must be called once before any data sent or never called at all.
486     ///
487     /// # Errors
488     ///
489     /// Errors if this [`RTCRtpSender`] has started to send data or sequence
490     /// transforming has been already enabled.
enable_seq_transformer(&self) -> Result<()>491     pub fn enable_seq_transformer(&self) -> Result<()> {
492         self.seq_trans.enable()
493     }
494 
495     /// has_sent tells if data has been ever sent for this instance
has_sent(&self) -> bool496     pub(crate) fn has_sent(&self) -> bool {
497         let send_called_tx = self.send_called_tx.lock();
498         send_called_tx.is_none()
499     }
500 
501     /// has_stopped tells if stop has been called
has_stopped(&self) -> bool502     pub(crate) async fn has_stopped(&self) -> bool {
503         self.stop_called_signal.load(Ordering::SeqCst)
504     }
505 
initial_track_id(&self) -> Option<String>506     pub(crate) fn initial_track_id(&self) -> Option<String> {
507         let lock = self.initial_track_id.lock().unwrap();
508 
509         lock.clone()
510     }
511 
set_initial_track_id(&self, id: String) -> Result<()>512     pub(crate) fn set_initial_track_id(&self, id: String) -> Result<()> {
513         let mut lock = self.initial_track_id.lock().unwrap();
514 
515         if lock.is_some() {
516             return Err(Error::ErrSenderInitialTrackIdAlreadySet);
517         }
518 
519         *lock = Some(id);
520 
521         Ok(())
522     }
523 
associate_media_stream_id(&self, id: String) -> bool524     pub(crate) fn associate_media_stream_id(&self, id: String) -> bool {
525         let mut lock = self.associated_media_stream_ids.lock().unwrap();
526 
527         if lock.contains(&id) {
528             return false;
529         }
530 
531         lock.push(id);
532 
533         true
534     }
535 
associated_media_stream_ids(&self) -> Vec<String>536     pub(crate) fn associated_media_stream_ids(&self) -> Vec<String> {
537         let lock = self.associated_media_stream_ids.lock().unwrap();
538 
539         lock.clone()
540     }
541 }
542