xref: /webrtc/webrtc/src/rtp_transceiver/mod.rs (revision b6cd8091)
1 #[cfg(test)]
2 mod rtp_transceiver_test;
3 
4 use crate::api::media_engine::MediaEngine;
5 use crate::error::{Error, Result};
6 use crate::rtp_transceiver::rtp_codec::*;
7 use crate::rtp_transceiver::rtp_receiver::{RTCRtpReceiver, RTPReceiverInternal};
8 use crate::rtp_transceiver::rtp_sender::RTCRtpSender;
9 use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
10 use crate::track::track_local::TrackLocal;
11 
12 use interceptor::{
13     stream_info::{RTPHeaderExtension, StreamInfo},
14     Attributes,
15 };
16 
17 use log::trace;
18 use serde::{Deserialize, Serialize};
19 use std::fmt;
20 use std::future::Future;
21 use std::pin::Pin;
22 use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
23 use std::sync::Arc;
24 use tokio::sync::{Mutex, OnceCell};
25 use util::Unmarshal;
26 
27 pub(crate) mod fmtp;
28 pub mod rtp_codec;
29 pub mod rtp_receiver;
30 pub mod rtp_sender;
31 pub mod rtp_transceiver_direction;
32 pub(crate) mod srtp_writer_future;
33 use util::sync::Mutex as SyncMutex;
34 
/// SSRC represents a synchronization source.
///
/// A synchronization source is a randomly chosen value meant to be globally
/// unique within a particular RTP session. It is used to identify a single
/// stream of media and is stored as an unsigned 32-bit integer.
/// <https://tools.ietf.org/html/rfc3550#section-3>
#[allow(clippy::upper_case_acronyms)]
pub type SSRC = u32;
42 
/// PayloadType identifies the format of the RTP payload and determines
/// its interpretation by the application. Each codec in an RTP session
/// will have a different PayloadType. Stored as an unsigned 8-bit integer.
/// <https://tools.ietf.org/html/rfc3550#section-3>
pub type PayloadType = u8;
48 
/// RTCP feedback type string `transport-cc`, one of the values listed as
/// valid for [`RTCPFeedback::typ`].
pub const TYPE_RTCP_FB_TRANSPORT_CC: &str = "transport-cc";

/// RTCP feedback type string `goog-remb`, one of the values listed as
/// valid for [`RTCPFeedback::typ`].
pub const TYPE_RTCP_FB_GOOG_REMB: &str = "goog-remb";

/// RTCP feedback type string `ack`, one of the values listed as
/// valid for [`RTCPFeedback::typ`].
pub const TYPE_RTCP_FB_ACK: &str = "ack";

/// RTCP feedback type string `ccm`, one of the values listed as
/// valid for [`RTCPFeedback::typ`].
pub const TYPE_RTCP_FB_CCM: &str = "ccm";

/// RTCP feedback type string `nack`, one of the values listed as
/// valid for [`RTCPFeedback::typ`].
pub const TYPE_RTCP_FB_NACK: &str = "nack";
63 
/// RTCPFeedback signals the connection to use additional RTCP packet types.
/// <https://draft.ortc.org/#dom-rtcrtcpfeedback>
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct RTCPFeedback {
    /// The type of feedback.
    /// See: <https://draft.ortc.org/#dom-rtcrtcpfeedback>
    /// Valid values: ack, ccm, nack, goog-remb, transport-cc
    /// (available as the `TYPE_RTCP_FB_*` constants in this module).
    pub typ: String,

    /// The parameter value depends on the type.
    /// For example, type="nack" parameter="pli" will send Picture Loss Indicator packets.
    pub parameter: String,
}
77 
/// RTCRtpCapabilities represents the capabilities of a transceiver.
/// <https://w3c.github.io/webrtc-pc/#rtcrtpcapabilities>
#[derive(Default, Debug, Clone)]
pub struct RTCRtpCapabilities {
    /// Codec capabilities.
    pub codecs: Vec<RTCRtpCodecCapability>,
    /// RTP header extension capabilities.
    pub header_extensions: Vec<RTCRtpHeaderExtensionCapability>,
}
85 
/// RTCRtpRtxParameters dictionary contains information relating to retransmission (RTX) settings.
/// <https://draft.ortc.org/#dom-rtcrtprtxparameters>
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct RTCRtpRtxParameters {
    /// SSRC associated with the RTX settings.
    /// NOTE(review): presumably the SSRC of the retransmission stream itself — confirm.
    pub ssrc: SSRC,
}
92 
/// RTCRtpCodingParameters provides information relating to both encoding and decoding.
/// This is a subset of the RFC since Pion WebRTC doesn't implement encoding/decoding itself.
/// <http://draft.ortc.org/#dom-rtcrtpcodingparameters>
///
/// [`RTCRtpDecodingParameters`] and [`RTCRtpEncodingParameters`] are both
/// aliases of this type.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct RTCRtpCodingParameters {
    /// Stream identifier string.
    /// NOTE(review): presumably the RTP stream id (RID) used for simulcast — confirm.
    pub rid: String,
    /// Synchronization source of the stream.
    pub ssrc: SSRC,
    /// RTP payload type of the stream.
    pub payload_type: PayloadType,
    /// Retransmission (RTX) settings for the stream.
    pub rtx: RTCRtpRtxParameters,
}
103 
/// RTCRtpDecodingParameters provides information relating to decoding.
/// It is an alias of [`RTCRtpCodingParameters`].
/// This is a subset of the RFC since Pion WebRTC doesn't implement decoding itself.
/// <http://draft.ortc.org/#dom-rtcrtpdecodingparameters>
pub type RTCRtpDecodingParameters = RTCRtpCodingParameters;
108 
/// RTCRtpEncodingParameters provides information relating to encoding.
/// It is an alias of [`RTCRtpCodingParameters`].
/// This is a subset of the RFC since Pion WebRTC doesn't implement encoding itself.
/// <http://draft.ortc.org/#dom-rtcrtpencodingparameters>
pub type RTCRtpEncodingParameters = RTCRtpCodingParameters;
113 
/// RTCRtpReceiveParameters contains the RTP stack settings used by receivers.
#[derive(Debug)]
pub struct RTCRtpReceiveParameters {
    /// One entry of decoding parameters per encoding.
    pub encodings: Vec<RTCRtpDecodingParameters>,
}
119 
/// RTCRtpSendParameters contains the RTP stack settings used by senders.
#[derive(Debug)]
pub struct RTCRtpSendParameters {
    /// General RTP parameters.
    pub rtp_parameters: RTCRtpParameters,
    /// One entry of encoding parameters per encoding.
    pub encodings: Vec<RTCRtpEncodingParameters>,
}
126 
/// RTCRtpTransceiverInit dictionary is used when calling the WebRTC function
/// addTransceiver() to provide configuration options for the new transceiver.
pub struct RTCRtpTransceiverInit {
    /// Direction requested for the new transceiver.
    pub direction: RTCRtpTransceiverDirection,
    /// Encoding parameters supplied for the sending side.
    pub send_encodings: Vec<RTCRtpEncodingParameters>,
    // Field not present in this struct (Go-style declaration kept for reference):
    // Streams       []*Track
}
133 
create_stream_info( id: String, ssrc: SSRC, payload_type: PayloadType, codec: RTCRtpCodecCapability, webrtc_header_extensions: &[RTCRtpHeaderExtensionParameters], ) -> StreamInfo134 pub(crate) fn create_stream_info(
135     id: String,
136     ssrc: SSRC,
137     payload_type: PayloadType,
138     codec: RTCRtpCodecCapability,
139     webrtc_header_extensions: &[RTCRtpHeaderExtensionParameters],
140 ) -> StreamInfo {
141     let header_extensions: Vec<RTPHeaderExtension> = webrtc_header_extensions
142         .iter()
143         .map(|h| RTPHeaderExtension {
144             id: h.id,
145             uri: h.uri.clone(),
146         })
147         .collect();
148 
149     let feedbacks: Vec<_> = codec
150         .rtcp_feedback
151         .iter()
152         .map(|f| interceptor::stream_info::RTCPFeedback {
153             typ: f.typ.clone(),
154             parameter: f.parameter.clone(),
155         })
156         .collect();
157 
158     StreamInfo {
159         id,
160         attributes: Attributes::new(),
161         ssrc,
162         payload_type,
163         rtp_header_extensions: header_extensions,
164         mime_type: codec.mime_type,
165         clock_rate: codec.clock_rate,
166         channels: codec.channels,
167         sdp_fmtp_line: codec.sdp_fmtp_line,
168         rtcp_feedback: feedbacks,
169     }
170 }
171 
/// Optional callback that produces a boxed, pinned future with no output.
///
/// [`RTCRtpTransceiver::set_direction`] invokes and awaits it (when present)
/// after the desired direction has actually changed.
pub type TriggerNegotiationNeededFnOption =
    Option<Box<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> + Send + Sync>>;
174 
/// RTCRtpTransceiver represents a combination of an RTCRtpSender and an RTCRtpReceiver that share a common mid.
pub struct RTCRtpTransceiver {
    // Write-once media id; a second `set` is rejected.
    mid: OnceCell<String>,                    //atomic.Value
    // Current sender; replaceable behind a blocking mutex.
    sender: SyncMutex<Arc<RTCRtpSender>>,     //atomic.Value
    // Current receiver; replaceable behind a blocking mutex.
    receiver: SyncMutex<Arc<RTCRtpReceiver>>, //atomic.Value

    // Desired direction, stored as the enum's `u8` value.
    direction: AtomicU8,         //RTPTransceiverDirection
    // Negotiated direction, stored as the enum's `u8` value.
    current_direction: AtomicU8, //RTPTransceiverDirection

    // Shared with the receiver through `set_transceiver_codecs`.
    codecs: Arc<Mutex<Vec<RTCRtpCodecParameters>>>, // User provided codecs via set_codec_preferences

    // Set once `stop` has been called.
    pub(crate) stopped: AtomicBool,
    // Media kind of this transceiver.
    pub(crate) kind: RTPCodecType,

    // Source of the codecs that user preferences are validated against.
    media_engine: Arc<MediaEngine>,

    // Awaited by `set_direction` when the desired direction changes.
    trigger_negotiation_needed: Mutex<TriggerNegotiationNeededFnOption>,
}
193 
194 impl RTCRtpTransceiver {
new( receiver: Arc<RTCRtpReceiver>, sender: Arc<RTCRtpSender>, direction: RTCRtpTransceiverDirection, kind: RTPCodecType, codecs: Vec<RTCRtpCodecParameters>, media_engine: Arc<MediaEngine>, trigger_negotiation_needed: TriggerNegotiationNeededFnOption, ) -> Arc<Self>195     pub async fn new(
196         receiver: Arc<RTCRtpReceiver>,
197         sender: Arc<RTCRtpSender>,
198         direction: RTCRtpTransceiverDirection,
199         kind: RTPCodecType,
200         codecs: Vec<RTCRtpCodecParameters>,
201         media_engine: Arc<MediaEngine>,
202         trigger_negotiation_needed: TriggerNegotiationNeededFnOption,
203     ) -> Arc<Self> {
204         let codecs = Arc::new(Mutex::new(codecs));
205         receiver.set_transceiver_codecs(Some(Arc::clone(&codecs)));
206 
207         let t = Arc::new(RTCRtpTransceiver {
208             mid: OnceCell::new(),
209             sender: SyncMutex::new(sender),
210             receiver: SyncMutex::new(receiver),
211 
212             direction: AtomicU8::new(direction as u8),
213             current_direction: AtomicU8::new(RTCRtpTransceiverDirection::Unspecified as u8),
214 
215             codecs,
216             stopped: AtomicBool::new(false),
217             kind,
218             media_engine,
219             trigger_negotiation_needed: Mutex::new(trigger_negotiation_needed),
220         });
221         t.sender().set_rtp_transceiver(Some(Arc::downgrade(&t)));
222 
223         t
224     }
225 
226     /// set_codec_preferences sets preferred list of supported codecs
227     /// if codecs is empty or nil we reset to default from MediaEngine
set_codec_preferences(&self, codecs: Vec<RTCRtpCodecParameters>) -> Result<()>228     pub async fn set_codec_preferences(&self, codecs: Vec<RTCRtpCodecParameters>) -> Result<()> {
229         for codec in &codecs {
230             let media_engine_codecs = self.media_engine.get_codecs_by_kind(self.kind);
231             let (_, match_type) = codec_parameters_fuzzy_search(codec, &media_engine_codecs);
232             if match_type == CodecMatch::None {
233                 return Err(Error::ErrRTPTransceiverCodecUnsupported);
234             }
235         }
236 
237         {
238             let mut c = self.codecs.lock().await;
239             *c = codecs;
240         }
241         Ok(())
242     }
243 
244     /// Codecs returns list of supported codecs
get_codecs(&self) -> Vec<RTCRtpCodecParameters>245     pub(crate) async fn get_codecs(&self) -> Vec<RTCRtpCodecParameters> {
246         let mut codecs = self.codecs.lock().await;
247         RTPReceiverInternal::get_codecs(&mut codecs, self.kind, &self.media_engine)
248     }
249 
250     /// sender returns the RTPTransceiver's RTPSender if it has one
sender(&self) -> Arc<RTCRtpSender>251     pub fn sender(&self) -> Arc<RTCRtpSender> {
252         let sender = self.sender.lock();
253         sender.clone()
254     }
255 
256     /// set_sender_track sets the RTPSender and Track to current transceiver
set_sender_track( self: &Arc<Self>, sender: Arc<RTCRtpSender>, track: Option<Arc<dyn TrackLocal + Send + Sync>>, ) -> Result<()>257     pub async fn set_sender_track(
258         self: &Arc<Self>,
259         sender: Arc<RTCRtpSender>,
260         track: Option<Arc<dyn TrackLocal + Send + Sync>>,
261     ) -> Result<()> {
262         self.set_sender(sender);
263         self.set_sending_track(track).await
264     }
265 
set_sender(self: &Arc<Self>, s: Arc<RTCRtpSender>)266     pub fn set_sender(self: &Arc<Self>, s: Arc<RTCRtpSender>) {
267         s.set_rtp_transceiver(Some(Arc::downgrade(self)));
268 
269         let prev_sender = self.sender();
270         prev_sender.set_rtp_transceiver(None);
271 
272         {
273             let mut sender = self.sender.lock();
274             *sender = s;
275         }
276     }
277 
278     /// receiver returns the RTPTransceiver's RTPReceiver if it has one
receiver(&self) -> Arc<RTCRtpReceiver>279     pub fn receiver(&self) -> Arc<RTCRtpReceiver> {
280         let receiver = self.receiver.lock();
281         receiver.clone()
282     }
283 
set_receiver(&self, r: Arc<RTCRtpReceiver>)284     pub(crate) fn set_receiver(&self, r: Arc<RTCRtpReceiver>) {
285         r.set_transceiver_codecs(Some(Arc::clone(&self.codecs)));
286 
287         {
288             let mut receiver = self.receiver.lock();
289             (*receiver).set_transceiver_codecs(None);
290 
291             *receiver = r;
292         }
293     }
294 
295     /// set_mid sets the RTPTransceiver's mid. If it was already set, will return an error.
set_mid(&self, mid: String) -> Result<()>296     pub(crate) fn set_mid(&self, mid: String) -> Result<()> {
297         self.mid
298             .set(mid)
299             .map_err(|_| Error::ErrRTPTransceiverCannotChangeMid)
300     }
301 
302     /// mid gets the Transceiver's mid value. When not already set, this value will be set in CreateOffer or create_answer.
mid(&self) -> Option<String>303     pub fn mid(&self) -> Option<String> {
304         self.mid.get().map(Clone::clone)
305     }
306 
307     /// kind returns RTPTransceiver's kind.
kind(&self) -> RTPCodecType308     pub fn kind(&self) -> RTPCodecType {
309         self.kind
310     }
311 
312     /// direction returns the RTPTransceiver's desired direction.
direction(&self) -> RTCRtpTransceiverDirection313     pub fn direction(&self) -> RTCRtpTransceiverDirection {
314         self.direction.load(Ordering::SeqCst).into()
315     }
316 
317     /// Set the direction of this transceiver. This might trigger a renegotiation.
set_direction(&self, d: RTCRtpTransceiverDirection)318     pub async fn set_direction(&self, d: RTCRtpTransceiverDirection) {
319         let changed = self.set_direction_internal(d);
320 
321         if changed {
322             let lock = self.trigger_negotiation_needed.lock().await;
323             if let Some(trigger) = &*lock {
324                 (trigger)().await;
325             }
326         }
327     }
328 
set_direction_internal(&self, d: RTCRtpTransceiverDirection) -> bool329     pub(crate) fn set_direction_internal(&self, d: RTCRtpTransceiverDirection) -> bool {
330         let previous: RTCRtpTransceiverDirection =
331             self.direction.swap(d as u8, Ordering::SeqCst).into();
332 
333         let changed = d != previous;
334 
335         if changed {
336             trace!(
337                 "Changing direction of transceiver from {} to {}",
338                 previous,
339                 d
340             );
341         }
342 
343         changed
344     }
345 
346     /// current_direction returns the RTPTransceiver's current direction as negotiated.
347     ///
348     /// If this transceiver has never been negotiated or if it's stopped this returns [`RTCRtpTransceiverDirection::Unspecified`].
current_direction(&self) -> RTCRtpTransceiverDirection349     pub fn current_direction(&self) -> RTCRtpTransceiverDirection {
350         if self.stopped.load(Ordering::SeqCst) {
351             return RTCRtpTransceiverDirection::Unspecified;
352         }
353 
354         self.current_direction.load(Ordering::SeqCst).into()
355     }
356 
set_current_direction(&self, d: RTCRtpTransceiverDirection)357     pub(crate) fn set_current_direction(&self, d: RTCRtpTransceiverDirection) {
358         let previous: RTCRtpTransceiverDirection = self
359             .current_direction
360             .swap(d as u8, Ordering::SeqCst)
361             .into();
362 
363         if d != previous {
364             trace!(
365                 "Changing current direction of transceiver from {} to {}",
366                 previous,
367                 d,
368             );
369         }
370     }
371 
372     /// Perform any subsequent actions after altering the transceiver's direction.
373     ///
374     /// After changing the transceiver's direction this method should be called to perform any
375     /// side-effects that results from the new direction, such as pausing/resuming the RTP receiver.
process_new_current_direction( &self, previous_direction: RTCRtpTransceiverDirection, ) -> Result<()>376     pub(crate) async fn process_new_current_direction(
377         &self,
378         previous_direction: RTCRtpTransceiverDirection,
379     ) -> Result<()> {
380         if self.stopped.load(Ordering::SeqCst) {
381             return Ok(());
382         }
383 
384         let current_direction = self.current_direction();
385         if previous_direction != current_direction {
386             let mid = self.mid();
387             trace!(
388                 "Processing transceiver({:?}) direction change from {} to {}",
389                 mid,
390                 previous_direction,
391                 current_direction
392             );
393         } else {
394             // no change.
395             return Ok(());
396         }
397 
398         {
399             let receiver = self.receiver.lock().clone();
400             let pause_receiver = !current_direction.has_recv();
401 
402             if pause_receiver {
403                 receiver.pause().await?;
404             } else {
405                 receiver.resume().await?;
406             }
407         }
408 
409         let pause_sender = !current_direction.has_send();
410         {
411             let sender = &*self.sender.lock();
412             sender.set_paused(pause_sender);
413         }
414 
415         Ok(())
416     }
417 
418     /// stop irreversibly stops the RTPTransceiver
stop(&self) -> Result<()>419     pub async fn stop(&self) -> Result<()> {
420         if self.stopped.load(Ordering::SeqCst) {
421             return Ok(());
422         }
423 
424         self.stopped.store(true, Ordering::SeqCst);
425 
426         {
427             let sender = self.sender.lock();
428             sender.stop().await?;
429         }
430         {
431             let r = self.receiver.lock();
432             r.stop().await?;
433         }
434 
435         self.set_direction_internal(RTCRtpTransceiverDirection::Inactive);
436 
437         Ok(())
438     }
439 
set_sending_track( &self, track: Option<Arc<dyn TrackLocal + Send + Sync>>, ) -> Result<()>440     pub(crate) async fn set_sending_track(
441         &self,
442         track: Option<Arc<dyn TrackLocal + Send + Sync>>,
443     ) -> Result<()> {
444         let track_is_none = track.is_none();
445         {
446             let sender = self.sender.lock().clone();
447             sender.replace_track(track).await?;
448         }
449 
450         let direction = self.direction();
451         let should_send = !track_is_none;
452         let should_recv = direction.has_recv();
453         self.set_direction_internal(RTCRtpTransceiverDirection::from_send_recv(
454             should_send,
455             should_recv,
456         ));
457 
458         Ok(())
459     }
460 }
461 
462 impl fmt::Debug for RTCRtpTransceiver {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result463     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
464         f.debug_struct("RTCRtpTransceiver")
465             .field("mid", &self.mid)
466             .field("sender", &self.sender)
467             .field("receiver", &self.receiver)
468             .field("direction", &self.direction)
469             .field("current_direction", &self.current_direction)
470             .field("codecs", &self.codecs)
471             .field("stopped", &self.stopped)
472             .field("kind", &self.kind)
473             .finish()
474     }
475 }
476 
find_by_mid( mid: &str, local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>, ) -> Option<Arc<RTCRtpTransceiver>>477 pub(crate) async fn find_by_mid(
478     mid: &str,
479     local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>,
480 ) -> Option<Arc<RTCRtpTransceiver>> {
481     for (i, t) in local_transceivers.iter().enumerate() {
482         if t.mid().as_deref() == Some(mid) {
483             return Some(local_transceivers.remove(i));
484         }
485     }
486 
487     None
488 }
489 
490 /// Given a direction+type pluck a transceiver from the passed list
491 /// if no entry satisfies the requested type+direction return a inactive Transceiver
satisfy_type_and_direction( remote_kind: RTPCodecType, remote_direction: RTCRtpTransceiverDirection, local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>, ) -> Option<Arc<RTCRtpTransceiver>>492 pub(crate) async fn satisfy_type_and_direction(
493     remote_kind: RTPCodecType,
494     remote_direction: RTCRtpTransceiverDirection,
495     local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>,
496 ) -> Option<Arc<RTCRtpTransceiver>> {
497     // Get direction order from most preferred to least
498     let get_preferred_directions = || -> Vec<RTCRtpTransceiverDirection> {
499         match remote_direction {
500             RTCRtpTransceiverDirection::Sendrecv => vec![
501                 RTCRtpTransceiverDirection::Recvonly,
502                 RTCRtpTransceiverDirection::Sendrecv,
503             ],
504             RTCRtpTransceiverDirection::Sendonly => vec![RTCRtpTransceiverDirection::Recvonly],
505             RTCRtpTransceiverDirection::Recvonly => vec![
506                 RTCRtpTransceiverDirection::Sendonly,
507                 RTCRtpTransceiverDirection::Sendrecv,
508             ],
509             _ => vec![],
510         }
511     };
512 
513     for possible_direction in get_preferred_directions() {
514         for (i, t) in local_transceivers.iter().enumerate() {
515             if t.mid().is_none() && t.kind == remote_kind && possible_direction == t.direction() {
516                 return Some(local_transceivers.remove(i));
517             }
518         }
519     }
520 
521     None
522 }
523 
524 /// handle_unknown_rtp_packet consumes a single RTP Packet and returns information that is helpful
525 /// for demuxing and handling an unknown SSRC (usually for Simulcast)
handle_unknown_rtp_packet( buf: &[u8], mid_extension_id: u8, sid_extension_id: u8, rsid_extension_id: u8, ) -> Result<(String, String, String, PayloadType)>526 pub(crate) fn handle_unknown_rtp_packet(
527     buf: &[u8],
528     mid_extension_id: u8,
529     sid_extension_id: u8,
530     rsid_extension_id: u8,
531 ) -> Result<(String, String, String, PayloadType)> {
532     let mut reader = buf;
533     let rp = rtp::packet::Packet::unmarshal(&mut reader)?;
534 
535     if !rp.header.extension {
536         return Ok((String::new(), String::new(), String::new(), 0));
537     }
538 
539     let payload_type = rp.header.payload_type;
540 
541     let mid = if let Some(payload) = rp.header.get_extension(mid_extension_id) {
542         String::from_utf8(payload.to_vec())?
543     } else {
544         String::new()
545     };
546 
547     let rid = if let Some(payload) = rp.header.get_extension(sid_extension_id) {
548         String::from_utf8(payload.to_vec())?
549     } else {
550         String::new()
551     };
552 
553     let srid = if let Some(payload) = rp.header.get_extension(rsid_extension_id) {
554         String::from_utf8(payload.to_vec())?
555     } else {
556         String::new()
557     };
558 
559     Ok((mid, rid, srid, payload_type))
560 }
561