1 #[cfg(test)]
2 mod rtp_transceiver_test;
3
4 use crate::api::media_engine::MediaEngine;
5 use crate::error::{Error, Result};
6 use crate::rtp_transceiver::rtp_codec::*;
7 use crate::rtp_transceiver::rtp_receiver::{RTCRtpReceiver, RTPReceiverInternal};
8 use crate::rtp_transceiver::rtp_sender::RTCRtpSender;
9 use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
10 use crate::track::track_local::TrackLocal;
11
12 use interceptor::{
13 stream_info::{RTPHeaderExtension, StreamInfo},
14 Attributes,
15 };
16
17 use log::trace;
18 use serde::{Deserialize, Serialize};
19 use std::fmt;
20 use std::future::Future;
21 use std::pin::Pin;
22 use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
23 use std::sync::Arc;
24 use tokio::sync::{Mutex, OnceCell};
25 use util::Unmarshal;
26
27 pub(crate) mod fmtp;
28 pub mod rtp_codec;
29 pub mod rtp_receiver;
30 pub mod rtp_sender;
31 pub mod rtp_transceiver_direction;
32 pub(crate) mod srtp_writer_future;
33 use util::sync::Mutex as SyncMutex;
34
35 /// SSRC represents a synchronization source
36 /// A synchronization source is a randomly chosen
37 /// value meant to be globally unique within a particular
38 /// RTP session. Used to identify a single stream of media.
39 /// <https://tools.ietf.org/html/rfc3550#section-3>
40 #[allow(clippy::upper_case_acronyms)]
41 pub type SSRC = u32;
42
43 /// PayloadType identifies the format of the RTP payload and determines
44 /// its interpretation by the application. Each codec in a RTP Session
45 /// will have a different PayloadType
46 /// <https://tools.ietf.org/html/rfc3550#section-3>
47 pub type PayloadType = u8;
48
49 /// TYPE_RTCP_FBT_RANSPORT_CC ..
50 pub const TYPE_RTCP_FB_TRANSPORT_CC: &str = "transport-cc";
51
52 /// TYPE_RTCP_FB_GOOG_REMB ..
53 pub const TYPE_RTCP_FB_GOOG_REMB: &str = "goog-remb";
54
55 /// TYPE_RTCP_FB_ACK ..
56 pub const TYPE_RTCP_FB_ACK: &str = "ack";
57
58 /// TYPE_RTCP_FB_CCM ..
59 pub const TYPE_RTCP_FB_CCM: &str = "ccm";
60
61 /// TYPE_RTCP_FB_NACK ..
62 pub const TYPE_RTCP_FB_NACK: &str = "nack";
63
64 /// rtcpfeedback signals the connection to use additional RTCP packet types.
65 /// <https://draft.ortc.org/#dom-rtcrtcpfeedback>
66 #[derive(Default, Debug, Clone, PartialEq, Eq)]
67 pub struct RTCPFeedback {
68 /// Type is the type of feedback.
69 /// see: <https://draft.ortc.org/#dom-rtcrtcpfeedback>
70 /// valid: ack, ccm, nack, goog-remb, transport-cc
71 pub typ: String,
72
73 /// The parameter value depends on the type.
74 /// For example, type="nack" parameter="pli" will send Picture Loss Indicator packets.
75 pub parameter: String,
76 }
77
78 /// RTPCapabilities represents the capabilities of a transceiver
79 /// <https://w3c.github.io/webrtc-pc/#rtcrtpcapabilities>
80 #[derive(Default, Debug, Clone)]
81 pub struct RTCRtpCapabilities {
82 pub codecs: Vec<RTCRtpCodecCapability>,
83 pub header_extensions: Vec<RTCRtpHeaderExtensionCapability>,
84 }
85
86 /// RTPRtxParameters dictionary contains information relating to retransmission (RTX) settings.
87 /// <https://draft.ortc.org/#dom-rtcrtprtxparameters>
88 #[derive(Default, Debug, Clone, Serialize, Deserialize)]
89 pub struct RTCRtpRtxParameters {
90 pub ssrc: SSRC,
91 }
92
93 /// RTPCodingParameters provides information relating to both encoding and decoding.
94 /// This is a subset of the RFC since Pion WebRTC doesn't implement encoding/decoding itself
95 /// <http://draft.ortc.org/#dom-rtcrtpcodingparameters>
96 #[derive(Default, Debug, Clone, Serialize, Deserialize)]
97 pub struct RTCRtpCodingParameters {
98 pub rid: String,
99 pub ssrc: SSRC,
100 pub payload_type: PayloadType,
101 pub rtx: RTCRtpRtxParameters,
102 }
103
104 /// RTPDecodingParameters provides information relating to both encoding and decoding.
105 /// This is a subset of the RFC since Pion WebRTC doesn't implement decoding itself
106 /// <http://draft.ortc.org/#dom-rtcrtpdecodingparameters>
107 pub type RTCRtpDecodingParameters = RTCRtpCodingParameters;
108
109 /// RTPEncodingParameters provides information relating to both encoding and decoding.
110 /// This is a subset of the RFC since Pion WebRTC doesn't implement encoding itself
111 /// <http://draft.ortc.org/#dom-rtcrtpencodingparameters>
112 pub type RTCRtpEncodingParameters = RTCRtpCodingParameters;
113
114 /// RTPReceiveParameters contains the RTP stack settings used by receivers
115 #[derive(Debug)]
116 pub struct RTCRtpReceiveParameters {
117 pub encodings: Vec<RTCRtpDecodingParameters>,
118 }
119
120 /// RTPSendParameters contains the RTP stack settings used by receivers
121 #[derive(Debug)]
122 pub struct RTCRtpSendParameters {
123 pub rtp_parameters: RTCRtpParameters,
124 pub encodings: Vec<RTCRtpEncodingParameters>,
125 }
126
127 /// RTPTransceiverInit dictionary is used when calling the WebRTC function addTransceiver() to provide configuration options for the new transceiver.
128 pub struct RTCRtpTransceiverInit {
129 pub direction: RTCRtpTransceiverDirection,
130 pub send_encodings: Vec<RTCRtpEncodingParameters>,
131 // Streams []*Track
132 }
133
create_stream_info( id: String, ssrc: SSRC, payload_type: PayloadType, codec: RTCRtpCodecCapability, webrtc_header_extensions: &[RTCRtpHeaderExtensionParameters], ) -> StreamInfo134 pub(crate) fn create_stream_info(
135 id: String,
136 ssrc: SSRC,
137 payload_type: PayloadType,
138 codec: RTCRtpCodecCapability,
139 webrtc_header_extensions: &[RTCRtpHeaderExtensionParameters],
140 ) -> StreamInfo {
141 let header_extensions: Vec<RTPHeaderExtension> = webrtc_header_extensions
142 .iter()
143 .map(|h| RTPHeaderExtension {
144 id: h.id,
145 uri: h.uri.clone(),
146 })
147 .collect();
148
149 let feedbacks: Vec<_> = codec
150 .rtcp_feedback
151 .iter()
152 .map(|f| interceptor::stream_info::RTCPFeedback {
153 typ: f.typ.clone(),
154 parameter: f.parameter.clone(),
155 })
156 .collect();
157
158 StreamInfo {
159 id,
160 attributes: Attributes::new(),
161 ssrc,
162 payload_type,
163 rtp_header_extensions: header_extensions,
164 mime_type: codec.mime_type,
165 clock_rate: codec.clock_rate,
166 channels: codec.channels,
167 sdp_fmtp_line: codec.sdp_fmtp_line,
168 rtcp_feedback: feedbacks,
169 }
170 }
171
172 pub type TriggerNegotiationNeededFnOption =
173 Option<Box<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> + Send + Sync>>;
174
175 /// RTPTransceiver represents a combination of an RTPSender and an RTPReceiver that share a common mid.
176 pub struct RTCRtpTransceiver {
177 mid: OnceCell<String>, //atomic.Value
178 sender: SyncMutex<Arc<RTCRtpSender>>, //atomic.Value
179 receiver: SyncMutex<Arc<RTCRtpReceiver>>, //atomic.Value
180
181 direction: AtomicU8, //RTPTransceiverDirection
182 current_direction: AtomicU8, //RTPTransceiverDirection
183
184 codecs: Arc<Mutex<Vec<RTCRtpCodecParameters>>>, // User provided codecs via set_codec_preferences
185
186 pub(crate) stopped: AtomicBool,
187 pub(crate) kind: RTPCodecType,
188
189 media_engine: Arc<MediaEngine>,
190
191 trigger_negotiation_needed: Mutex<TriggerNegotiationNeededFnOption>,
192 }
193
194 impl RTCRtpTransceiver {
new( receiver: Arc<RTCRtpReceiver>, sender: Arc<RTCRtpSender>, direction: RTCRtpTransceiverDirection, kind: RTPCodecType, codecs: Vec<RTCRtpCodecParameters>, media_engine: Arc<MediaEngine>, trigger_negotiation_needed: TriggerNegotiationNeededFnOption, ) -> Arc<Self>195 pub async fn new(
196 receiver: Arc<RTCRtpReceiver>,
197 sender: Arc<RTCRtpSender>,
198 direction: RTCRtpTransceiverDirection,
199 kind: RTPCodecType,
200 codecs: Vec<RTCRtpCodecParameters>,
201 media_engine: Arc<MediaEngine>,
202 trigger_negotiation_needed: TriggerNegotiationNeededFnOption,
203 ) -> Arc<Self> {
204 let codecs = Arc::new(Mutex::new(codecs));
205 receiver.set_transceiver_codecs(Some(Arc::clone(&codecs)));
206
207 let t = Arc::new(RTCRtpTransceiver {
208 mid: OnceCell::new(),
209 sender: SyncMutex::new(sender),
210 receiver: SyncMutex::new(receiver),
211
212 direction: AtomicU8::new(direction as u8),
213 current_direction: AtomicU8::new(RTCRtpTransceiverDirection::Unspecified as u8),
214
215 codecs,
216 stopped: AtomicBool::new(false),
217 kind,
218 media_engine,
219 trigger_negotiation_needed: Mutex::new(trigger_negotiation_needed),
220 });
221 t.sender().set_rtp_transceiver(Some(Arc::downgrade(&t)));
222
223 t
224 }
225
226 /// set_codec_preferences sets preferred list of supported codecs
227 /// if codecs is empty or nil we reset to default from MediaEngine
set_codec_preferences(&self, codecs: Vec<RTCRtpCodecParameters>) -> Result<()>228 pub async fn set_codec_preferences(&self, codecs: Vec<RTCRtpCodecParameters>) -> Result<()> {
229 for codec in &codecs {
230 let media_engine_codecs = self.media_engine.get_codecs_by_kind(self.kind);
231 let (_, match_type) = codec_parameters_fuzzy_search(codec, &media_engine_codecs);
232 if match_type == CodecMatch::None {
233 return Err(Error::ErrRTPTransceiverCodecUnsupported);
234 }
235 }
236
237 {
238 let mut c = self.codecs.lock().await;
239 *c = codecs;
240 }
241 Ok(())
242 }
243
244 /// Codecs returns list of supported codecs
get_codecs(&self) -> Vec<RTCRtpCodecParameters>245 pub(crate) async fn get_codecs(&self) -> Vec<RTCRtpCodecParameters> {
246 let mut codecs = self.codecs.lock().await;
247 RTPReceiverInternal::get_codecs(&mut codecs, self.kind, &self.media_engine)
248 }
249
250 /// sender returns the RTPTransceiver's RTPSender if it has one
sender(&self) -> Arc<RTCRtpSender>251 pub fn sender(&self) -> Arc<RTCRtpSender> {
252 let sender = self.sender.lock();
253 sender.clone()
254 }
255
256 /// set_sender_track sets the RTPSender and Track to current transceiver
set_sender_track( self: &Arc<Self>, sender: Arc<RTCRtpSender>, track: Option<Arc<dyn TrackLocal + Send + Sync>>, ) -> Result<()>257 pub async fn set_sender_track(
258 self: &Arc<Self>,
259 sender: Arc<RTCRtpSender>,
260 track: Option<Arc<dyn TrackLocal + Send + Sync>>,
261 ) -> Result<()> {
262 self.set_sender(sender);
263 self.set_sending_track(track).await
264 }
265
set_sender(self: &Arc<Self>, s: Arc<RTCRtpSender>)266 pub fn set_sender(self: &Arc<Self>, s: Arc<RTCRtpSender>) {
267 s.set_rtp_transceiver(Some(Arc::downgrade(self)));
268
269 let prev_sender = self.sender();
270 prev_sender.set_rtp_transceiver(None);
271
272 {
273 let mut sender = self.sender.lock();
274 *sender = s;
275 }
276 }
277
278 /// receiver returns the RTPTransceiver's RTPReceiver if it has one
receiver(&self) -> Arc<RTCRtpReceiver>279 pub fn receiver(&self) -> Arc<RTCRtpReceiver> {
280 let receiver = self.receiver.lock();
281 receiver.clone()
282 }
283
set_receiver(&self, r: Arc<RTCRtpReceiver>)284 pub(crate) fn set_receiver(&self, r: Arc<RTCRtpReceiver>) {
285 r.set_transceiver_codecs(Some(Arc::clone(&self.codecs)));
286
287 {
288 let mut receiver = self.receiver.lock();
289 (*receiver).set_transceiver_codecs(None);
290
291 *receiver = r;
292 }
293 }
294
295 /// set_mid sets the RTPTransceiver's mid. If it was already set, will return an error.
set_mid(&self, mid: String) -> Result<()>296 pub(crate) fn set_mid(&self, mid: String) -> Result<()> {
297 self.mid
298 .set(mid)
299 .map_err(|_| Error::ErrRTPTransceiverCannotChangeMid)
300 }
301
302 /// mid gets the Transceiver's mid value. When not already set, this value will be set in CreateOffer or create_answer.
mid(&self) -> Option<String>303 pub fn mid(&self) -> Option<String> {
304 self.mid.get().map(Clone::clone)
305 }
306
307 /// kind returns RTPTransceiver's kind.
kind(&self) -> RTPCodecType308 pub fn kind(&self) -> RTPCodecType {
309 self.kind
310 }
311
312 /// direction returns the RTPTransceiver's desired direction.
direction(&self) -> RTCRtpTransceiverDirection313 pub fn direction(&self) -> RTCRtpTransceiverDirection {
314 self.direction.load(Ordering::SeqCst).into()
315 }
316
317 /// Set the direction of this transceiver. This might trigger a renegotiation.
set_direction(&self, d: RTCRtpTransceiverDirection)318 pub async fn set_direction(&self, d: RTCRtpTransceiverDirection) {
319 let changed = self.set_direction_internal(d);
320
321 if changed {
322 let lock = self.trigger_negotiation_needed.lock().await;
323 if let Some(trigger) = &*lock {
324 (trigger)().await;
325 }
326 }
327 }
328
set_direction_internal(&self, d: RTCRtpTransceiverDirection) -> bool329 pub(crate) fn set_direction_internal(&self, d: RTCRtpTransceiverDirection) -> bool {
330 let previous: RTCRtpTransceiverDirection =
331 self.direction.swap(d as u8, Ordering::SeqCst).into();
332
333 let changed = d != previous;
334
335 if changed {
336 trace!(
337 "Changing direction of transceiver from {} to {}",
338 previous,
339 d
340 );
341 }
342
343 changed
344 }
345
346 /// current_direction returns the RTPTransceiver's current direction as negotiated.
347 ///
348 /// If this transceiver has never been negotiated or if it's stopped this returns [`RTCRtpTransceiverDirection::Unspecified`].
current_direction(&self) -> RTCRtpTransceiverDirection349 pub fn current_direction(&self) -> RTCRtpTransceiverDirection {
350 if self.stopped.load(Ordering::SeqCst) {
351 return RTCRtpTransceiverDirection::Unspecified;
352 }
353
354 self.current_direction.load(Ordering::SeqCst).into()
355 }
356
set_current_direction(&self, d: RTCRtpTransceiverDirection)357 pub(crate) fn set_current_direction(&self, d: RTCRtpTransceiverDirection) {
358 let previous: RTCRtpTransceiverDirection = self
359 .current_direction
360 .swap(d as u8, Ordering::SeqCst)
361 .into();
362
363 if d != previous {
364 trace!(
365 "Changing current direction of transceiver from {} to {}",
366 previous,
367 d,
368 );
369 }
370 }
371
372 /// Perform any subsequent actions after altering the transceiver's direction.
373 ///
374 /// After changing the transceiver's direction this method should be called to perform any
375 /// side-effects that results from the new direction, such as pausing/resuming the RTP receiver.
process_new_current_direction( &self, previous_direction: RTCRtpTransceiverDirection, ) -> Result<()>376 pub(crate) async fn process_new_current_direction(
377 &self,
378 previous_direction: RTCRtpTransceiverDirection,
379 ) -> Result<()> {
380 if self.stopped.load(Ordering::SeqCst) {
381 return Ok(());
382 }
383
384 let current_direction = self.current_direction();
385 if previous_direction != current_direction {
386 let mid = self.mid();
387 trace!(
388 "Processing transceiver({:?}) direction change from {} to {}",
389 mid,
390 previous_direction,
391 current_direction
392 );
393 } else {
394 // no change.
395 return Ok(());
396 }
397
398 {
399 let receiver = self.receiver.lock().clone();
400 let pause_receiver = !current_direction.has_recv();
401
402 if pause_receiver {
403 receiver.pause().await?;
404 } else {
405 receiver.resume().await?;
406 }
407 }
408
409 let pause_sender = !current_direction.has_send();
410 {
411 let sender = &*self.sender.lock();
412 sender.set_paused(pause_sender);
413 }
414
415 Ok(())
416 }
417
418 /// stop irreversibly stops the RTPTransceiver
stop(&self) -> Result<()>419 pub async fn stop(&self) -> Result<()> {
420 if self.stopped.load(Ordering::SeqCst) {
421 return Ok(());
422 }
423
424 self.stopped.store(true, Ordering::SeqCst);
425
426 {
427 let sender = self.sender.lock();
428 sender.stop().await?;
429 }
430 {
431 let r = self.receiver.lock();
432 r.stop().await?;
433 }
434
435 self.set_direction_internal(RTCRtpTransceiverDirection::Inactive);
436
437 Ok(())
438 }
439
set_sending_track( &self, track: Option<Arc<dyn TrackLocal + Send + Sync>>, ) -> Result<()>440 pub(crate) async fn set_sending_track(
441 &self,
442 track: Option<Arc<dyn TrackLocal + Send + Sync>>,
443 ) -> Result<()> {
444 let track_is_none = track.is_none();
445 {
446 let sender = self.sender.lock().clone();
447 sender.replace_track(track).await?;
448 }
449
450 let direction = self.direction();
451 let should_send = !track_is_none;
452 let should_recv = direction.has_recv();
453 self.set_direction_internal(RTCRtpTransceiverDirection::from_send_recv(
454 should_send,
455 should_recv,
456 ));
457
458 Ok(())
459 }
460 }
461
462 impl fmt::Debug for RTCRtpTransceiver {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result463 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
464 f.debug_struct("RTCRtpTransceiver")
465 .field("mid", &self.mid)
466 .field("sender", &self.sender)
467 .field("receiver", &self.receiver)
468 .field("direction", &self.direction)
469 .field("current_direction", &self.current_direction)
470 .field("codecs", &self.codecs)
471 .field("stopped", &self.stopped)
472 .field("kind", &self.kind)
473 .finish()
474 }
475 }
476
find_by_mid( mid: &str, local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>, ) -> Option<Arc<RTCRtpTransceiver>>477 pub(crate) async fn find_by_mid(
478 mid: &str,
479 local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>,
480 ) -> Option<Arc<RTCRtpTransceiver>> {
481 for (i, t) in local_transceivers.iter().enumerate() {
482 if t.mid().as_deref() == Some(mid) {
483 return Some(local_transceivers.remove(i));
484 }
485 }
486
487 None
488 }
489
490 /// Given a direction+type pluck a transceiver from the passed list
491 /// if no entry satisfies the requested type+direction return a inactive Transceiver
satisfy_type_and_direction( remote_kind: RTPCodecType, remote_direction: RTCRtpTransceiverDirection, local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>, ) -> Option<Arc<RTCRtpTransceiver>>492 pub(crate) async fn satisfy_type_and_direction(
493 remote_kind: RTPCodecType,
494 remote_direction: RTCRtpTransceiverDirection,
495 local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>,
496 ) -> Option<Arc<RTCRtpTransceiver>> {
497 // Get direction order from most preferred to least
498 let get_preferred_directions = || -> Vec<RTCRtpTransceiverDirection> {
499 match remote_direction {
500 RTCRtpTransceiverDirection::Sendrecv => vec![
501 RTCRtpTransceiverDirection::Recvonly,
502 RTCRtpTransceiverDirection::Sendrecv,
503 ],
504 RTCRtpTransceiverDirection::Sendonly => vec![RTCRtpTransceiverDirection::Recvonly],
505 RTCRtpTransceiverDirection::Recvonly => vec![
506 RTCRtpTransceiverDirection::Sendonly,
507 RTCRtpTransceiverDirection::Sendrecv,
508 ],
509 _ => vec![],
510 }
511 };
512
513 for possible_direction in get_preferred_directions() {
514 for (i, t) in local_transceivers.iter().enumerate() {
515 if t.mid().is_none() && t.kind == remote_kind && possible_direction == t.direction() {
516 return Some(local_transceivers.remove(i));
517 }
518 }
519 }
520
521 None
522 }
523
524 /// handle_unknown_rtp_packet consumes a single RTP Packet and returns information that is helpful
525 /// for demuxing and handling an unknown SSRC (usually for Simulcast)
handle_unknown_rtp_packet( buf: &[u8], mid_extension_id: u8, sid_extension_id: u8, rsid_extension_id: u8, ) -> Result<(String, String, String, PayloadType)>526 pub(crate) fn handle_unknown_rtp_packet(
527 buf: &[u8],
528 mid_extension_id: u8,
529 sid_extension_id: u8,
530 rsid_extension_id: u8,
531 ) -> Result<(String, String, String, PayloadType)> {
532 let mut reader = buf;
533 let rp = rtp::packet::Packet::unmarshal(&mut reader)?;
534
535 if !rp.header.extension {
536 return Ok((String::new(), String::new(), String::new(), 0));
537 }
538
539 let payload_type = rp.header.payload_type;
540
541 let mid = if let Some(payload) = rp.header.get_extension(mid_extension_id) {
542 String::from_utf8(payload.to_vec())?
543 } else {
544 String::new()
545 };
546
547 let rid = if let Some(payload) = rp.header.get_extension(sid_extension_id) {
548 String::from_utf8(payload.to_vec())?
549 } else {
550 String::new()
551 };
552
553 let srid = if let Some(payload) = rp.header.get_extension(rsid_extension_id) {
554 String::from_utf8(payload.to_vec())?
555 } else {
556 String::new()
557 };
558
559 Ok((mid, rid, srid, payload_type))
560 }
561