xref: /webrtc/webrtc/src/peer_connection/mod.rs (revision 60ef728a)
1 #[cfg(test)]
2 pub(crate) mod peer_connection_test;
3 
4 pub mod certificate;
5 pub mod configuration;
6 pub mod offer_answer_options;
7 pub(crate) mod operation;
8 mod peer_connection_internal;
9 pub mod peer_connection_state;
10 pub mod policy;
11 pub mod sdp;
12 pub mod signaling_state;
13 
14 use crate::api::media_engine::MediaEngine;
15 use crate::api::setting_engine::SettingEngine;
16 use crate::api::API;
17 use crate::data_channel::data_channel_init::RTCDataChannelInit;
18 use crate::data_channel::data_channel_parameters::DataChannelParameters;
19 use crate::data_channel::data_channel_state::RTCDataChannelState;
20 use crate::data_channel::RTCDataChannel;
21 use crate::dtls_transport::dtls_fingerprint::RTCDtlsFingerprint;
22 use crate::dtls_transport::dtls_parameters::DTLSParameters;
23 use crate::dtls_transport::dtls_role::{
24     DTLSRole, DEFAULT_DTLS_ROLE_ANSWER, DEFAULT_DTLS_ROLE_OFFER,
25 };
26 use crate::dtls_transport::dtls_transport_state::RTCDtlsTransportState;
27 use crate::dtls_transport::RTCDtlsTransport;
28 use crate::error::{flatten_errs, Error, Result};
29 use crate::ice_transport::ice_candidate::{RTCIceCandidate, RTCIceCandidateInit};
30 use crate::ice_transport::ice_connection_state::RTCIceConnectionState;
31 use crate::ice_transport::ice_gatherer::RTCIceGatherOptions;
32 use crate::ice_transport::ice_gatherer::{
33     OnGatheringCompleteHdlrFn, OnICEGathererStateChangeHdlrFn, OnLocalCandidateHdlrFn,
34     RTCIceGatherer,
35 };
36 use crate::ice_transport::ice_gatherer_state::RTCIceGathererState;
37 use crate::ice_transport::ice_gathering_state::RTCIceGatheringState;
38 use crate::ice_transport::ice_parameters::RTCIceParameters;
39 use crate::ice_transport::ice_role::RTCIceRole;
40 use crate::ice_transport::ice_transport_state::RTCIceTransportState;
41 use crate::ice_transport::RTCIceTransport;
42 use crate::peer_connection::certificate::RTCCertificate;
43 use crate::peer_connection::configuration::RTCConfiguration;
44 use crate::peer_connection::offer_answer_options::{RTCAnswerOptions, RTCOfferOptions};
45 use crate::peer_connection::operation::{Operation, Operations};
46 use crate::peer_connection::peer_connection_state::{
47     NegotiationNeededState, RTCPeerConnectionState,
48 };
49 use crate::peer_connection::sdp::sdp_type::RTCSdpType;
50 use crate::peer_connection::sdp::session_description::RTCSessionDescription;
51 use crate::peer_connection::sdp::*;
52 use crate::peer_connection::signaling_state::{
53     check_next_signaling_state, RTCSignalingState, StateChangeOp,
54 };
55 use crate::rtp_transceiver::rtp_codec::{RTCRtpHeaderExtensionCapability, RTPCodecType};
56 use crate::rtp_transceiver::rtp_receiver::RTCRtpReceiver;
57 use crate::rtp_transceiver::rtp_sender::RTCRtpSender;
58 use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
59 use crate::rtp_transceiver::{
60     find_by_mid, handle_unknown_rtp_packet, satisfy_type_and_direction, RTCRtpTransceiver,
61 };
62 use crate::rtp_transceiver::{RTCRtpTransceiverInit, SSRC};
63 use crate::sctp_transport::sctp_transport_capabilities::SCTPTransportCapabilities;
64 use crate::sctp_transport::sctp_transport_state::RTCSctpTransportState;
65 use crate::sctp_transport::RTCSctpTransport;
66 use crate::stats::StatsReport;
67 use crate::track::track_local::TrackLocal;
68 use crate::track::track_remote::TrackRemote;
69 
70 use ::ice::candidate::candidate_base::unmarshal_candidate;
71 use ::ice::candidate::Candidate;
72 use ::sdp::description::session::*;
73 use ::sdp::util::ConnectionRole;
74 use arc_swap::ArcSwapOption;
75 use async_trait::async_trait;
76 use interceptor::{stats, Attributes, Interceptor, RTCPWriter};
77 use peer_connection_internal::*;
78 use rand::{thread_rng, Rng};
79 use rcgen::KeyPair;
80 use srtp::stream::Stream;
81 use std::future::Future;
82 use std::pin::Pin;
83 use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
84 use std::sync::Arc;
85 use std::time::{SystemTime, UNIX_EPOCH};
86 use tokio::sync::{mpsc, Mutex};
87 
/// SIMULCAST_PROBE_COUNT is the number of RTP packets
/// that handleUndeclaredSSRC will read and try to dispatch based on
/// their mid and rid values.
pub(crate) const SIMULCAST_PROBE_COUNT: usize = 10;

/// SIMULCAST_MAX_PROBE_ROUTINES is how many active routines can be used to probe.
/// If the total number of incoming SSRCs exceeds this, new requests will be ignored.
pub(crate) const SIMULCAST_MAX_PROBE_ROUTINES: u64 = 25;

/// MEDIA_SECTION_APPLICATION is the literal string `"application"`.
// NOTE(review): presumably the media name of the SDP m= section that carries
// data channels — confirm at the use sites.
pub(crate) const MEDIA_SECTION_APPLICATION: &str = "application";

/// RUNES_ALPHA holds the ASCII letters `a`-`z` followed by `A`-`Z`;
/// `math_rand_alpha` draws its random characters from this table.
const RUNES_ALPHA: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
100 
101 /// math_rand_alpha generates a mathmatical random alphabet sequence of the requested length.
math_rand_alpha(n: usize) -> String102 pub fn math_rand_alpha(n: usize) -> String {
103     let mut rng = thread_rng();
104 
105     let rand_string: String = (0..n)
106         .map(|_| {
107             let idx = rng.gen_range(0..RUNES_ALPHA.len());
108             RUNES_ALPHA[idx] as char
109         })
110         .collect();
111 
112     rand_string
113 }
114 
/// Boxed callback accepted by `RTCPeerConnection::on_signaling_state_change`.
///
/// It is called with the new `RTCSignalingState` and returns a boxed future
/// that is awaited before the state-change notification completes.
pub type OnSignalingStateChangeHdlrFn = Box<
    dyn (FnMut(RTCSignalingState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
        + Send
        + Sync,
>;
120 
/// Boxed callback accepted by `RTCPeerConnection::on_ice_connection_state_change`.
///
/// It is called with the new `RTCIceConnectionState` and returns a boxed
/// future that is awaited by the caller.
pub type OnICEConnectionStateChangeHdlrFn = Box<
    dyn (FnMut(RTCIceConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
        + Send
        + Sync,
>;
126 
/// Boxed callback accepted by `RTCPeerConnection::on_peer_connection_state_change`.
///
/// It is called with the new `RTCPeerConnectionState` and returns a boxed
/// future that is awaited by the caller.
pub type OnPeerConnectionStateChangeHdlrFn = Box<
    dyn (FnMut(RTCPeerConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
        + Send
        + Sync,
>;
132 
/// Boxed callback accepted by `RTCPeerConnection::on_data_channel`.
///
/// It receives a shared handle to an `RTCDataChannel` and returns a boxed future.
pub type OnDataChannelHdlrFn = Box<
    dyn (FnMut(Arc<RTCDataChannel>) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
        + Send
        + Sync,
>;
138 
/// Boxed callback accepted by `RTCPeerConnection::on_track`.
///
/// It receives the remote track together with the receiver and the
/// transceiver it belongs to, and returns a boxed future.
pub type OnTrackHdlrFn = Box<
    dyn (FnMut(
            Arc<TrackRemote>,
            Arc<RTCRtpReceiver>,
            Arc<RTCRtpTransceiver>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
        + Send
        + Sync,
>;
148 
/// Boxed callback accepted by `RTCPeerConnection::on_negotiation_needed`.
///
/// It takes no arguments and returns a boxed future that is awaited when
/// negotiation is found to be needed.
pub type OnNegotiationNeededHdlrFn =
    Box<dyn (FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) + Send + Sync>;
151 
/// Bundle of shared handles (ICE and DTLS transports, the peer-connection
/// state-change handler, the closed flag and the atomically stored connection
/// states) grouped so they can be cloned and passed around as one value.
// NOTE(review): no use of this struct is visible in this part of the file;
// its handler field uses `Mutex<Option<..>>` whereas the handlers stored
// through `on_*` below use `ArcSwapOption` — confirm it is still needed.
#[derive(Clone)]
struct StartTransportsParams {
    ice_transport: Arc<RTCIceTransport>,
    dtls_transport: Arc<RTCDtlsTransport>,
    on_peer_connection_state_change_handler: Arc<Mutex<Option<OnPeerConnectionStateChangeHdlrFn>>>,
    is_closed: Arc<AtomicBool>,
    // Connection states stored as their `u8` discriminants.
    peer_connection_state: Arc<AtomicU8>,
    ice_connection_state: Arc<AtomicU8>,
}
161 
/// Shared state read by `RTCPeerConnection::check_negotiation_needed`.
#[derive(Clone)]
struct CheckNegotiationNeededParams {
    // Its `data_channels` collection is counted to see whether any data channel exists.
    sctp_transport: Arc<RTCSctpTransport>,
    // Transceivers whose mid, direction and stopped flag are compared with the descriptions.
    rtp_transceivers: Arc<Mutex<Vec<Arc<RTCRtpTransceiver>>>>,
    // Current local description; `None` means negotiation is always needed.
    current_local_description: Arc<Mutex<Option<RTCSessionDescription>>>,
    // Current remote description, if any.
    current_remote_description: Arc<Mutex<Option<RTCSessionDescription>>>,
}
169 
/// Shared state used by the negotiation-needed machinery
/// (`do_negotiation_needed`, `negotiation_needed_op` and
/// `after_negotiation_needed_op`).
#[derive(Clone)]
struct NegotiationNeededParams {
    // User callback; the negotiation checks are skipped while it is unset.
    on_negotiation_needed_handler: Arc<ArcSwapOption<Mutex<OnNegotiationNeededHdlrFn>>>,
    // When set, `negotiation_needed_op` returns without doing anything.
    is_closed: Arc<AtomicBool>,
    // Operations queue on which the negotiation-needed operation is enqueued.
    ops: Arc<Operations>,
    // `NegotiationNeededState` stored as its `u8` discriminant.
    negotiation_needed_state: Arc<AtomicU8>,
    // The negotiation-needed flag itself.
    is_negotiation_needed: Arc<AtomicBool>,
    // `RTCSignalingState` stored as its `u8` discriminant.
    signaling_state: Arc<AtomicU8>,
    check_negotiation_needed_params: CheckNegotiationNeededParams,
}
180 
/// PeerConnection represents a WebRTC connection that establishes a
/// peer-to-peer communications with another PeerConnection instance in a
/// browser, or to another endpoint implementing the required protocols.
pub struct RTCPeerConnection {
    // "PeerConnection-<nanoseconds since the Unix epoch>", assigned in `new`.
    stats_id: String,
    // Initialised to `None` in `new`.
    idp_login_url: Option<String>,

    // Configuration handed back by `PeerConnectionInternal::new`.
    configuration: RTCConfiguration,

    // RTCP writer obtained by binding the internal state to the interceptor chain.
    interceptor_rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>,

    // Interceptor chain built from the API's registry plus the stats interceptor.
    interceptor: Arc<dyn Interceptor + Send + Sync>,

    // Shared internal state; holds the event handlers and the ICE gatherer
    // used by the methods below.
    pub(crate) internal: Arc<PeerConnectionInternal>,
}
196 
197 impl std::fmt::Debug for RTCPeerConnection {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result198     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199         f.debug_struct("RTCPeerConnection")
200             .field("stats_id", &self.stats_id)
201             .field("idp_login_url", &self.idp_login_url)
202             .field("signaling_state", &self.signaling_state())
203             .field("ice_connection_state", &self.ice_connection_state())
204             .finish()
205     }
206 }
207 
208 impl std::fmt::Display for RTCPeerConnection {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result209     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210         write!(f, "(RTCPeerConnection {})", self.stats_id)
211     }
212 }
213 
214 impl RTCPeerConnection {
215     /// creates a PeerConnection with the default codecs and
216     /// interceptors.  See register_default_codecs and register_default_interceptors.
217     ///
218     /// If you wish to customize the set of available codecs or the set of
219     /// active interceptors, create a MediaEngine and call api.new_peer_connection
220     /// instead of this function.
new(api: &API, mut configuration: RTCConfiguration) -> Result<Self>221     pub(crate) async fn new(api: &API, mut configuration: RTCConfiguration) -> Result<Self> {
222         RTCPeerConnection::init_configuration(&mut configuration)?;
223 
224         let (interceptor, stats_interceptor): (Arc<dyn Interceptor + Send + Sync>, _) = {
225             let mut chain = api.interceptor_registry.build_chain("")?;
226             let stats_interceptor = stats::make_stats_interceptor("");
227             chain.add(stats_interceptor.clone());
228 
229             (Arc::new(chain), stats_interceptor)
230         };
231 
232         let weak_interceptor = Arc::downgrade(&interceptor);
233         let (internal, configuration) =
234             PeerConnectionInternal::new(api, weak_interceptor, stats_interceptor, configuration)
235                 .await?;
236         let internal_rtcp_writer = Arc::clone(&internal) as Arc<dyn RTCPWriter + Send + Sync>;
237         let interceptor_rtcp_writer = interceptor.bind_rtcp_writer(internal_rtcp_writer).await;
238 
239         // <https://w3c.github.io/webrtc-pc/#constructor> (Step #2)
240         // Some variables defined explicitly despite their implicit zero values to
241         // allow better readability to understand what is happening.
242         Ok(RTCPeerConnection {
243             stats_id: format!(
244                 "PeerConnection-{}",
245                 SystemTime::now()
246                     .duration_since(UNIX_EPOCH)
247                     .unwrap()
248                     .as_nanos()
249             ),
250             interceptor,
251             interceptor_rtcp_writer,
252             internal,
253             configuration,
254             idp_login_url: None,
255         })
256     }
257 
258     /// init_configuration defines validation of the specified Configuration and
259     /// its assignment to the internal configuration variable. This function differs
260     /// from its set_configuration counterpart because most of the checks do not
261     /// include verification statements related to the existing state. Thus the
262     /// function describes only minor verification of some the struct variables.
init_configuration(configuration: &mut RTCConfiguration) -> Result<()>263     fn init_configuration(configuration: &mut RTCConfiguration) -> Result<()> {
264         let sanitized_ice_servers = configuration.get_ice_servers();
265         if !sanitized_ice_servers.is_empty() {
266             for server in &sanitized_ice_servers {
267                 server.validate()?;
268             }
269         }
270 
271         // <https://www.w3.org/TR/webrtc/#constructor> (step #3)
272         if !configuration.certificates.is_empty() {
273             let now = SystemTime::now();
274             for cert in &configuration.certificates {
275                 cert.expires
276                     .duration_since(now)
277                     .map_err(|_| Error::ErrCertificateExpired)?;
278             }
279         } else {
280             let kp = KeyPair::generate(&rcgen::PKCS_ECDSA_P256_SHA256)?;
281             let cert = RTCCertificate::from_key_pair(kp)?;
282             configuration.certificates = vec![cert];
283         };
284 
285         Ok(())
286     }
287 
288     /// on_signaling_state_change sets an event handler which is invoked when the
289     /// peer connection's signaling state changes
on_signaling_state_change(&self, f: OnSignalingStateChangeHdlrFn)290     pub fn on_signaling_state_change(&self, f: OnSignalingStateChangeHdlrFn) {
291         self.internal
292             .on_signaling_state_change_handler
293             .store(Some(Arc::new(Mutex::new(f))))
294     }
295 
do_signaling_state_change(&self, new_state: RTCSignalingState)296     async fn do_signaling_state_change(&self, new_state: RTCSignalingState) {
297         log::info!("signaling state changed to {}", new_state);
298         if let Some(handler) = &*self.internal.on_signaling_state_change_handler.load() {
299             let mut f = handler.lock().await;
300             f(new_state).await;
301         }
302     }
303 
304     /// on_data_channel sets an event handler which is invoked when a data
305     /// channel message arrives from a remote peer.
on_data_channel(&self, f: OnDataChannelHdlrFn)306     pub fn on_data_channel(&self, f: OnDataChannelHdlrFn) {
307         self.internal
308             .on_data_channel_handler
309             .store(Some(Arc::new(Mutex::new(f))));
310     }
311 
312     /// on_negotiation_needed sets an event handler which is invoked when
313     /// a change has occurred which requires session negotiation
on_negotiation_needed(&self, f: OnNegotiationNeededHdlrFn)314     pub fn on_negotiation_needed(&self, f: OnNegotiationNeededHdlrFn) {
315         self.internal
316             .on_negotiation_needed_handler
317             .store(Some(Arc::new(Mutex::new(f))));
318     }
319 
do_negotiation_needed_inner(params: &NegotiationNeededParams) -> bool320     fn do_negotiation_needed_inner(params: &NegotiationNeededParams) -> bool {
321         // https://w3c.github.io/webrtc-pc/#updating-the-negotiation-needed-flag
322         // non-canon step 1
323         let state: NegotiationNeededState = params
324             .negotiation_needed_state
325             .load(Ordering::SeqCst)
326             .into();
327         if state == NegotiationNeededState::Run {
328             params
329                 .negotiation_needed_state
330                 .store(NegotiationNeededState::Queue as u8, Ordering::SeqCst);
331             false
332         } else if state == NegotiationNeededState::Queue {
333             false
334         } else {
335             params
336                 .negotiation_needed_state
337                 .store(NegotiationNeededState::Run as u8, Ordering::SeqCst);
338             true
339         }
340     }
341     /// do_negotiation_needed enqueues negotiation_needed_op if necessary
342     /// caller of this method should hold `pc.mu` lock
do_negotiation_needed(params: NegotiationNeededParams)343     async fn do_negotiation_needed(params: NegotiationNeededParams) {
344         if !RTCPeerConnection::do_negotiation_needed_inner(&params) {
345             return;
346         }
347 
348         let params2 = params.clone();
349         let _ = params
350             .ops
351             .enqueue(Operation::new(
352                 move || {
353                     let params3 = params2.clone();
354                     Box::pin(async move { RTCPeerConnection::negotiation_needed_op(params3).await })
355                 },
356                 "do_negotiation_needed",
357             ))
358             .await;
359     }
360 
after_negotiation_needed_op(params: NegotiationNeededParams) -> bool361     async fn after_negotiation_needed_op(params: NegotiationNeededParams) -> bool {
362         let old_negotiation_needed_state = params.negotiation_needed_state.load(Ordering::SeqCst);
363 
364         params
365             .negotiation_needed_state
366             .store(NegotiationNeededState::Empty as u8, Ordering::SeqCst);
367 
368         if old_negotiation_needed_state == NegotiationNeededState::Queue as u8 {
369             RTCPeerConnection::do_negotiation_needed_inner(&params)
370         } else {
371             false
372         }
373     }
374 
negotiation_needed_op(params: NegotiationNeededParams) -> bool375     async fn negotiation_needed_op(params: NegotiationNeededParams) -> bool {
376         // Don't run NegotiatedNeeded checks if on_negotiation_needed is not set
377         let handler = &*params.on_negotiation_needed_handler.load();
378         if handler.is_none() {
379             return false;
380         }
381 
382         // https://www.w3.org/TR/webrtc/#updating-the-negotiation-needed-flag
383         // Step 2.1
384         if params.is_closed.load(Ordering::SeqCst) {
385             return false;
386         }
387         // non-canon step 2.2
388         if !params.ops.is_empty().await {
389             //enqueue negotiation_needed_op again by return true
390             return true;
391         }
392 
393         // non-canon, run again if there was a request
394         // starting defer(after_do_negotiation_needed(params).await);
395 
396         // Step 2.3
397         if params.signaling_state.load(Ordering::SeqCst) != RTCSignalingState::Stable as u8 {
398             return RTCPeerConnection::after_negotiation_needed_op(params).await;
399         }
400 
401         // Step 2.4
402         if !RTCPeerConnection::check_negotiation_needed(&params.check_negotiation_needed_params)
403             .await
404         {
405             params.is_negotiation_needed.store(false, Ordering::SeqCst);
406             return RTCPeerConnection::after_negotiation_needed_op(params).await;
407         }
408 
409         // Step 2.5
410         if params.is_negotiation_needed.load(Ordering::SeqCst) {
411             return RTCPeerConnection::after_negotiation_needed_op(params).await;
412         }
413 
414         // Step 2.6
415         params.is_negotiation_needed.store(true, Ordering::SeqCst);
416 
417         // Step 2.7
418         if let Some(handler) = handler {
419             let mut f = handler.lock().await;
420             f().await;
421         }
422 
423         RTCPeerConnection::after_negotiation_needed_op(params).await
424     }
425 
check_negotiation_needed(params: &CheckNegotiationNeededParams) -> bool426     async fn check_negotiation_needed(params: &CheckNegotiationNeededParams) -> bool {
427         // To check if negotiation is needed for connection, perform the following checks:
428         // Skip 1, 2 steps
429         // Step 3
430         let current_local_description = {
431             let current_local_description = params.current_local_description.lock().await;
432             current_local_description.clone()
433         };
434         let current_remote_description = {
435             let current_remote_description = params.current_remote_description.lock().await;
436             current_remote_description.clone()
437         };
438 
439         if let Some(local_desc) = &current_local_description {
440             let len_data_channel = {
441                 let data_channels = params.sctp_transport.data_channels.lock().await;
442                 data_channels.len()
443             };
444 
445             if len_data_channel != 0 && have_data_channel(local_desc).is_none() {
446                 return true;
447             }
448 
449             let transceivers = params.rtp_transceivers.lock().await;
450             for t in &*transceivers {
451                 // https://www.w3.org/TR/webrtc/#dfn-update-the-negotiation-needed-flag
452                 // Step 5.1
453                 // if t.stopping && !t.stopped {
454                 // 	return true
455                 // }
456                 let mid = t.mid();
457                 let m = mid.as_ref().and_then(|mid| get_by_mid(mid, local_desc));
458                 // Step 5.2
459                 if !t.stopped.load(Ordering::SeqCst) {
460                     if m.is_none() {
461                         return true;
462                     }
463 
464                     if let Some(m) = m {
465                         // Step 5.3.1
466                         if t.direction().has_send() {
467                             let dmsid = match m.attribute(ATTR_KEY_MSID).and_then(|o| o) {
468                                 Some(m) => m,
469                                 None => return true, // doesn't contain a single a=msid line
470                             };
471 
472                             let sender = t.sender();
473                             // (...)or the number of MSIDs from the a=msid lines in this m= section,
474                             // or the MSID values themselves, differ from what is in
475                             // transceiver.sender.[[AssociatedMediaStreamIds]], return true.
476 
477                             // TODO: This check should be robuster by storing all streams in the
478                             // local description so we can compare all of them. For no we only
479                             // consider the first one.
480 
481                             let stream_ids = sender.associated_media_stream_ids();
482                             // Different number of lines, 1 vs 0
483                             if stream_ids.is_empty() {
484                                 return true;
485                             }
486 
487                             // different stream id
488                             if dmsid.split_whitespace().next() != Some(&stream_ids[0]) {
489                                 return true;
490                             }
491                         }
492                         match local_desc.sdp_type {
493                             RTCSdpType::Offer => {
494                                 // Step 5.3.2
495                                 if let Some(remote_desc) = &current_remote_description {
496                                     if let Some(rm) =
497                                         t.mid().and_then(|mid| get_by_mid(&mid, remote_desc))
498                                     {
499                                         if get_peer_direction(m) != t.direction()
500                                             && get_peer_direction(rm) != t.direction().reverse()
501                                         {
502                                             return true;
503                                         }
504                                     } else {
505                                         return true;
506                                     }
507                                 }
508                             }
509                             RTCSdpType::Answer => {
510                                 let remote_desc = match &current_remote_description {
511                                     Some(d) => d,
512                                     None => return true,
513                                 };
514                                 let offered_direction =
515                                     match t.mid().and_then(|mid| get_by_mid(&mid, remote_desc)) {
516                                         Some(d) => {
517                                             let dir = get_peer_direction(d);
518                                             if dir == RTCRtpTransceiverDirection::Unspecified {
519                                                 RTCRtpTransceiverDirection::Inactive
520                                             } else {
521                                                 dir
522                                             }
523                                         }
524                                         None => RTCRtpTransceiverDirection::Inactive,
525                                     };
526 
527                                 let current_direction = get_peer_direction(m);
528                                 // Step 5.3.3
529                                 if current_direction
530                                     != t.direction().intersect(offered_direction.reverse())
531                                 {
532                                     return true;
533                                 }
534                             }
535                             _ => {}
536                         };
537                     }
538                 }
539                 // Step 5.4
540                 if t.stopped.load(Ordering::SeqCst) {
541                     let search_mid = match t.mid() {
542                         Some(mid) => mid,
543                         None => return false,
544                     };
545 
546                     if let Some(remote_desc) = &*params.current_remote_description.lock().await {
547                         return get_by_mid(&search_mid, local_desc).is_some()
548                             || get_by_mid(&search_mid, remote_desc).is_some();
549                     }
550                 }
551             }
552             // Step 6
553             false
554         } else {
555             true
556         }
557     }
558 
559     /// on_ice_candidate sets an event handler which is invoked when a new ICE
560     /// candidate is found.
561     /// Take note that the handler is gonna be called with a nil pointer when
562     /// gathering is finished.
on_ice_candidate(&self, f: OnLocalCandidateHdlrFn)563     pub fn on_ice_candidate(&self, f: OnLocalCandidateHdlrFn) {
564         self.internal.ice_gatherer.on_local_candidate(f)
565     }
566 
567     /// on_ice_gathering_state_change sets an event handler which is invoked when the
568     /// ICE candidate gathering state has changed.
on_ice_gathering_state_change(&self, f: OnICEGathererStateChangeHdlrFn)569     pub fn on_ice_gathering_state_change(&self, f: OnICEGathererStateChangeHdlrFn) {
570         self.internal.ice_gatherer.on_state_change(f)
571     }
572 
573     /// on_track sets an event handler which is called when remote track
574     /// arrives from a remote peer.
on_track(&self, f: OnTrackHdlrFn)575     pub fn on_track(&self, f: OnTrackHdlrFn) {
576         self.internal
577             .on_track_handler
578             .store(Some(Arc::new(Mutex::new(f))));
579     }
580 
do_track( on_track_handler: Arc<ArcSwapOption<Mutex<OnTrackHdlrFn>>>, track: Arc<TrackRemote>, receiver: Arc<RTCRtpReceiver>, transceiver: Arc<RTCRtpTransceiver>, )581     fn do_track(
582         on_track_handler: Arc<ArcSwapOption<Mutex<OnTrackHdlrFn>>>,
583         track: Arc<TrackRemote>,
584         receiver: Arc<RTCRtpReceiver>,
585         transceiver: Arc<RTCRtpTransceiver>,
586     ) {
587         log::debug!("got new track: {:?}", track);
588 
589         tokio::spawn(async move {
590             if let Some(handler) = &*on_track_handler.load() {
591                 let mut f = handler.lock().await;
592                 f(track, receiver, transceiver).await;
593             } else {
594                 log::warn!("on_track unset, unable to handle incoming media streams");
595             }
596         });
597     }
598 
599     /// on_ice_connection_state_change sets an event handler which is called
600     /// when an ICE connection state is changed.
on_ice_connection_state_change(&self, f: OnICEConnectionStateChangeHdlrFn)601     pub fn on_ice_connection_state_change(&self, f: OnICEConnectionStateChangeHdlrFn) {
602         self.internal
603             .on_ice_connection_state_change_handler
604             .store(Some(Arc::new(Mutex::new(f))));
605     }
606 
do_ice_connection_state_change( handler: &Arc<ArcSwapOption<Mutex<OnICEConnectionStateChangeHdlrFn>>>, ice_connection_state: &Arc<AtomicU8>, cs: RTCIceConnectionState, )607     async fn do_ice_connection_state_change(
608         handler: &Arc<ArcSwapOption<Mutex<OnICEConnectionStateChangeHdlrFn>>>,
609         ice_connection_state: &Arc<AtomicU8>,
610         cs: RTCIceConnectionState,
611     ) {
612         ice_connection_state.store(cs as u8, Ordering::SeqCst);
613 
614         log::info!("ICE connection state changed: {}", cs);
615         if let Some(handler) = &*handler.load() {
616             let mut f = handler.lock().await;
617             f(cs).await;
618         }
619     }
620 
621     /// on_peer_connection_state_change sets an event handler which is called
622     /// when the PeerConnectionState has changed
on_peer_connection_state_change(&self, f: OnPeerConnectionStateChangeHdlrFn)623     pub fn on_peer_connection_state_change(&self, f: OnPeerConnectionStateChangeHdlrFn) {
624         self.internal
625             .on_peer_connection_state_change_handler
626             .store(Some(Arc::new(Mutex::new(f))));
627     }
628 
do_peer_connection_state_change( handler: &Arc<ArcSwapOption<Mutex<OnPeerConnectionStateChangeHdlrFn>>>, cs: RTCPeerConnectionState, )629     async fn do_peer_connection_state_change(
630         handler: &Arc<ArcSwapOption<Mutex<OnPeerConnectionStateChangeHdlrFn>>>,
631         cs: RTCPeerConnectionState,
632     ) {
633         if let Some(handler) = &*handler.load() {
634             let mut f = handler.lock().await;
635             f(cs).await;
636         }
637     }
638 
639     /*TODO: // set_configuration updates the configuration of this PeerConnection object.
640     pub async fn set_configuration(&mut self, configuration: Configuration) -> Result<()> {
641         //nolint:gocognit
642         // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-setconfiguration (step #2)
643         if self.internal.is_closed.load(Ordering::SeqCst) {
644             return Err(Error::ErrConnectionClosed.into());
645         }
646 
647         // https://www.w3.org/TR/webrtc/#set-the-configuration (step #3)
648         if !configuration.peer_identity.is_empty() {
649             if configuration.peer_identity != self.configuration.peer_identity {
650                 return Err(Error::ErrModifyingPeerIdentity.into());
651             }
652             self.configuration.peer_identity = configuration.peer_identity;
653         }
654 
655         // https://www.w3.org/TR/webrtc/#set-the-configuration (step #4)
656         if !configuration.certificates.is_empty() {
657             if configuration.certificates.len() != self.configuration.certificates.len() {
658                 return Err(Error::ErrModifyingCertificates.into());
659             }
660 
661             self.configuration.certificates = configuration.certificates;
662         }
663 
664         // https://www.w3.org/TR/webrtc/#set-the-configuration (step #5)
665         if configuration.bundle_policy != BundlePolicy::Unspecified {
666             if configuration.bundle_policy != self.configuration.bundle_policy {
667                 return Err(Error::ErrModifyingBundlePolicy.into());
668             }
669             self.configuration.bundle_policy = configuration.bundle_policy;
670         }
671 
672         // https://www.w3.org/TR/webrtc/#set-the-configuration (step #6)
673         if configuration.rtcp_mux_policy != RTCPMuxPolicy::Unspecified {
674             if configuration.rtcp_mux_policy != self.configuration.rtcp_mux_policy {
675                 return Err(Error::ErrModifyingRTCPMuxPolicy.into());
676             }
677             self.configuration.rtcp_mux_policy = configuration.rtcp_mux_policy;
678         }
679 
680         // https://www.w3.org/TR/webrtc/#set-the-configuration (step #7)
681         if configuration.ice_candidate_pool_size != 0 {
682             if self.configuration.ice_candidate_pool_size != configuration.ice_candidate_pool_size
683                 && self.local_description().await.is_some()
684             {
685                 return Err(Error::ErrModifyingICECandidatePoolSize.into());
686             }
687             self.configuration.ice_candidate_pool_size = configuration.ice_candidate_pool_size;
688         }
689 
690         // https://www.w3.org/TR/webrtc/#set-the-configuration (step #8)
691         if configuration.ice_transport_policy != ICETransportPolicy::Unspecified {
692             self.configuration.ice_transport_policy = configuration.ice_transport_policy
693         }
694 
695         // https://www.w3.org/TR/webrtc/#set-the-configuration (step #11)
696         if !configuration.ice_servers.is_empty() {
697             // https://www.w3.org/TR/webrtc/#set-the-configuration (step #11.3)
698             for server in &configuration.ice_servers {
699                 server.validate()?;
700             }
701             self.configuration.ice_servers = configuration.ice_servers
702         }
703         Ok(())
704     }*/
705 
706     /// get_configuration returns a Configuration object representing the current
707     /// configuration of this PeerConnection object. The returned object is a
708     /// copy and direct mutation on it will not take affect until set_configuration
709     /// has been called with Configuration passed as its only argument.
710     /// <https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-getconfiguration>
get_configuration(&self) -> &RTCConfiguration711     pub fn get_configuration(&self) -> &RTCConfiguration {
712         &self.configuration
713     }
714 
get_stats_id(&self) -> &str715     pub fn get_stats_id(&self) -> &str {
716         self.stats_id.as_str()
717     }
718 
719     /// create_offer starts the PeerConnection and generates the localDescription
720     /// <https://w3c.github.io/webrtc-pc/#dom-rtcpeerconnection-createoffer>
create_offer( &self, options: Option<RTCOfferOptions>, ) -> Result<RTCSessionDescription>721     pub async fn create_offer(
722         &self,
723         options: Option<RTCOfferOptions>,
724     ) -> Result<RTCSessionDescription> {
725         let use_identity = self.idp_login_url.is_some();
726         if use_identity {
727             return Err(Error::ErrIdentityProviderNotImplemented);
728         } else if self.internal.is_closed.load(Ordering::SeqCst) {
729             return Err(Error::ErrConnectionClosed);
730         }
731 
732         if let Some(options) = options {
733             if options.ice_restart {
734                 self.internal.ice_transport.restart().await?;
735             }
736         }
737 
738         // This may be necessary to recompute if, for example, createOffer was called when only an
739         // audio RTCRtpTransceiver was added to connection, but while performing the in-parallel
740         // steps to create an offer, a video RTCRtpTransceiver was added, requiring additional
741         // inspection of video system resources.
742         let mut count = 0;
743         let mut offer;
744 
745         loop {
746             // We cache current transceivers to ensure they aren't
747             // mutated during offer generation. We later check if they have
748             // been mutated and recompute the offer if necessary.
749             let current_transceivers = {
750                 let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
751                 rtp_transceivers.clone()
752             };
753 
754             // include unmatched local transceivers
755             // update the greater mid if the remote description provides a greater one
756             {
757                 let current_remote_description =
758                     self.internal.current_remote_description.lock().await;
759                 if let Some(d) = &*current_remote_description {
760                     if let Some(parsed) = &d.parsed {
761                         for media in &parsed.media_descriptions {
762                             if let Some(mid) = get_mid_value(media) {
763                                 if mid.is_empty() {
764                                     continue;
765                                 }
766                                 let numeric_mid = match mid.parse::<isize>() {
767                                     Ok(n) => n,
768                                     Err(_) => continue,
769                                 };
770                                 if numeric_mid > self.internal.greater_mid.load(Ordering::SeqCst) {
771                                     self.internal
772                                         .greater_mid
773                                         .store(numeric_mid, Ordering::SeqCst);
774                                 }
775                             }
776                         }
777                     }
778                 }
779             }
780             for t in &current_transceivers {
781                 if t.mid().is_some() {
782                     continue;
783                 }
784 
785                 if let Some(gen) = &self.internal.setting_engine.mid_generator {
786                     let current_greatest = self.internal.greater_mid.load(Ordering::SeqCst);
787                     let mid = (gen)(current_greatest);
788 
789                     // If it's possible to parse the returned mid as numeric, we will update the greater_mid field.
790                     if let Ok(numeric_mid) = mid.parse::<isize>() {
791                         if numeric_mid > self.internal.greater_mid.load(Ordering::SeqCst) {
792                             self.internal
793                                 .greater_mid
794                                 .store(numeric_mid, Ordering::SeqCst);
795                         }
796                     }
797 
798                     t.set_mid(mid)?;
799                 } else {
800                     let greater_mid = self.internal.greater_mid.fetch_add(1, Ordering::SeqCst);
801                     t.set_mid(format!("{}", greater_mid + 1))?;
802                 }
803             }
804 
805             let current_remote_description_is_none = {
806                 let current_remote_description =
807                     self.internal.current_remote_description.lock().await;
808                 current_remote_description.is_none()
809             };
810 
811             let mut d = if current_remote_description_is_none {
812                 self.internal
813                     .generate_unmatched_sdp(current_transceivers, use_identity)
814                     .await?
815             } else {
816                 self.internal
817                     .generate_matched_sdp(
818                         current_transceivers,
819                         use_identity,
820                         true, /*includeUnmatched */
821                         DEFAULT_DTLS_ROLE_OFFER.to_connection_role(),
822                     )
823                     .await?
824             };
825 
826             {
827                 let mut sdp_origin = self.internal.sdp_origin.lock().await;
828                 update_sdp_origin(&mut sdp_origin, &mut d);
829             }
830             let sdp = d.marshal();
831 
832             offer = RTCSessionDescription {
833                 sdp_type: RTCSdpType::Offer,
834                 sdp,
835                 parsed: Some(d),
836             };
837 
838             // Verify local media hasn't changed during offer
839             // generation. Recompute if necessary
840             if !self.internal.has_local_description_changed(&offer).await {
841                 break;
842             }
843             count += 1;
844             if count >= 128 {
845                 return Err(Error::ErrExcessiveRetries);
846             }
847         }
848 
849         {
850             let mut last_offer = self.internal.last_offer.lock().await;
851             *last_offer = offer.sdp.clone();
852         }
853         Ok(offer)
854     }
855 
856     /// Update the PeerConnectionState given the state of relevant transports
857     /// <https://www.w3.org/TR/webrtc/#rtcpeerconnectionstate-enum>
update_connection_state( on_peer_connection_state_change_handler: &Arc< ArcSwapOption<Mutex<OnPeerConnectionStateChangeHdlrFn>>, >, is_closed: &Arc<AtomicBool>, peer_connection_state: &Arc<AtomicU8>, ice_connection_state: RTCIceConnectionState, dtls_transport_state: RTCDtlsTransportState, )858     async fn update_connection_state(
859         on_peer_connection_state_change_handler: &Arc<
860             ArcSwapOption<Mutex<OnPeerConnectionStateChangeHdlrFn>>,
861         >,
862         is_closed: &Arc<AtomicBool>,
863         peer_connection_state: &Arc<AtomicU8>,
864         ice_connection_state: RTCIceConnectionState,
865         dtls_transport_state: RTCDtlsTransportState,
866     ) {
867         let connection_state =
868             // The RTCPeerConnection object's [[IsClosed]] slot is true.
869             if is_closed.load(Ordering::SeqCst) {
870                 RTCPeerConnectionState::Closed
871             } else if ice_connection_state == RTCIceConnectionState::Failed || dtls_transport_state == RTCDtlsTransportState::Failed {
872                 // Any of the RTCIceTransports or RTCDtlsTransports are in a "failed" state.
873                 RTCPeerConnectionState::Failed
874             } else if ice_connection_state == RTCIceConnectionState::Disconnected {
875                 // Any of the RTCIceTransports or RTCDtlsTransports are in the "disconnected"
876                 // state and none of them are in the "failed" or "connecting" or "checking" state.
877                 RTCPeerConnectionState::Disconnected
878             } else if ice_connection_state == RTCIceConnectionState::Connected && dtls_transport_state == RTCDtlsTransportState::Connected {
879                 // All RTCIceTransports and RTCDtlsTransports are in the "connected", "completed" or "closed"
880                 // state and at least one of them is in the "connected" or "completed" state.
881                 RTCPeerConnectionState::Connected
882             } else if ice_connection_state == RTCIceConnectionState::Checking && dtls_transport_state == RTCDtlsTransportState::Connecting {
883                 //  Any of the RTCIceTransports or RTCDtlsTransports are in the "connecting" or
884                 // "checking" state and none of them is in the "failed" state.
885                 RTCPeerConnectionState::Connecting
886             } else {
887                 RTCPeerConnectionState::New
888             };
889 
890         if peer_connection_state.load(Ordering::SeqCst) == connection_state as u8 {
891             return;
892         }
893 
894         log::info!("peer connection state changed: {}", connection_state);
895         peer_connection_state.store(connection_state as u8, Ordering::SeqCst);
896 
897         RTCPeerConnection::do_peer_connection_state_change(
898             on_peer_connection_state_change_handler,
899             connection_state,
900         )
901         .await;
902     }
903 
904     /// create_answer starts the PeerConnection and generates the localDescription
create_answer( &self, _options: Option<RTCAnswerOptions>, ) -> Result<RTCSessionDescription>905     pub async fn create_answer(
906         &self,
907         _options: Option<RTCAnswerOptions>,
908     ) -> Result<RTCSessionDescription> {
909         let use_identity = self.idp_login_url.is_some();
910         if self.remote_description().await.is_none() {
911             return Err(Error::ErrNoRemoteDescription);
912         } else if use_identity {
913             return Err(Error::ErrIdentityProviderNotImplemented);
914         } else if self.internal.is_closed.load(Ordering::SeqCst) {
915             return Err(Error::ErrConnectionClosed);
916         } else if self.signaling_state() != RTCSignalingState::HaveRemoteOffer
917             && self.signaling_state() != RTCSignalingState::HaveLocalPranswer
918         {
919             return Err(Error::ErrIncorrectSignalingState);
920         }
921 
922         let mut connection_role = self
923             .internal
924             .setting_engine
925             .answering_dtls_role
926             .to_connection_role();
927         if connection_role == ConnectionRole::Unspecified {
928             connection_role = DEFAULT_DTLS_ROLE_ANSWER.to_connection_role();
929         }
930 
931         let local_transceivers = self.get_transceivers().await;
932         let mut d = self
933             .internal
934             .generate_matched_sdp(
935                 local_transceivers,
936                 use_identity,
937                 false, /*includeUnmatched */
938                 connection_role,
939             )
940             .await?;
941 
942         {
943             let mut sdp_origin = self.internal.sdp_origin.lock().await;
944             update_sdp_origin(&mut sdp_origin, &mut d);
945         }
946         let sdp = d.marshal();
947 
948         let answer = RTCSessionDescription {
949             sdp_type: RTCSdpType::Answer,
950             sdp,
951             parsed: Some(d),
952         };
953 
954         {
955             let mut last_answer = self.internal.last_answer.lock().await;
956             *last_answer = answer.sdp.clone();
957         }
958         Ok(answer)
959     }
960 
961     // 4.4.1.6 Set the SessionDescription
set_description( &self, sd: &RTCSessionDescription, op: StateChangeOp, ) -> Result<()>962     pub(crate) async fn set_description(
963         &self,
964         sd: &RTCSessionDescription,
965         op: StateChangeOp,
966     ) -> Result<()> {
967         if self.internal.is_closed.load(Ordering::SeqCst) {
968             return Err(Error::ErrConnectionClosed);
969         } else if sd.sdp_type == RTCSdpType::Unspecified {
970             return Err(Error::ErrPeerConnSDPTypeInvalidValue);
971         }
972 
973         let next_state = {
974             let cur = self.signaling_state();
975             let new_sdpdoes_not_match_offer = Error::ErrSDPDoesNotMatchOffer;
976             let new_sdpdoes_not_match_answer = Error::ErrSDPDoesNotMatchAnswer;
977 
978             match op {
979                 StateChangeOp::SetLocal => {
980                     match sd.sdp_type {
981                         // stable->SetLocal(offer)->have-local-offer
982                         RTCSdpType::Offer => {
983                             let check = {
984                                 let last_offer = self.internal.last_offer.lock().await;
985                                 sd.sdp != *last_offer
986                             };
987                             if check {
988                                 Err(new_sdpdoes_not_match_offer)
989                             } else {
990                                 let next_state = check_next_signaling_state(
991                                     cur,
992                                     RTCSignalingState::HaveLocalOffer,
993                                     StateChangeOp::SetLocal,
994                                     sd.sdp_type,
995                                 );
996                                 if next_state.is_ok() {
997                                     let mut pending_local_description =
998                                         self.internal.pending_local_description.lock().await;
999                                     *pending_local_description = Some(sd.clone());
1000                                 }
1001                                 next_state
1002                             }
1003                         }
1004                         // have-remote-offer->SetLocal(answer)->stable
1005                         // have-local-pranswer->SetLocal(answer)->stable
1006                         RTCSdpType::Answer => {
1007                             let check = {
1008                                 let last_answer = self.internal.last_answer.lock().await;
1009                                 sd.sdp != *last_answer
1010                             };
1011                             if check {
1012                                 Err(new_sdpdoes_not_match_answer)
1013                             } else {
1014                                 let next_state = check_next_signaling_state(
1015                                     cur,
1016                                     RTCSignalingState::Stable,
1017                                     StateChangeOp::SetLocal,
1018                                     sd.sdp_type,
1019                                 );
1020                                 if next_state.is_ok() {
1021                                     let pending_remote_description = {
1022                                         let mut pending_remote_description =
1023                                             self.internal.pending_remote_description.lock().await;
1024                                         pending_remote_description.take()
1025                                     };
1026                                     let _pending_local_description = {
1027                                         let mut pending_local_description =
1028                                             self.internal.pending_local_description.lock().await;
1029                                         pending_local_description.take()
1030                                     };
1031 
1032                                     {
1033                                         let mut current_local_description =
1034                                             self.internal.current_local_description.lock().await;
1035                                         *current_local_description = Some(sd.clone());
1036                                     }
1037                                     {
1038                                         let mut current_remote_description =
1039                                             self.internal.current_remote_description.lock().await;
1040                                         *current_remote_description = pending_remote_description;
1041                                     }
1042                                 }
1043                                 next_state
1044                             }
1045                         }
1046                         RTCSdpType::Rollback => {
1047                             let next_state = check_next_signaling_state(
1048                                 cur,
1049                                 RTCSignalingState::Stable,
1050                                 StateChangeOp::SetLocal,
1051                                 sd.sdp_type,
1052                             );
1053                             if next_state.is_ok() {
1054                                 let mut pending_local_description =
1055                                     self.internal.pending_local_description.lock().await;
1056                                 *pending_local_description = None;
1057                             }
1058                             next_state
1059                         }
1060                         // have-remote-offer->SetLocal(pranswer)->have-local-pranswer
1061                         RTCSdpType::Pranswer => {
1062                             let check = {
1063                                 let last_answer = self.internal.last_answer.lock().await;
1064                                 sd.sdp != *last_answer
1065                             };
1066                             if check {
1067                                 Err(new_sdpdoes_not_match_answer)
1068                             } else {
1069                                 let next_state = check_next_signaling_state(
1070                                     cur,
1071                                     RTCSignalingState::HaveLocalPranswer,
1072                                     StateChangeOp::SetLocal,
1073                                     sd.sdp_type,
1074                                 );
1075                                 if next_state.is_ok() {
1076                                     let mut pending_local_description =
1077                                         self.internal.pending_local_description.lock().await;
1078                                     *pending_local_description = Some(sd.clone());
1079                                 }
1080                                 next_state
1081                             }
1082                         }
1083                         _ => Err(Error::ErrPeerConnStateChangeInvalid),
1084                     }
1085                 }
1086                 StateChangeOp::SetRemote => {
1087                     match sd.sdp_type {
1088                         // stable->SetRemote(offer)->have-remote-offer
1089                         RTCSdpType::Offer => {
1090                             let next_state = check_next_signaling_state(
1091                                 cur,
1092                                 RTCSignalingState::HaveRemoteOffer,
1093                                 StateChangeOp::SetRemote,
1094                                 sd.sdp_type,
1095                             );
1096                             if next_state.is_ok() {
1097                                 let mut pending_remote_description =
1098                                     self.internal.pending_remote_description.lock().await;
1099                                 *pending_remote_description = Some(sd.clone());
1100                             }
1101                             next_state
1102                         }
1103                         // have-local-offer->SetRemote(answer)->stable
1104                         // have-remote-pranswer->SetRemote(answer)->stable
1105                         RTCSdpType::Answer => {
1106                             let next_state = check_next_signaling_state(
1107                                 cur,
1108                                 RTCSignalingState::Stable,
1109                                 StateChangeOp::SetRemote,
1110                                 sd.sdp_type,
1111                             );
1112                             if next_state.is_ok() {
1113                                 let pending_local_description = {
1114                                     let mut pending_local_description =
1115                                         self.internal.pending_local_description.lock().await;
1116                                     pending_local_description.take()
1117                                 };
1118 
1119                                 let _pending_remote_description = {
1120                                     let mut pending_remote_description =
1121                                         self.internal.pending_remote_description.lock().await;
1122                                     pending_remote_description.take()
1123                                 };
1124 
1125                                 {
1126                                     let mut current_remote_description =
1127                                         self.internal.current_remote_description.lock().await;
1128                                     *current_remote_description = Some(sd.clone());
1129                                 }
1130                                 {
1131                                     let mut current_local_description =
1132                                         self.internal.current_local_description.lock().await;
1133                                     *current_local_description = pending_local_description;
1134                                 }
1135                             }
1136                             next_state
1137                         }
1138                         RTCSdpType::Rollback => {
1139                             let next_state = check_next_signaling_state(
1140                                 cur,
1141                                 RTCSignalingState::Stable,
1142                                 StateChangeOp::SetRemote,
1143                                 sd.sdp_type,
1144                             );
1145                             if next_state.is_ok() {
1146                                 let mut pending_remote_description =
1147                                     self.internal.pending_remote_description.lock().await;
1148                                 *pending_remote_description = None;
1149                             }
1150                             next_state
1151                         }
1152                         // have-local-offer->SetRemote(pranswer)->have-remote-pranswer
1153                         RTCSdpType::Pranswer => {
1154                             let next_state = check_next_signaling_state(
1155                                 cur,
1156                                 RTCSignalingState::HaveRemotePranswer,
1157                                 StateChangeOp::SetRemote,
1158                                 sd.sdp_type,
1159                             );
1160                             if next_state.is_ok() {
1161                                 let mut pending_remote_description =
1162                                     self.internal.pending_remote_description.lock().await;
1163                                 *pending_remote_description = Some(sd.clone());
1164                             }
1165                             next_state
1166                         }
1167                         _ => Err(Error::ErrPeerConnStateChangeInvalid),
1168                     }
1169                 } //_ => Err(Error::ErrPeerConnStateChangeUnhandled.into()),
1170             }
1171         };
1172 
1173         match next_state {
1174             Ok(next_state) => {
1175                 self.internal
1176                     .signaling_state
1177                     .store(next_state as u8, Ordering::SeqCst);
1178                 if self.signaling_state() == RTCSignalingState::Stable {
1179                     self.internal
1180                         .is_negotiation_needed
1181                         .store(false, Ordering::SeqCst);
1182                     self.internal.trigger_negotiation_needed().await;
1183                 }
1184                 self.do_signaling_state_change(next_state).await;
1185                 Ok(())
1186             }
1187             Err(err) => Err(err),
1188         }
1189     }
1190 
1191     /// set_local_description sets the SessionDescription of the local peer
set_local_description(&self, mut desc: RTCSessionDescription) -> Result<()>1192     pub async fn set_local_description(&self, mut desc: RTCSessionDescription) -> Result<()> {
1193         if self.internal.is_closed.load(Ordering::SeqCst) {
1194             return Err(Error::ErrConnectionClosed);
1195         }
1196 
1197         let have_local_description = {
1198             let current_local_description = self.internal.current_local_description.lock().await;
1199             current_local_description.is_some()
1200         };
1201 
1202         // JSEP 5.4
1203         if desc.sdp.is_empty() {
1204             match desc.sdp_type {
1205                 RTCSdpType::Answer | RTCSdpType::Pranswer => {
1206                     let last_answer = self.internal.last_answer.lock().await;
1207                     desc.sdp = last_answer.clone();
1208                 }
1209                 RTCSdpType::Offer => {
1210                     let last_offer = self.internal.last_offer.lock().await;
1211                     desc.sdp = last_offer.clone();
1212                 }
1213                 _ => return Err(Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription),
1214             }
1215         }
1216 
1217         desc.parsed = Some(desc.unmarshal()?);
1218         self.set_description(&desc, StateChangeOp::SetLocal).await?;
1219 
1220         let we_answer = desc.sdp_type == RTCSdpType::Answer;
1221         let remote_description = self.remote_description().await;
1222         let mut local_transceivers = self.get_transceivers().await;
1223         if we_answer {
1224             if let Some(parsed) = desc.parsed {
1225                 // WebRTC Spec 1.0 https://www.w3.org/TR/webrtc/
1226                 // Section 4.4.1.5
1227                 for media in &parsed.media_descriptions {
1228                     if media.media_name.media == MEDIA_SECTION_APPLICATION {
1229                         continue;
1230                     }
1231 
1232                     let kind = RTPCodecType::from(media.media_name.media.as_str());
1233                     let direction = get_peer_direction(media);
1234                     if kind == RTPCodecType::Unspecified
1235                         || direction == RTCRtpTransceiverDirection::Unspecified
1236                     {
1237                         continue;
1238                     }
1239 
1240                     let mid_value = match get_mid_value(media) {
1241                         Some(mid) if !mid.is_empty() => mid,
1242                         _ => continue,
1243                     };
1244 
1245                     let t = match find_by_mid(mid_value, &mut local_transceivers).await {
1246                         Some(t) => t,
1247                         None => continue,
1248                     };
1249                     let previous_direction = t.current_direction();
1250                     // 4.9.1.7.3 applying a local answer or pranswer
1251                     // Set transceiver.[[CurrentDirection]] and transceiver.[[FiredDirection]] to direction.
1252 
1253                     // TODO: Also set FiredDirection here.
1254                     t.set_current_direction(direction);
1255                     t.process_new_current_direction(previous_direction).await?;
1256                 }
1257             }
1258 
1259             if let Some(remote_desc) = remote_description {
1260                 self.start_rtp_senders().await?;
1261 
1262                 let pci = Arc::clone(&self.internal);
1263                 let remote_desc = Arc::new(remote_desc);
1264                 self.internal
1265                     .ops
1266                     .enqueue(Operation::new(
1267                         move || {
1268                             let pc = Arc::clone(&pci);
1269                             let rd = Arc::clone(&remote_desc);
1270                             Box::pin(async move {
1271                                 let _ = pc.start_rtp(have_local_description, rd).await;
1272                                 false
1273                             })
1274                         },
1275                         "set_local_description",
1276                     ))
1277                     .await?;
1278             }
1279         }
1280 
1281         if self.internal.ice_gatherer.state() == RTCIceGathererState::New {
1282             self.internal.ice_gatherer.gather().await
1283         } else {
1284             Ok(())
1285         }
1286     }
1287 
1288     /// local_description returns PendingLocalDescription if it is not null and
1289     /// otherwise it returns CurrentLocalDescription. This property is used to
1290     /// determine if set_local_description has already been called.
1291     /// <https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-localdescription>
local_description(&self) -> Option<RTCSessionDescription>1292     pub async fn local_description(&self) -> Option<RTCSessionDescription> {
1293         if let Some(pending_local_description) = self.pending_local_description().await {
1294             return Some(pending_local_description);
1295         }
1296         self.current_local_description().await
1297     }
1298 
1299     /// set_remote_description sets the SessionDescription of the remote peer
set_remote_description(&self, mut desc: RTCSessionDescription) -> Result<()>1300     pub async fn set_remote_description(&self, mut desc: RTCSessionDescription) -> Result<()> {
1301         if self.internal.is_closed.load(Ordering::SeqCst) {
1302             return Err(Error::ErrConnectionClosed);
1303         }
1304 
1305         let is_renegotation = {
1306             let current_remote_description = self.internal.current_remote_description.lock().await;
1307             current_remote_description.is_some()
1308         };
1309 
1310         desc.parsed = Some(desc.unmarshal()?);
1311         self.set_description(&desc, StateChangeOp::SetRemote)
1312             .await?;
1313 
1314         if let Some(parsed) = &desc.parsed {
1315             self.internal
1316                 .media_engine
1317                 .update_from_remote_description(parsed)
1318                 .await?;
1319 
1320             let mut local_transceivers = self.get_transceivers().await;
1321             let remote_description = self.remote_description().await;
1322             let we_offer = desc.sdp_type == RTCSdpType::Answer;
1323 
1324             if !we_offer {
1325                 if let Some(parsed) = remote_description.as_ref().and_then(|r| r.parsed.as_ref()) {
1326                     for media in &parsed.media_descriptions {
1327                         let mid_value = match get_mid_value(media) {
1328                             Some(m) => {
1329                                 if m.is_empty() {
1330                                     return Err(Error::ErrPeerConnRemoteDescriptionWithoutMidValue);
1331                                 } else {
1332                                     m
1333                                 }
1334                             }
1335                             None => continue,
1336                         };
1337 
1338                         if media.media_name.media == MEDIA_SECTION_APPLICATION {
1339                             continue;
1340                         }
1341 
1342                         let kind = RTPCodecType::from(media.media_name.media.as_str());
1343                         let direction = get_peer_direction(media);
1344                         if kind == RTPCodecType::Unspecified
1345                             || direction == RTCRtpTransceiverDirection::Unspecified
1346                         {
1347                             continue;
1348                         }
1349 
1350                         let t = if let Some(t) =
1351                             find_by_mid(mid_value, &mut local_transceivers).await
1352                         {
1353                             Some(t)
1354                         } else {
1355                             satisfy_type_and_direction(kind, direction, &mut local_transceivers)
1356                                 .await
1357                         };
1358 
1359                         if let Some(t) = t {
1360                             if t.mid().is_none() {
1361                                 t.set_mid(mid_value.to_owned())?;
1362                             }
1363                         } else {
1364                             let local_direction =
1365                                 if direction == RTCRtpTransceiverDirection::Recvonly {
1366                                     RTCRtpTransceiverDirection::Sendonly
1367                                 } else {
1368                                     RTCRtpTransceiverDirection::Recvonly
1369                                 };
1370 
1371                             let receive_mtu = self.internal.setting_engine.get_receive_mtu();
1372 
1373                             let receiver = Arc::new(RTCRtpReceiver::new(
1374                                 receive_mtu,
1375                                 kind,
1376                                 Arc::clone(&self.internal.dtls_transport),
1377                                 Arc::clone(&self.internal.media_engine),
1378                                 Arc::clone(&self.interceptor),
1379                             ));
1380 
1381                             let sender = Arc::new(
1382                                 RTCRtpSender::new(
1383                                     receive_mtu,
1384                                     None,
1385                                     Arc::clone(&self.internal.dtls_transport),
1386                                     Arc::clone(&self.internal.media_engine),
1387                                     Arc::clone(&self.interceptor),
1388                                     false,
1389                                 )
1390                                 .await,
1391                             );
1392 
1393                             let t = RTCRtpTransceiver::new(
1394                                 receiver,
1395                                 sender,
1396                                 local_direction,
1397                                 kind,
1398                                 vec![],
1399                                 Arc::clone(&self.internal.media_engine),
1400                                 Some(Box::new(self.internal.make_negotiation_needed_trigger())),
1401                             )
1402                             .await;
1403 
1404                             self.internal.add_rtp_transceiver(Arc::clone(&t)).await;
1405 
1406                             if t.mid().is_none() {
1407                                 t.set_mid(mid_value.to_owned())?;
1408                             }
1409                         }
1410                     }
1411                 }
1412             }
1413 
1414             if we_offer {
1415                 // WebRTC Spec 1.0 https://www.w3.org/TR/webrtc/
1416                 // 4.5.9.2
1417                 // This is an answer from the remote.
1418                 if let Some(parsed) = remote_description.as_ref().and_then(|r| r.parsed.as_ref()) {
1419                     for media in &parsed.media_descriptions {
1420                         let mid_value = match get_mid_value(media) {
1421                             Some(m) => {
1422                                 if m.is_empty() {
1423                                     return Err(Error::ErrPeerConnRemoteDescriptionWithoutMidValue);
1424                                 } else {
1425                                     m
1426                                 }
1427                             }
1428                             None => continue,
1429                         };
1430 
1431                         if media.media_name.media == MEDIA_SECTION_APPLICATION {
1432                             continue;
1433                         }
1434                         let kind = RTPCodecType::from(media.media_name.media.as_str());
1435                         let direction = get_peer_direction(media);
1436                         if kind == RTPCodecType::Unspecified
1437                             || direction == RTCRtpTransceiverDirection::Unspecified
1438                         {
1439                             continue;
1440                         }
1441 
1442                         if let Some(t) = find_by_mid(mid_value, &mut local_transceivers).await {
1443                             let previous_direction = t.current_direction();
1444 
1445                             // 4.5.9.2.9
1446                             // Let direction be an RTCRtpTransceiverDirection value representing the direction
1447                             // from the media description, but with the send and receive directions reversed to
1448                             // represent this peer's point of view. If the media description is rejected,
1449                             // set direction to "inactive".
1450                             let reversed_direction = direction.reverse();
1451 
1452                             // 4.5.9.2.13.2
1453                             // Set transceiver.[[CurrentDirection]] and transceiver.[[Direction]]s to direction.
1454                             t.set_current_direction(reversed_direction);
1455                             // TODO: According to the specification we should set
1456                             // transceiver.[[Direction]] here, however libWebrtc doesn't do this.
1457                             // NOTE: After raising this it seems like the specification might
1458                             // change to remove the setting of transceiver.[[Direction]].
1459                             // See https://github.com/w3c/webrtc-pc/issues/2751#issuecomment-1185901962
1460                             // t.set_direction_internal(reversed_direction);
1461                             t.process_new_current_direction(previous_direction).await?;
1462                         }
1463                     }
1464                 }
1465             }
1466 
1467             let (remote_ufrag, remote_pwd, candidates) = extract_ice_details(parsed).await?;
1468 
1469             if is_renegotation
1470                 && self
1471                     .internal
1472                     .ice_transport
1473                     .have_remote_credentials_change(&remote_ufrag, &remote_pwd)
1474                     .await
1475             {
1476                 // An ICE Restart only happens implicitly for a set_remote_description of type offer
1477                 if !we_offer {
1478                     self.internal.ice_transport.restart().await?;
1479                 }
1480 
1481                 self.internal
1482                     .ice_transport
1483                     .set_remote_credentials(remote_ufrag.clone(), remote_pwd.clone())
1484                     .await?;
1485             }
1486 
1487             for candidate in candidates {
1488                 self.internal
1489                     .ice_transport
1490                     .add_remote_candidate(Some(candidate))
1491                     .await?;
1492             }
1493 
1494             if is_renegotation {
1495                 if we_offer {
1496                     self.start_rtp_senders().await?;
1497 
1498                     let pci = Arc::clone(&self.internal);
1499                     let remote_desc = Arc::new(desc);
1500                     self.internal
1501                         .ops
1502                         .enqueue(Operation::new(
1503                             move || {
1504                                 let pc = Arc::clone(&pci);
1505                                 let rd = Arc::clone(&remote_desc);
1506                                 Box::pin(async move {
1507                                     let _ = pc.start_rtp(true, rd).await;
1508                                     false
1509                                 })
1510                             },
1511                             "set_remote_description renegotiation",
1512                         ))
1513                         .await?;
1514                 }
1515                 return Ok(());
1516             }
1517 
1518             let mut remote_is_lite = false;
1519             for a in &parsed.attributes {
1520                 if a.key.trim() == ATTR_KEY_ICELITE {
1521                     remote_is_lite = true;
1522                     break;
1523                 }
1524             }
1525 
1526             let (fingerprint, fingerprint_hash) = extract_fingerprint(parsed)?;
1527 
1528             // If one of the agents is lite and the other one is not, the lite agent must be the controlling agent.
1529             // If both or neither agents are lite the offering agent is controlling.
1530             // RFC 8445 S6.1.1
1531             let ice_role = if (we_offer
1532                 && remote_is_lite == self.internal.setting_engine.candidates.ice_lite)
1533                 || (remote_is_lite && !self.internal.setting_engine.candidates.ice_lite)
1534             {
1535                 RTCIceRole::Controlling
1536             } else {
1537                 RTCIceRole::Controlled
1538             };
1539 
1540             // Start the networking in a new routine since it will block until
1541             // the connection is actually established.
1542             if we_offer {
1543                 self.start_rtp_senders().await?;
1544             }
1545 
1546             //log::trace!("start_transports: parsed={:?}", parsed);
1547 
1548             let pci = Arc::clone(&self.internal);
1549             let dtls_role = DTLSRole::from(parsed);
1550             let remote_desc = Arc::new(desc);
1551             self.internal
1552                 .ops
1553                 .enqueue(Operation::new(
1554                     move || {
1555                         let pc = Arc::clone(&pci);
1556                         let rd = Arc::clone(&remote_desc);
1557                         let ru = remote_ufrag.clone();
1558                         let rp = remote_pwd.clone();
1559                         let fp = fingerprint.clone();
1560                         let fp_hash = fingerprint_hash.clone();
1561                         Box::pin(async move {
1562                             log::trace!(
1563                                 "start_transports: ice_role={}, dtls_role={}",
1564                                 ice_role,
1565                                 dtls_role,
1566                             );
1567                             pc.start_transports(ice_role, dtls_role, ru, rp, fp, fp_hash)
1568                                 .await;
1569 
1570                             if we_offer {
1571                                 let _ = pc.start_rtp(false, rd).await;
1572                             }
1573                             false
1574                         })
1575                     },
1576                     "set_remote_description",
1577                 ))
1578                 .await?;
1579         }
1580 
1581         Ok(())
1582     }
1583 
1584     /// start_rtp_senders starts all outbound RTP streams
start_rtp_senders(&self) -> Result<()>1585     pub(crate) async fn start_rtp_senders(&self) -> Result<()> {
1586         let current_transceivers = self.internal.rtp_transceivers.lock().await;
1587         for transceiver in &*current_transceivers {
1588             let sender = transceiver.sender();
1589             if sender.is_negotiated() && !sender.has_sent() {
1590                 sender.send(&sender.get_parameters().await).await?;
1591             }
1592         }
1593 
1594         Ok(())
1595     }
1596 
1597     /// remote_description returns pending_remote_description if it is not null and
1598     /// otherwise it returns current_remote_description. This property is used to
1599     /// determine if setRemoteDescription has already been called.
1600     /// <https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-remotedescription>
remote_description(&self) -> Option<RTCSessionDescription>1601     pub async fn remote_description(&self) -> Option<RTCSessionDescription> {
1602         self.internal.remote_description().await
1603     }
1604 
1605     /// add_ice_candidate accepts an ICE candidate string and adds it
1606     /// to the existing set of candidates.
add_ice_candidate(&self, candidate: RTCIceCandidateInit) -> Result<()>1607     pub async fn add_ice_candidate(&self, candidate: RTCIceCandidateInit) -> Result<()> {
1608         if self.remote_description().await.is_none() {
1609             return Err(Error::ErrNoRemoteDescription);
1610         }
1611 
1612         let candidate_value = match candidate.candidate.strip_prefix("candidate:") {
1613             Some(s) => s,
1614             None => candidate.candidate.as_str(),
1615         };
1616 
1617         let ice_candidate = if !candidate_value.is_empty() {
1618             let candidate: Arc<dyn Candidate + Send + Sync> =
1619                 Arc::new(unmarshal_candidate(candidate_value)?);
1620 
1621             Some(RTCIceCandidate::from(&candidate))
1622         } else {
1623             None
1624         };
1625 
1626         self.internal
1627             .ice_transport
1628             .add_remote_candidate(ice_candidate)
1629             .await
1630     }
1631 
1632     /// ice_connection_state returns the ICE connection state of the
1633     /// PeerConnection instance.
ice_connection_state(&self) -> RTCIceConnectionState1634     pub fn ice_connection_state(&self) -> RTCIceConnectionState {
1635         self.internal
1636             .ice_connection_state
1637             .load(Ordering::SeqCst)
1638             .into()
1639     }
1640 
1641     /// get_senders returns the RTPSender that are currently attached to this PeerConnection
get_senders(&self) -> Vec<Arc<RTCRtpSender>>1642     pub async fn get_senders(&self) -> Vec<Arc<RTCRtpSender>> {
1643         let mut senders = vec![];
1644         let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
1645         for transceiver in &*rtp_transceivers {
1646             let sender = transceiver.sender();
1647             senders.push(sender);
1648         }
1649         senders
1650     }
1651 
1652     /// get_receivers returns the RTPReceivers that are currently attached to this PeerConnection
get_receivers(&self) -> Vec<Arc<RTCRtpReceiver>>1653     pub async fn get_receivers(&self) -> Vec<Arc<RTCRtpReceiver>> {
1654         let mut receivers = vec![];
1655         let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
1656         for transceiver in &*rtp_transceivers {
1657             receivers.push(transceiver.receiver());
1658         }
1659         receivers
1660     }
1661 
1662     /// get_transceivers returns the RtpTransceiver that are currently attached to this PeerConnection
get_transceivers(&self) -> Vec<Arc<RTCRtpTransceiver>>1663     pub async fn get_transceivers(&self) -> Vec<Arc<RTCRtpTransceiver>> {
1664         let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
1665         rtp_transceivers.clone()
1666     }
1667 
1668     /// add_track adds a Track to the PeerConnection
add_track( &self, track: Arc<dyn TrackLocal + Send + Sync>, ) -> Result<Arc<RTCRtpSender>>1669     pub async fn add_track(
1670         &self,
1671         track: Arc<dyn TrackLocal + Send + Sync>,
1672     ) -> Result<Arc<RTCRtpSender>> {
1673         if self.internal.is_closed.load(Ordering::SeqCst) {
1674             return Err(Error::ErrConnectionClosed);
1675         }
1676 
1677         {
1678             let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
1679             for t in &*rtp_transceivers {
1680                 if !t.stopped.load(Ordering::SeqCst) && t.kind == track.kind() {
1681                     let sender = t.sender();
1682                     if sender.track().await.is_none() {
1683                         if let Err(err) = sender.replace_track(Some(track)).await {
1684                             let _ = sender.stop().await;
1685                             return Err(err);
1686                         }
1687 
1688                         t.set_direction_internal(RTCRtpTransceiverDirection::from_send_recv(
1689                             true,
1690                             t.direction().has_recv(),
1691                         ));
1692 
1693                         self.internal.trigger_negotiation_needed().await;
1694                         return Ok(sender);
1695                     }
1696                 }
1697             }
1698         }
1699 
1700         let transceiver = self
1701             .internal
1702             .new_transceiver_from_track(RTCRtpTransceiverDirection::Sendrecv, track)
1703             .await?;
1704         self.internal
1705             .add_rtp_transceiver(Arc::clone(&transceiver))
1706             .await;
1707 
1708         Ok(transceiver.sender())
1709     }
1710 
1711     /// remove_track removes a Track from the PeerConnection
remove_track(&self, sender: &Arc<RTCRtpSender>) -> Result<()>1712     pub async fn remove_track(&self, sender: &Arc<RTCRtpSender>) -> Result<()> {
1713         if self.internal.is_closed.load(Ordering::SeqCst) {
1714             return Err(Error::ErrConnectionClosed);
1715         }
1716 
1717         let mut transceiver = None;
1718         {
1719             let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
1720             for t in &*rtp_transceivers {
1721                 if t.sender().id == sender.id {
1722                     if sender.track().await.is_none() {
1723                         return Ok(());
1724                     }
1725                     transceiver = Some(t.clone());
1726                     break;
1727                 }
1728             }
1729         }
1730 
1731         let t = transceiver.ok_or(Error::ErrSenderNotCreatedByConnection)?;
1732 
1733         // This also happens in `set_sending_track` but we need to make sure we do this
1734         // before we call sender.stop to avoid a race condition when removing tracks and
1735         // generating offers.
1736         t.set_direction_internal(RTCRtpTransceiverDirection::from_send_recv(
1737             false,
1738             t.direction().has_recv(),
1739         ));
1740         // Stop the sender
1741         let sender_result = sender.stop().await;
1742         // This also updates direction
1743         let sending_track_result = t.set_sending_track(None).await;
1744 
1745         if sender_result.is_ok() && sending_track_result.is_ok() {
1746             self.internal.trigger_negotiation_needed().await;
1747         }
1748         Ok(())
1749     }
1750 
1751     /// add_transceiver_from_kind Create a new RtpTransceiver and adds it to the set of transceivers.
add_transceiver_from_kind( &self, kind: RTPCodecType, init: Option<RTCRtpTransceiverInit>, ) -> Result<Arc<RTCRtpTransceiver>>1752     pub async fn add_transceiver_from_kind(
1753         &self,
1754         kind: RTPCodecType,
1755         init: Option<RTCRtpTransceiverInit>,
1756     ) -> Result<Arc<RTCRtpTransceiver>> {
1757         self.internal.add_transceiver_from_kind(kind, init).await
1758     }
1759 
1760     /// add_transceiver_from_track Create a new RtpTransceiver(SendRecv or SendOnly) and add it to the set of transceivers.
add_transceiver_from_track( &self, track: Arc<dyn TrackLocal + Send + Sync>, init: Option<RTCRtpTransceiverInit>, ) -> Result<Arc<RTCRtpTransceiver>>1761     pub async fn add_transceiver_from_track(
1762         &self,
1763         track: Arc<dyn TrackLocal + Send + Sync>,
1764         init: Option<RTCRtpTransceiverInit>,
1765     ) -> Result<Arc<RTCRtpTransceiver>> {
1766         if self.internal.is_closed.load(Ordering::SeqCst) {
1767             return Err(Error::ErrConnectionClosed);
1768         }
1769 
1770         let direction = init
1771             .map(|init| init.direction)
1772             .unwrap_or(RTCRtpTransceiverDirection::Sendrecv);
1773 
1774         let t = self
1775             .internal
1776             .new_transceiver_from_track(direction, track)
1777             .await?;
1778 
1779         self.internal.add_rtp_transceiver(Arc::clone(&t)).await;
1780 
1781         Ok(t)
1782     }
1783 
1784     /// create_data_channel creates a new DataChannel object with the given label
1785     /// and optional DataChannelInit used to configure properties of the
1786     /// underlying channel such as data reliability.
create_data_channel( &self, label: &str, options: Option<RTCDataChannelInit>, ) -> Result<Arc<RTCDataChannel>>1787     pub async fn create_data_channel(
1788         &self,
1789         label: &str,
1790         options: Option<RTCDataChannelInit>,
1791     ) -> Result<Arc<RTCDataChannel>> {
1792         // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #2)
1793         if self.internal.is_closed.load(Ordering::SeqCst) {
1794             return Err(Error::ErrConnectionClosed);
1795         }
1796 
1797         let mut params = DataChannelParameters {
1798             label: label.to_owned(),
1799             ordered: true,
1800             ..Default::default()
1801         };
1802 
1803         // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #19)
1804         if let Some(options) = options {
1805             // Ordered indicates if data is allowed to be delivered out of order. The
1806             // default value of true, guarantees that data will be delivered in order.
1807             // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #9)
1808             if let Some(ordered) = options.ordered {
1809                 params.ordered = ordered;
1810             }
1811 
1812             // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #7)
1813             if let Some(max_packet_life_time) = options.max_packet_life_time {
1814                 params.max_packet_life_time = max_packet_life_time;
1815             }
1816 
1817             // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #8)
1818             if let Some(max_retransmits) = options.max_retransmits {
1819                 params.max_retransmits = max_retransmits;
1820             }
1821 
1822             // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #10)
1823             if let Some(protocol) = options.protocol {
1824                 params.protocol = protocol;
1825             }
1826 
1827             // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #11)
1828             if params.protocol.len() > 65535 {
1829                 return Err(Error::ErrProtocolTooLarge);
1830             }
1831 
1832             // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #12)
1833             params.negotiated = options.negotiated;
1834         }
1835 
1836         let d = Arc::new(RTCDataChannel::new(
1837             params,
1838             Arc::clone(&self.internal.setting_engine),
1839         ));
1840 
1841         // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #16)
1842         if d.max_packet_lifetime != 0 && d.max_retransmits != 0 {
1843             return Err(Error::ErrRetransmitsOrPacketLifeTime);
1844         }
1845 
1846         {
1847             let mut data_channels = self.internal.sctp_transport.data_channels.lock().await;
1848             data_channels.push(Arc::clone(&d));
1849         }
1850         self.internal
1851             .sctp_transport
1852             .data_channels_requested
1853             .fetch_add(1, Ordering::SeqCst);
1854 
1855         // If SCTP already connected open all the channels
1856         if self.internal.sctp_transport.state() == RTCSctpTransportState::Connected {
1857             d.open(Arc::clone(&self.internal.sctp_transport)).await?;
1858         }
1859 
1860         self.internal.trigger_negotiation_needed().await;
1861 
1862         Ok(d)
1863     }
1864 
1865     /// set_identity_provider is used to configure an identity provider to generate identity assertions
set_identity_provider(&self, _provider: &str) -> Result<()>1866     pub fn set_identity_provider(&self, _provider: &str) -> Result<()> {
1867         Err(Error::ErrPeerConnSetIdentityProviderNotImplemented)
1868     }
1869 
1870     /// write_rtcp sends a user provided RTCP packet to the connected peer. If no peer is connected the
1871     /// packet is discarded. It also runs any configured interceptors.
write_rtcp( &self, pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>], ) -> Result<usize>1872     pub async fn write_rtcp(
1873         &self,
1874         pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
1875     ) -> Result<usize> {
1876         let a = Attributes::new();
1877         Ok(self.interceptor_rtcp_writer.write(pkts, &a).await?)
1878     }
1879 
1880     /// close ends the PeerConnection
close(&self) -> Result<()>1881     pub async fn close(&self) -> Result<()> {
1882         // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #1)
1883         if self.internal.is_closed.load(Ordering::SeqCst) {
1884             return Ok(());
1885         }
1886 
1887         // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #2)
1888         self.internal.is_closed.store(true, Ordering::SeqCst);
1889 
1890         // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
1891         self.internal
1892             .signaling_state
1893             .store(RTCSignalingState::Closed as u8, Ordering::SeqCst);
1894 
1895         // Try closing everything and collect the errors
1896         // Shutdown strategy:
1897         // 1. All Conn close by closing their underlying Conn.
1898         // 2. A Mux stops this chain. It won't close the underlying
1899         //    Conn if one of the endpoints is closed down. To
1900         //    continue the chain the Mux has to be closed.
1901         let mut close_errs = vec![];
1902 
1903         if let Err(err) = self.interceptor.close().await {
1904             close_errs.push(Error::new(format!("interceptor: {err}")));
1905         }
1906 
1907         // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #4)
1908         {
1909             let mut rtp_transceivers = self.internal.rtp_transceivers.lock().await;
1910             for t in &*rtp_transceivers {
1911                 if let Err(err) = t.stop().await {
1912                     close_errs.push(Error::new(format!("rtp_transceivers: {err}")));
1913                 }
1914             }
1915             rtp_transceivers.clear();
1916         }
1917 
1918         // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #5)
1919         {
1920             let mut data_channels = self.internal.sctp_transport.data_channels.lock().await;
1921             for d in &*data_channels {
1922                 if let Err(err) = d.close().await {
1923                     close_errs.push(Error::new(format!("data_channels: {err}")));
1924                 }
1925             }
1926             data_channels.clear();
1927         }
1928 
1929         // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #6)
1930         if let Err(err) = self.internal.sctp_transport.stop().await {
1931             close_errs.push(Error::new(format!("sctp_transport: {err}")));
1932         }
1933 
1934         // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #7)
1935         if let Err(err) = self.internal.dtls_transport.stop().await {
1936             close_errs.push(Error::new(format!("dtls_transport: {err}")));
1937         }
1938 
1939         // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10)
1940         if let Err(err) = self.internal.ice_transport.stop().await {
1941             close_errs.push(Error::new(format!("dtls_transport: {err}")));
1942         }
1943 
1944         // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11)
1945         RTCPeerConnection::update_connection_state(
1946             &self.internal.on_peer_connection_state_change_handler,
1947             &self.internal.is_closed,
1948             &self.internal.peer_connection_state,
1949             self.ice_connection_state(),
1950             self.internal.dtls_transport.state(),
1951         )
1952         .await;
1953 
1954         if let Err(err) = self.internal.ops.close().await {
1955             close_errs.push(Error::new(format!("ops: {err}")));
1956         }
1957 
1958         flatten_errs(close_errs)
1959     }
1960 
1961     /// CurrentLocalDescription represents the local description that was
1962     /// successfully negotiated the last time the PeerConnection transitioned
1963     /// into the stable state plus any local candidates that have been generated
1964     /// by the ICEAgent since the offer or answer was created.
current_local_description(&self) -> Option<RTCSessionDescription>1965     pub async fn current_local_description(&self) -> Option<RTCSessionDescription> {
1966         let local_description = {
1967             let current_local_description = self.internal.current_local_description.lock().await;
1968             current_local_description.clone()
1969         };
1970         let ice_gather = Some(&self.internal.ice_gatherer);
1971         let ice_gathering_state = self.ice_gathering_state();
1972 
1973         populate_local_candidates(local_description.as_ref(), ice_gather, ice_gathering_state).await
1974     }
1975 
1976     /// PendingLocalDescription represents a local description that is in the
1977     /// process of being negotiated plus any local candidates that have been
1978     /// generated by the ICEAgent since the offer or answer was created. If the
1979     /// PeerConnection is in the stable state, the value is null.
pending_local_description(&self) -> Option<RTCSessionDescription>1980     pub async fn pending_local_description(&self) -> Option<RTCSessionDescription> {
1981         let local_description = {
1982             let pending_local_description = self.internal.pending_local_description.lock().await;
1983             pending_local_description.clone()
1984         };
1985         let ice_gather = Some(&self.internal.ice_gatherer);
1986         let ice_gathering_state = self.ice_gathering_state();
1987 
1988         populate_local_candidates(local_description.as_ref(), ice_gather, ice_gathering_state).await
1989     }
1990 
1991     /// current_remote_description represents the last remote description that was
1992     /// successfully negotiated the last time the PeerConnection transitioned
1993     /// into the stable state plus any remote candidates that have been supplied
1994     /// via add_icecandidate() since the offer or answer was created.
current_remote_description(&self) -> Option<RTCSessionDescription>1995     pub async fn current_remote_description(&self) -> Option<RTCSessionDescription> {
1996         let current_remote_description = self.internal.current_remote_description.lock().await;
1997         current_remote_description.clone()
1998     }
1999 
2000     /// pending_remote_description represents a remote description that is in the
2001     /// process of being negotiated, complete with any remote candidates that
2002     /// have been supplied via add_icecandidate() since the offer or answer was
2003     /// created. If the PeerConnection is in the stable state, the value is
2004     /// null.
pending_remote_description(&self) -> Option<RTCSessionDescription>2005     pub async fn pending_remote_description(&self) -> Option<RTCSessionDescription> {
2006         let pending_remote_description = self.internal.pending_remote_description.lock().await;
2007         pending_remote_description.clone()
2008     }
2009 
2010     /// signaling_state attribute returns the signaling state of the
2011     /// PeerConnection instance.
signaling_state(&self) -> RTCSignalingState2012     pub fn signaling_state(&self) -> RTCSignalingState {
2013         self.internal.signaling_state.load(Ordering::SeqCst).into()
2014     }
2015 
2016     /// icegathering_state attribute returns the ICE gathering state of the
2017     /// PeerConnection instance.
ice_gathering_state(&self) -> RTCIceGatheringState2018     pub fn ice_gathering_state(&self) -> RTCIceGatheringState {
2019         self.internal.ice_gathering_state()
2020     }
2021 
2022     /// connection_state attribute returns the connection state of the
2023     /// PeerConnection instance.
connection_state(&self) -> RTCPeerConnectionState2024     pub fn connection_state(&self) -> RTCPeerConnectionState {
2025         self.internal
2026             .peer_connection_state
2027             .load(Ordering::SeqCst)
2028             .into()
2029     }
2030 
get_stats(&self) -> StatsReport2031     pub async fn get_stats(&self) -> StatsReport {
2032         self.internal
2033             .get_stats(self.get_stats_id().to_owned())
2034             .await
2035             .into()
2036     }
2037 
2038     /// sctp returns the SCTPTransport for this PeerConnection
2039     ///
2040     /// The SCTP transport over which SCTP data is sent and received. If SCTP has not been negotiated, the value is nil.
2041     /// <https://www.w3.org/TR/webrtc/#attributes-15>
sctp(&self) -> Arc<RTCSctpTransport>2042     pub fn sctp(&self) -> Arc<RTCSctpTransport> {
2043         Arc::clone(&self.internal.sctp_transport)
2044     }
2045 
2046     /// gathering_complete_promise is a Pion specific helper function that returns a channel that is closed when gathering is complete.
2047     /// This function may be helpful in cases where you are unable to trickle your ICE Candidates.
2048     ///
2049     /// It is better to not use this function, and instead trickle candidates. If you use this function you will see longer connection startup times.
2050     /// When the call is connected you will see no impact however.
gathering_complete_promise(&self) -> mpsc::Receiver<()>2051     pub async fn gathering_complete_promise(&self) -> mpsc::Receiver<()> {
2052         let (gathering_complete_tx, gathering_complete_rx) = mpsc::channel(1);
2053 
2054         // It's possible to miss the GatherComplete event since setGatherCompleteHandler is an atomic operation and the
2055         // promise might have been created after the gathering is finished. Therefore, we need to check if the ICE gathering
2056         // state has changed to complete so that we don't block the caller forever.
2057         let done = Arc::new(Mutex::new(Some(gathering_complete_tx)));
2058         let done2 = Arc::clone(&done);
2059         self.internal.set_gather_complete_handler(Box::new(move || {
2060             log::trace!("setGatherCompleteHandler");
2061             let done3 = Arc::clone(&done2);
2062             Box::pin(async move {
2063                 let mut d = done3.lock().await;
2064                 d.take();
2065             })
2066         }));
2067 
2068         if self.ice_gathering_state() == RTCIceGatheringState::Complete {
2069             log::trace!("ICEGatheringState::Complete");
2070             let mut d = done.lock().await;
2071             d.take();
2072         }
2073 
2074         gathering_complete_rx
2075     }
2076 
2077     /// Returns the internal [`RTCDtlsTransport`].
dtls_transport(&self) -> Arc<RTCDtlsTransport>2078     pub fn dtls_transport(&self) -> Arc<RTCDtlsTransport> {
2079         Arc::clone(&self.internal.dtls_transport)
2080     }
2081 
2082     /// Adds the specified [`RTCRtpTransceiver`] to this [`RTCPeerConnection`].
add_transceiver(&self, t: Arc<RTCRtpTransceiver>)2083     pub async fn add_transceiver(&self, t: Arc<RTCRtpTransceiver>) {
2084         self.internal.add_rtp_transceiver(t).await
2085     }
2086 }
2087