1 #[cfg(test)]
2 pub(crate) mod peer_connection_test;
3
4 pub mod certificate;
5 pub mod configuration;
6 pub mod offer_answer_options;
7 pub(crate) mod operation;
8 mod peer_connection_internal;
9 pub mod peer_connection_state;
10 pub mod policy;
11 pub mod sdp;
12 pub mod signaling_state;
13
14 use crate::api::media_engine::MediaEngine;
15 use crate::api::setting_engine::SettingEngine;
16 use crate::api::API;
17 use crate::data_channel::data_channel_init::RTCDataChannelInit;
18 use crate::data_channel::data_channel_parameters::DataChannelParameters;
19 use crate::data_channel::data_channel_state::RTCDataChannelState;
20 use crate::data_channel::RTCDataChannel;
21 use crate::dtls_transport::dtls_fingerprint::RTCDtlsFingerprint;
22 use crate::dtls_transport::dtls_parameters::DTLSParameters;
23 use crate::dtls_transport::dtls_role::{
24 DTLSRole, DEFAULT_DTLS_ROLE_ANSWER, DEFAULT_DTLS_ROLE_OFFER,
25 };
26 use crate::dtls_transport::dtls_transport_state::RTCDtlsTransportState;
27 use crate::dtls_transport::RTCDtlsTransport;
28 use crate::error::{flatten_errs, Error, Result};
29 use crate::ice_transport::ice_candidate::{RTCIceCandidate, RTCIceCandidateInit};
30 use crate::ice_transport::ice_connection_state::RTCIceConnectionState;
31 use crate::ice_transport::ice_gatherer::RTCIceGatherOptions;
32 use crate::ice_transport::ice_gatherer::{
33 OnGatheringCompleteHdlrFn, OnICEGathererStateChangeHdlrFn, OnLocalCandidateHdlrFn,
34 RTCIceGatherer,
35 };
36 use crate::ice_transport::ice_gatherer_state::RTCIceGathererState;
37 use crate::ice_transport::ice_gathering_state::RTCIceGatheringState;
38 use crate::ice_transport::ice_parameters::RTCIceParameters;
39 use crate::ice_transport::ice_role::RTCIceRole;
40 use crate::ice_transport::ice_transport_state::RTCIceTransportState;
41 use crate::ice_transport::RTCIceTransport;
42 use crate::peer_connection::certificate::RTCCertificate;
43 use crate::peer_connection::configuration::RTCConfiguration;
44 use crate::peer_connection::offer_answer_options::{RTCAnswerOptions, RTCOfferOptions};
45 use crate::peer_connection::operation::{Operation, Operations};
46 use crate::peer_connection::peer_connection_state::{
47 NegotiationNeededState, RTCPeerConnectionState,
48 };
49 use crate::peer_connection::sdp::sdp_type::RTCSdpType;
50 use crate::peer_connection::sdp::session_description::RTCSessionDescription;
51 use crate::peer_connection::sdp::*;
52 use crate::peer_connection::signaling_state::{
53 check_next_signaling_state, RTCSignalingState, StateChangeOp,
54 };
55 use crate::rtp_transceiver::rtp_codec::{RTCRtpHeaderExtensionCapability, RTPCodecType};
56 use crate::rtp_transceiver::rtp_receiver::RTCRtpReceiver;
57 use crate::rtp_transceiver::rtp_sender::RTCRtpSender;
58 use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
59 use crate::rtp_transceiver::{
60 find_by_mid, handle_unknown_rtp_packet, satisfy_type_and_direction, RTCRtpTransceiver,
61 };
62 use crate::rtp_transceiver::{RTCRtpTransceiverInit, SSRC};
63 use crate::sctp_transport::sctp_transport_capabilities::SCTPTransportCapabilities;
64 use crate::sctp_transport::sctp_transport_state::RTCSctpTransportState;
65 use crate::sctp_transport::RTCSctpTransport;
66 use crate::stats::StatsReport;
67 use crate::track::track_local::TrackLocal;
68 use crate::track::track_remote::TrackRemote;
69
70 use ::ice::candidate::candidate_base::unmarshal_candidate;
71 use ::ice::candidate::Candidate;
72 use ::sdp::description::session::*;
73 use ::sdp::util::ConnectionRole;
74 use arc_swap::ArcSwapOption;
75 use async_trait::async_trait;
76 use interceptor::{stats, Attributes, Interceptor, RTCPWriter};
77 use peer_connection_internal::*;
78 use rand::{thread_rng, Rng};
79 use rcgen::KeyPair;
80 use srtp::stream::Stream;
81 use std::future::Future;
82 use std::pin::Pin;
83 use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
84 use std::sync::Arc;
85 use std::time::{SystemTime, UNIX_EPOCH};
86 use tokio::sync::{mpsc, Mutex};
87
88 /// SIMULCAST_PROBE_COUNT is the amount of RTP Packets
89 /// that handleUndeclaredSSRC will read and try to dispatch from
90 /// mid and rid values
91 pub(crate) const SIMULCAST_PROBE_COUNT: usize = 10;
92
93 /// SIMULCAST_MAX_PROBE_ROUTINES is how many active routines can be used to probe
94 /// If the total amount of incoming SSRCes exceeds this new requests will be ignored
95 pub(crate) const SIMULCAST_MAX_PROBE_ROUTINES: u64 = 25;
96
97 pub(crate) const MEDIA_SECTION_APPLICATION: &str = "application";
98
99 const RUNES_ALPHA: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
100
101 /// math_rand_alpha generates a mathmatical random alphabet sequence of the requested length.
math_rand_alpha(n: usize) -> String102 pub fn math_rand_alpha(n: usize) -> String {
103 let mut rng = thread_rng();
104
105 let rand_string: String = (0..n)
106 .map(|_| {
107 let idx = rng.gen_range(0..RUNES_ALPHA.len());
108 RUNES_ALPHA[idx] as char
109 })
110 .collect();
111
112 rand_string
113 }
114
115 pub type OnSignalingStateChangeHdlrFn = Box<
116 dyn (FnMut(RTCSignalingState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
117 + Send
118 + Sync,
119 >;
120
121 pub type OnICEConnectionStateChangeHdlrFn = Box<
122 dyn (FnMut(RTCIceConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
123 + Send
124 + Sync,
125 >;
126
127 pub type OnPeerConnectionStateChangeHdlrFn = Box<
128 dyn (FnMut(RTCPeerConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
129 + Send
130 + Sync,
131 >;
132
133 pub type OnDataChannelHdlrFn = Box<
134 dyn (FnMut(Arc<RTCDataChannel>) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
135 + Send
136 + Sync,
137 >;
138
139 pub type OnTrackHdlrFn = Box<
140 dyn (FnMut(
141 Arc<TrackRemote>,
142 Arc<RTCRtpReceiver>,
143 Arc<RTCRtpTransceiver>,
144 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
145 + Send
146 + Sync,
147 >;
148
149 pub type OnNegotiationNeededHdlrFn =
150 Box<dyn (FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) + Send + Sync>;
151
152 #[derive(Clone)]
153 struct StartTransportsParams {
154 ice_transport: Arc<RTCIceTransport>,
155 dtls_transport: Arc<RTCDtlsTransport>,
156 on_peer_connection_state_change_handler: Arc<Mutex<Option<OnPeerConnectionStateChangeHdlrFn>>>,
157 is_closed: Arc<AtomicBool>,
158 peer_connection_state: Arc<AtomicU8>,
159 ice_connection_state: Arc<AtomicU8>,
160 }
161
162 #[derive(Clone)]
163 struct CheckNegotiationNeededParams {
164 sctp_transport: Arc<RTCSctpTransport>,
165 rtp_transceivers: Arc<Mutex<Vec<Arc<RTCRtpTransceiver>>>>,
166 current_local_description: Arc<Mutex<Option<RTCSessionDescription>>>,
167 current_remote_description: Arc<Mutex<Option<RTCSessionDescription>>>,
168 }
169
170 #[derive(Clone)]
171 struct NegotiationNeededParams {
172 on_negotiation_needed_handler: Arc<ArcSwapOption<Mutex<OnNegotiationNeededHdlrFn>>>,
173 is_closed: Arc<AtomicBool>,
174 ops: Arc<Operations>,
175 negotiation_needed_state: Arc<AtomicU8>,
176 is_negotiation_needed: Arc<AtomicBool>,
177 signaling_state: Arc<AtomicU8>,
178 check_negotiation_needed_params: CheckNegotiationNeededParams,
179 }
180
181 /// PeerConnection represents a WebRTC connection that establishes a
182 /// peer-to-peer communications with another PeerConnection instance in a
183 /// browser, or to another endpoint implementing the required protocols.
184 pub struct RTCPeerConnection {
185 stats_id: String,
186 idp_login_url: Option<String>,
187
188 configuration: RTCConfiguration,
189
190 interceptor_rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>,
191
192 interceptor: Arc<dyn Interceptor + Send + Sync>,
193
194 pub(crate) internal: Arc<PeerConnectionInternal>,
195 }
196
197 impl std::fmt::Debug for RTCPeerConnection {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 f.debug_struct("RTCPeerConnection")
200 .field("stats_id", &self.stats_id)
201 .field("idp_login_url", &self.idp_login_url)
202 .field("signaling_state", &self.signaling_state())
203 .field("ice_connection_state", &self.ice_connection_state())
204 .finish()
205 }
206 }
207
208 impl std::fmt::Display for RTCPeerConnection {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result209 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210 write!(f, "(RTCPeerConnection {})", self.stats_id)
211 }
212 }
213
214 impl RTCPeerConnection {
215 /// creates a PeerConnection with the default codecs and
216 /// interceptors. See register_default_codecs and register_default_interceptors.
217 ///
218 /// If you wish to customize the set of available codecs or the set of
219 /// active interceptors, create a MediaEngine and call api.new_peer_connection
220 /// instead of this function.
new(api: &API, mut configuration: RTCConfiguration) -> Result<Self>221 pub(crate) async fn new(api: &API, mut configuration: RTCConfiguration) -> Result<Self> {
222 RTCPeerConnection::init_configuration(&mut configuration)?;
223
224 let (interceptor, stats_interceptor): (Arc<dyn Interceptor + Send + Sync>, _) = {
225 let mut chain = api.interceptor_registry.build_chain("")?;
226 let stats_interceptor = stats::make_stats_interceptor("");
227 chain.add(stats_interceptor.clone());
228
229 (Arc::new(chain), stats_interceptor)
230 };
231
232 let weak_interceptor = Arc::downgrade(&interceptor);
233 let (internal, configuration) =
234 PeerConnectionInternal::new(api, weak_interceptor, stats_interceptor, configuration)
235 .await?;
236 let internal_rtcp_writer = Arc::clone(&internal) as Arc<dyn RTCPWriter + Send + Sync>;
237 let interceptor_rtcp_writer = interceptor.bind_rtcp_writer(internal_rtcp_writer).await;
238
239 // <https://w3c.github.io/webrtc-pc/#constructor> (Step #2)
240 // Some variables defined explicitly despite their implicit zero values to
241 // allow better readability to understand what is happening.
242 Ok(RTCPeerConnection {
243 stats_id: format!(
244 "PeerConnection-{}",
245 SystemTime::now()
246 .duration_since(UNIX_EPOCH)
247 .unwrap()
248 .as_nanos()
249 ),
250 interceptor,
251 interceptor_rtcp_writer,
252 internal,
253 configuration,
254 idp_login_url: None,
255 })
256 }
257
258 /// init_configuration defines validation of the specified Configuration and
259 /// its assignment to the internal configuration variable. This function differs
260 /// from its set_configuration counterpart because most of the checks do not
261 /// include verification statements related to the existing state. Thus the
262 /// function describes only minor verification of some the struct variables.
init_configuration(configuration: &mut RTCConfiguration) -> Result<()>263 fn init_configuration(configuration: &mut RTCConfiguration) -> Result<()> {
264 let sanitized_ice_servers = configuration.get_ice_servers();
265 if !sanitized_ice_servers.is_empty() {
266 for server in &sanitized_ice_servers {
267 server.validate()?;
268 }
269 }
270
271 // <https://www.w3.org/TR/webrtc/#constructor> (step #3)
272 if !configuration.certificates.is_empty() {
273 let now = SystemTime::now();
274 for cert in &configuration.certificates {
275 cert.expires
276 .duration_since(now)
277 .map_err(|_| Error::ErrCertificateExpired)?;
278 }
279 } else {
280 let kp = KeyPair::generate(&rcgen::PKCS_ECDSA_P256_SHA256)?;
281 let cert = RTCCertificate::from_key_pair(kp)?;
282 configuration.certificates = vec![cert];
283 };
284
285 Ok(())
286 }
287
288 /// on_signaling_state_change sets an event handler which is invoked when the
289 /// peer connection's signaling state changes
on_signaling_state_change(&self, f: OnSignalingStateChangeHdlrFn)290 pub fn on_signaling_state_change(&self, f: OnSignalingStateChangeHdlrFn) {
291 self.internal
292 .on_signaling_state_change_handler
293 .store(Some(Arc::new(Mutex::new(f))))
294 }
295
do_signaling_state_change(&self, new_state: RTCSignalingState)296 async fn do_signaling_state_change(&self, new_state: RTCSignalingState) {
297 log::info!("signaling state changed to {}", new_state);
298 if let Some(handler) = &*self.internal.on_signaling_state_change_handler.load() {
299 let mut f = handler.lock().await;
300 f(new_state).await;
301 }
302 }
303
304 /// on_data_channel sets an event handler which is invoked when a data
305 /// channel message arrives from a remote peer.
on_data_channel(&self, f: OnDataChannelHdlrFn)306 pub fn on_data_channel(&self, f: OnDataChannelHdlrFn) {
307 self.internal
308 .on_data_channel_handler
309 .store(Some(Arc::new(Mutex::new(f))));
310 }
311
312 /// on_negotiation_needed sets an event handler which is invoked when
313 /// a change has occurred which requires session negotiation
on_negotiation_needed(&self, f: OnNegotiationNeededHdlrFn)314 pub fn on_negotiation_needed(&self, f: OnNegotiationNeededHdlrFn) {
315 self.internal
316 .on_negotiation_needed_handler
317 .store(Some(Arc::new(Mutex::new(f))));
318 }
319
do_negotiation_needed_inner(params: &NegotiationNeededParams) -> bool320 fn do_negotiation_needed_inner(params: &NegotiationNeededParams) -> bool {
321 // https://w3c.github.io/webrtc-pc/#updating-the-negotiation-needed-flag
322 // non-canon step 1
323 let state: NegotiationNeededState = params
324 .negotiation_needed_state
325 .load(Ordering::SeqCst)
326 .into();
327 if state == NegotiationNeededState::Run {
328 params
329 .negotiation_needed_state
330 .store(NegotiationNeededState::Queue as u8, Ordering::SeqCst);
331 false
332 } else if state == NegotiationNeededState::Queue {
333 false
334 } else {
335 params
336 .negotiation_needed_state
337 .store(NegotiationNeededState::Run as u8, Ordering::SeqCst);
338 true
339 }
340 }
341 /// do_negotiation_needed enqueues negotiation_needed_op if necessary
342 /// caller of this method should hold `pc.mu` lock
do_negotiation_needed(params: NegotiationNeededParams)343 async fn do_negotiation_needed(params: NegotiationNeededParams) {
344 if !RTCPeerConnection::do_negotiation_needed_inner(¶ms) {
345 return;
346 }
347
348 let params2 = params.clone();
349 let _ = params
350 .ops
351 .enqueue(Operation::new(
352 move || {
353 let params3 = params2.clone();
354 Box::pin(async move { RTCPeerConnection::negotiation_needed_op(params3).await })
355 },
356 "do_negotiation_needed",
357 ))
358 .await;
359 }
360
after_negotiation_needed_op(params: NegotiationNeededParams) -> bool361 async fn after_negotiation_needed_op(params: NegotiationNeededParams) -> bool {
362 let old_negotiation_needed_state = params.negotiation_needed_state.load(Ordering::SeqCst);
363
364 params
365 .negotiation_needed_state
366 .store(NegotiationNeededState::Empty as u8, Ordering::SeqCst);
367
368 if old_negotiation_needed_state == NegotiationNeededState::Queue as u8 {
369 RTCPeerConnection::do_negotiation_needed_inner(¶ms)
370 } else {
371 false
372 }
373 }
374
negotiation_needed_op(params: NegotiationNeededParams) -> bool375 async fn negotiation_needed_op(params: NegotiationNeededParams) -> bool {
376 // Don't run NegotiatedNeeded checks if on_negotiation_needed is not set
377 let handler = &*params.on_negotiation_needed_handler.load();
378 if handler.is_none() {
379 return false;
380 }
381
382 // https://www.w3.org/TR/webrtc/#updating-the-negotiation-needed-flag
383 // Step 2.1
384 if params.is_closed.load(Ordering::SeqCst) {
385 return false;
386 }
387 // non-canon step 2.2
388 if !params.ops.is_empty().await {
389 //enqueue negotiation_needed_op again by return true
390 return true;
391 }
392
393 // non-canon, run again if there was a request
394 // starting defer(after_do_negotiation_needed(params).await);
395
396 // Step 2.3
397 if params.signaling_state.load(Ordering::SeqCst) != RTCSignalingState::Stable as u8 {
398 return RTCPeerConnection::after_negotiation_needed_op(params).await;
399 }
400
401 // Step 2.4
402 if !RTCPeerConnection::check_negotiation_needed(¶ms.check_negotiation_needed_params)
403 .await
404 {
405 params.is_negotiation_needed.store(false, Ordering::SeqCst);
406 return RTCPeerConnection::after_negotiation_needed_op(params).await;
407 }
408
409 // Step 2.5
410 if params.is_negotiation_needed.load(Ordering::SeqCst) {
411 return RTCPeerConnection::after_negotiation_needed_op(params).await;
412 }
413
414 // Step 2.6
415 params.is_negotiation_needed.store(true, Ordering::SeqCst);
416
417 // Step 2.7
418 if let Some(handler) = handler {
419 let mut f = handler.lock().await;
420 f().await;
421 }
422
423 RTCPeerConnection::after_negotiation_needed_op(params).await
424 }
425
check_negotiation_needed(params: &CheckNegotiationNeededParams) -> bool426 async fn check_negotiation_needed(params: &CheckNegotiationNeededParams) -> bool {
427 // To check if negotiation is needed for connection, perform the following checks:
428 // Skip 1, 2 steps
429 // Step 3
430 let current_local_description = {
431 let current_local_description = params.current_local_description.lock().await;
432 current_local_description.clone()
433 };
434 let current_remote_description = {
435 let current_remote_description = params.current_remote_description.lock().await;
436 current_remote_description.clone()
437 };
438
439 if let Some(local_desc) = ¤t_local_description {
440 let len_data_channel = {
441 let data_channels = params.sctp_transport.data_channels.lock().await;
442 data_channels.len()
443 };
444
445 if len_data_channel != 0 && have_data_channel(local_desc).is_none() {
446 return true;
447 }
448
449 let transceivers = params.rtp_transceivers.lock().await;
450 for t in &*transceivers {
451 // https://www.w3.org/TR/webrtc/#dfn-update-the-negotiation-needed-flag
452 // Step 5.1
453 // if t.stopping && !t.stopped {
454 // return true
455 // }
456 let mid = t.mid();
457 let m = mid.as_ref().and_then(|mid| get_by_mid(mid, local_desc));
458 // Step 5.2
459 if !t.stopped.load(Ordering::SeqCst) {
460 if m.is_none() {
461 return true;
462 }
463
464 if let Some(m) = m {
465 // Step 5.3.1
466 if t.direction().has_send() {
467 let dmsid = match m.attribute(ATTR_KEY_MSID).and_then(|o| o) {
468 Some(m) => m,
469 None => return true, // doesn't contain a single a=msid line
470 };
471
472 let sender = t.sender();
473 // (...)or the number of MSIDs from the a=msid lines in this m= section,
474 // or the MSID values themselves, differ from what is in
475 // transceiver.sender.[[AssociatedMediaStreamIds]], return true.
476
477 // TODO: This check should be robuster by storing all streams in the
478 // local description so we can compare all of them. For no we only
479 // consider the first one.
480
481 let stream_ids = sender.associated_media_stream_ids();
482 // Different number of lines, 1 vs 0
483 if stream_ids.is_empty() {
484 return true;
485 }
486
487 // different stream id
488 if dmsid.split_whitespace().next() != Some(&stream_ids[0]) {
489 return true;
490 }
491 }
492 match local_desc.sdp_type {
493 RTCSdpType::Offer => {
494 // Step 5.3.2
495 if let Some(remote_desc) = ¤t_remote_description {
496 if let Some(rm) =
497 t.mid().and_then(|mid| get_by_mid(&mid, remote_desc))
498 {
499 if get_peer_direction(m) != t.direction()
500 && get_peer_direction(rm) != t.direction().reverse()
501 {
502 return true;
503 }
504 } else {
505 return true;
506 }
507 }
508 }
509 RTCSdpType::Answer => {
510 let remote_desc = match ¤t_remote_description {
511 Some(d) => d,
512 None => return true,
513 };
514 let offered_direction =
515 match t.mid().and_then(|mid| get_by_mid(&mid, remote_desc)) {
516 Some(d) => {
517 let dir = get_peer_direction(d);
518 if dir == RTCRtpTransceiverDirection::Unspecified {
519 RTCRtpTransceiverDirection::Inactive
520 } else {
521 dir
522 }
523 }
524 None => RTCRtpTransceiverDirection::Inactive,
525 };
526
527 let current_direction = get_peer_direction(m);
528 // Step 5.3.3
529 if current_direction
530 != t.direction().intersect(offered_direction.reverse())
531 {
532 return true;
533 }
534 }
535 _ => {}
536 };
537 }
538 }
539 // Step 5.4
540 if t.stopped.load(Ordering::SeqCst) {
541 let search_mid = match t.mid() {
542 Some(mid) => mid,
543 None => return false,
544 };
545
546 if let Some(remote_desc) = &*params.current_remote_description.lock().await {
547 return get_by_mid(&search_mid, local_desc).is_some()
548 || get_by_mid(&search_mid, remote_desc).is_some();
549 }
550 }
551 }
552 // Step 6
553 false
554 } else {
555 true
556 }
557 }
558
559 /// on_ice_candidate sets an event handler which is invoked when a new ICE
560 /// candidate is found.
561 /// Take note that the handler is gonna be called with a nil pointer when
562 /// gathering is finished.
on_ice_candidate(&self, f: OnLocalCandidateHdlrFn)563 pub fn on_ice_candidate(&self, f: OnLocalCandidateHdlrFn) {
564 self.internal.ice_gatherer.on_local_candidate(f)
565 }
566
567 /// on_ice_gathering_state_change sets an event handler which is invoked when the
568 /// ICE candidate gathering state has changed.
on_ice_gathering_state_change(&self, f: OnICEGathererStateChangeHdlrFn)569 pub fn on_ice_gathering_state_change(&self, f: OnICEGathererStateChangeHdlrFn) {
570 self.internal.ice_gatherer.on_state_change(f)
571 }
572
573 /// on_track sets an event handler which is called when remote track
574 /// arrives from a remote peer.
on_track(&self, f: OnTrackHdlrFn)575 pub fn on_track(&self, f: OnTrackHdlrFn) {
576 self.internal
577 .on_track_handler
578 .store(Some(Arc::new(Mutex::new(f))));
579 }
580
do_track( on_track_handler: Arc<ArcSwapOption<Mutex<OnTrackHdlrFn>>>, track: Arc<TrackRemote>, receiver: Arc<RTCRtpReceiver>, transceiver: Arc<RTCRtpTransceiver>, )581 fn do_track(
582 on_track_handler: Arc<ArcSwapOption<Mutex<OnTrackHdlrFn>>>,
583 track: Arc<TrackRemote>,
584 receiver: Arc<RTCRtpReceiver>,
585 transceiver: Arc<RTCRtpTransceiver>,
586 ) {
587 log::debug!("got new track: {:?}", track);
588
589 tokio::spawn(async move {
590 if let Some(handler) = &*on_track_handler.load() {
591 let mut f = handler.lock().await;
592 f(track, receiver, transceiver).await;
593 } else {
594 log::warn!("on_track unset, unable to handle incoming media streams");
595 }
596 });
597 }
598
599 /// on_ice_connection_state_change sets an event handler which is called
600 /// when an ICE connection state is changed.
on_ice_connection_state_change(&self, f: OnICEConnectionStateChangeHdlrFn)601 pub fn on_ice_connection_state_change(&self, f: OnICEConnectionStateChangeHdlrFn) {
602 self.internal
603 .on_ice_connection_state_change_handler
604 .store(Some(Arc::new(Mutex::new(f))));
605 }
606
do_ice_connection_state_change( handler: &Arc<ArcSwapOption<Mutex<OnICEConnectionStateChangeHdlrFn>>>, ice_connection_state: &Arc<AtomicU8>, cs: RTCIceConnectionState, )607 async fn do_ice_connection_state_change(
608 handler: &Arc<ArcSwapOption<Mutex<OnICEConnectionStateChangeHdlrFn>>>,
609 ice_connection_state: &Arc<AtomicU8>,
610 cs: RTCIceConnectionState,
611 ) {
612 ice_connection_state.store(cs as u8, Ordering::SeqCst);
613
614 log::info!("ICE connection state changed: {}", cs);
615 if let Some(handler) = &*handler.load() {
616 let mut f = handler.lock().await;
617 f(cs).await;
618 }
619 }
620
621 /// on_peer_connection_state_change sets an event handler which is called
622 /// when the PeerConnectionState has changed
on_peer_connection_state_change(&self, f: OnPeerConnectionStateChangeHdlrFn)623 pub fn on_peer_connection_state_change(&self, f: OnPeerConnectionStateChangeHdlrFn) {
624 self.internal
625 .on_peer_connection_state_change_handler
626 .store(Some(Arc::new(Mutex::new(f))));
627 }
628
do_peer_connection_state_change( handler: &Arc<ArcSwapOption<Mutex<OnPeerConnectionStateChangeHdlrFn>>>, cs: RTCPeerConnectionState, )629 async fn do_peer_connection_state_change(
630 handler: &Arc<ArcSwapOption<Mutex<OnPeerConnectionStateChangeHdlrFn>>>,
631 cs: RTCPeerConnectionState,
632 ) {
633 if let Some(handler) = &*handler.load() {
634 let mut f = handler.lock().await;
635 f(cs).await;
636 }
637 }
638
639 /*TODO: // set_configuration updates the configuration of this PeerConnection object.
640 pub async fn set_configuration(&mut self, configuration: Configuration) -> Result<()> {
641 //nolint:gocognit
642 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-setconfiguration (step #2)
643 if self.internal.is_closed.load(Ordering::SeqCst) {
644 return Err(Error::ErrConnectionClosed.into());
645 }
646
647 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #3)
648 if !configuration.peer_identity.is_empty() {
649 if configuration.peer_identity != self.configuration.peer_identity {
650 return Err(Error::ErrModifyingPeerIdentity.into());
651 }
652 self.configuration.peer_identity = configuration.peer_identity;
653 }
654
655 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #4)
656 if !configuration.certificates.is_empty() {
657 if configuration.certificates.len() != self.configuration.certificates.len() {
658 return Err(Error::ErrModifyingCertificates.into());
659 }
660
661 self.configuration.certificates = configuration.certificates;
662 }
663
664 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #5)
665 if configuration.bundle_policy != BundlePolicy::Unspecified {
666 if configuration.bundle_policy != self.configuration.bundle_policy {
667 return Err(Error::ErrModifyingBundlePolicy.into());
668 }
669 self.configuration.bundle_policy = configuration.bundle_policy;
670 }
671
672 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #6)
673 if configuration.rtcp_mux_policy != RTCPMuxPolicy::Unspecified {
674 if configuration.rtcp_mux_policy != self.configuration.rtcp_mux_policy {
675 return Err(Error::ErrModifyingRTCPMuxPolicy.into());
676 }
677 self.configuration.rtcp_mux_policy = configuration.rtcp_mux_policy;
678 }
679
680 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #7)
681 if configuration.ice_candidate_pool_size != 0 {
682 if self.configuration.ice_candidate_pool_size != configuration.ice_candidate_pool_size
683 && self.local_description().await.is_some()
684 {
685 return Err(Error::ErrModifyingICECandidatePoolSize.into());
686 }
687 self.configuration.ice_candidate_pool_size = configuration.ice_candidate_pool_size;
688 }
689
690 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #8)
691 if configuration.ice_transport_policy != ICETransportPolicy::Unspecified {
692 self.configuration.ice_transport_policy = configuration.ice_transport_policy
693 }
694
695 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #11)
696 if !configuration.ice_servers.is_empty() {
697 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #11.3)
698 for server in &configuration.ice_servers {
699 server.validate()?;
700 }
701 self.configuration.ice_servers = configuration.ice_servers
702 }
703 Ok(())
704 }*/
705
706 /// get_configuration returns a Configuration object representing the current
707 /// configuration of this PeerConnection object. The returned object is a
708 /// copy and direct mutation on it will not take affect until set_configuration
709 /// has been called with Configuration passed as its only argument.
710 /// <https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-getconfiguration>
get_configuration(&self) -> &RTCConfiguration711 pub fn get_configuration(&self) -> &RTCConfiguration {
712 &self.configuration
713 }
714
get_stats_id(&self) -> &str715 pub fn get_stats_id(&self) -> &str {
716 self.stats_id.as_str()
717 }
718
719 /// create_offer starts the PeerConnection and generates the localDescription
720 /// <https://w3c.github.io/webrtc-pc/#dom-rtcpeerconnection-createoffer>
create_offer( &self, options: Option<RTCOfferOptions>, ) -> Result<RTCSessionDescription>721 pub async fn create_offer(
722 &self,
723 options: Option<RTCOfferOptions>,
724 ) -> Result<RTCSessionDescription> {
725 let use_identity = self.idp_login_url.is_some();
726 if use_identity {
727 return Err(Error::ErrIdentityProviderNotImplemented);
728 } else if self.internal.is_closed.load(Ordering::SeqCst) {
729 return Err(Error::ErrConnectionClosed);
730 }
731
732 if let Some(options) = options {
733 if options.ice_restart {
734 self.internal.ice_transport.restart().await?;
735 }
736 }
737
738 // This may be necessary to recompute if, for example, createOffer was called when only an
739 // audio RTCRtpTransceiver was added to connection, but while performing the in-parallel
740 // steps to create an offer, a video RTCRtpTransceiver was added, requiring additional
741 // inspection of video system resources.
742 let mut count = 0;
743 let mut offer;
744
745 loop {
746 // We cache current transceivers to ensure they aren't
747 // mutated during offer generation. We later check if they have
748 // been mutated and recompute the offer if necessary.
749 let current_transceivers = {
750 let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
751 rtp_transceivers.clone()
752 };
753
754 // include unmatched local transceivers
755 // update the greater mid if the remote description provides a greater one
756 {
757 let current_remote_description =
758 self.internal.current_remote_description.lock().await;
759 if let Some(d) = &*current_remote_description {
760 if let Some(parsed) = &d.parsed {
761 for media in &parsed.media_descriptions {
762 if let Some(mid) = get_mid_value(media) {
763 if mid.is_empty() {
764 continue;
765 }
766 let numeric_mid = match mid.parse::<isize>() {
767 Ok(n) => n,
768 Err(_) => continue,
769 };
770 if numeric_mid > self.internal.greater_mid.load(Ordering::SeqCst) {
771 self.internal
772 .greater_mid
773 .store(numeric_mid, Ordering::SeqCst);
774 }
775 }
776 }
777 }
778 }
779 }
780 for t in ¤t_transceivers {
781 if t.mid().is_some() {
782 continue;
783 }
784
785 if let Some(gen) = &self.internal.setting_engine.mid_generator {
786 let current_greatest = self.internal.greater_mid.load(Ordering::SeqCst);
787 let mid = (gen)(current_greatest);
788
789 // If it's possible to parse the returned mid as numeric, we will update the greater_mid field.
790 if let Ok(numeric_mid) = mid.parse::<isize>() {
791 if numeric_mid > self.internal.greater_mid.load(Ordering::SeqCst) {
792 self.internal
793 .greater_mid
794 .store(numeric_mid, Ordering::SeqCst);
795 }
796 }
797
798 t.set_mid(mid)?;
799 } else {
800 let greater_mid = self.internal.greater_mid.fetch_add(1, Ordering::SeqCst);
801 t.set_mid(format!("{}", greater_mid + 1))?;
802 }
803 }
804
805 let current_remote_description_is_none = {
806 let current_remote_description =
807 self.internal.current_remote_description.lock().await;
808 current_remote_description.is_none()
809 };
810
811 let mut d = if current_remote_description_is_none {
812 self.internal
813 .generate_unmatched_sdp(current_transceivers, use_identity)
814 .await?
815 } else {
816 self.internal
817 .generate_matched_sdp(
818 current_transceivers,
819 use_identity,
820 true, /*includeUnmatched */
821 DEFAULT_DTLS_ROLE_OFFER.to_connection_role(),
822 )
823 .await?
824 };
825
826 {
827 let mut sdp_origin = self.internal.sdp_origin.lock().await;
828 update_sdp_origin(&mut sdp_origin, &mut d);
829 }
830 let sdp = d.marshal();
831
832 offer = RTCSessionDescription {
833 sdp_type: RTCSdpType::Offer,
834 sdp,
835 parsed: Some(d),
836 };
837
838 // Verify local media hasn't changed during offer
839 // generation. Recompute if necessary
840 if !self.internal.has_local_description_changed(&offer).await {
841 break;
842 }
843 count += 1;
844 if count >= 128 {
845 return Err(Error::ErrExcessiveRetries);
846 }
847 }
848
849 {
850 let mut last_offer = self.internal.last_offer.lock().await;
851 *last_offer = offer.sdp.clone();
852 }
853 Ok(offer)
854 }
855
856 /// Update the PeerConnectionState given the state of relevant transports
857 /// <https://www.w3.org/TR/webrtc/#rtcpeerconnectionstate-enum>
update_connection_state( on_peer_connection_state_change_handler: &Arc< ArcSwapOption<Mutex<OnPeerConnectionStateChangeHdlrFn>>, >, is_closed: &Arc<AtomicBool>, peer_connection_state: &Arc<AtomicU8>, ice_connection_state: RTCIceConnectionState, dtls_transport_state: RTCDtlsTransportState, )858 async fn update_connection_state(
859 on_peer_connection_state_change_handler: &Arc<
860 ArcSwapOption<Mutex<OnPeerConnectionStateChangeHdlrFn>>,
861 >,
862 is_closed: &Arc<AtomicBool>,
863 peer_connection_state: &Arc<AtomicU8>,
864 ice_connection_state: RTCIceConnectionState,
865 dtls_transport_state: RTCDtlsTransportState,
866 ) {
867 let connection_state =
868 // The RTCPeerConnection object's [[IsClosed]] slot is true.
869 if is_closed.load(Ordering::SeqCst) {
870 RTCPeerConnectionState::Closed
871 } else if ice_connection_state == RTCIceConnectionState::Failed || dtls_transport_state == RTCDtlsTransportState::Failed {
872 // Any of the RTCIceTransports or RTCDtlsTransports are in a "failed" state.
873 RTCPeerConnectionState::Failed
874 } else if ice_connection_state == RTCIceConnectionState::Disconnected {
875 // Any of the RTCIceTransports or RTCDtlsTransports are in the "disconnected"
876 // state and none of them are in the "failed" or "connecting" or "checking" state.
877 RTCPeerConnectionState::Disconnected
878 } else if ice_connection_state == RTCIceConnectionState::Connected && dtls_transport_state == RTCDtlsTransportState::Connected {
879 // All RTCIceTransports and RTCDtlsTransports are in the "connected", "completed" or "closed"
880 // state and at least one of them is in the "connected" or "completed" state.
881 RTCPeerConnectionState::Connected
882 } else if ice_connection_state == RTCIceConnectionState::Checking && dtls_transport_state == RTCDtlsTransportState::Connecting {
883 // Any of the RTCIceTransports or RTCDtlsTransports are in the "connecting" or
884 // "checking" state and none of them is in the "failed" state.
885 RTCPeerConnectionState::Connecting
886 } else {
887 RTCPeerConnectionState::New
888 };
889
890 if peer_connection_state.load(Ordering::SeqCst) == connection_state as u8 {
891 return;
892 }
893
894 log::info!("peer connection state changed: {}", connection_state);
895 peer_connection_state.store(connection_state as u8, Ordering::SeqCst);
896
897 RTCPeerConnection::do_peer_connection_state_change(
898 on_peer_connection_state_change_handler,
899 connection_state,
900 )
901 .await;
902 }
903
904 /// create_answer starts the PeerConnection and generates the localDescription
create_answer( &self, _options: Option<RTCAnswerOptions>, ) -> Result<RTCSessionDescription>905 pub async fn create_answer(
906 &self,
907 _options: Option<RTCAnswerOptions>,
908 ) -> Result<RTCSessionDescription> {
909 let use_identity = self.idp_login_url.is_some();
910 if self.remote_description().await.is_none() {
911 return Err(Error::ErrNoRemoteDescription);
912 } else if use_identity {
913 return Err(Error::ErrIdentityProviderNotImplemented);
914 } else if self.internal.is_closed.load(Ordering::SeqCst) {
915 return Err(Error::ErrConnectionClosed);
916 } else if self.signaling_state() != RTCSignalingState::HaveRemoteOffer
917 && self.signaling_state() != RTCSignalingState::HaveLocalPranswer
918 {
919 return Err(Error::ErrIncorrectSignalingState);
920 }
921
922 let mut connection_role = self
923 .internal
924 .setting_engine
925 .answering_dtls_role
926 .to_connection_role();
927 if connection_role == ConnectionRole::Unspecified {
928 connection_role = DEFAULT_DTLS_ROLE_ANSWER.to_connection_role();
929 }
930
931 let local_transceivers = self.get_transceivers().await;
932 let mut d = self
933 .internal
934 .generate_matched_sdp(
935 local_transceivers,
936 use_identity,
937 false, /*includeUnmatched */
938 connection_role,
939 )
940 .await?;
941
942 {
943 let mut sdp_origin = self.internal.sdp_origin.lock().await;
944 update_sdp_origin(&mut sdp_origin, &mut d);
945 }
946 let sdp = d.marshal();
947
948 let answer = RTCSessionDescription {
949 sdp_type: RTCSdpType::Answer,
950 sdp,
951 parsed: Some(d),
952 };
953
954 {
955 let mut last_answer = self.internal.last_answer.lock().await;
956 *last_answer = answer.sdp.clone();
957 }
958 Ok(answer)
959 }
960
961 // 4.4.1.6 Set the SessionDescription
set_description( &self, sd: &RTCSessionDescription, op: StateChangeOp, ) -> Result<()>962 pub(crate) async fn set_description(
963 &self,
964 sd: &RTCSessionDescription,
965 op: StateChangeOp,
966 ) -> Result<()> {
967 if self.internal.is_closed.load(Ordering::SeqCst) {
968 return Err(Error::ErrConnectionClosed);
969 } else if sd.sdp_type == RTCSdpType::Unspecified {
970 return Err(Error::ErrPeerConnSDPTypeInvalidValue);
971 }
972
973 let next_state = {
974 let cur = self.signaling_state();
975 let new_sdpdoes_not_match_offer = Error::ErrSDPDoesNotMatchOffer;
976 let new_sdpdoes_not_match_answer = Error::ErrSDPDoesNotMatchAnswer;
977
978 match op {
979 StateChangeOp::SetLocal => {
980 match sd.sdp_type {
981 // stable->SetLocal(offer)->have-local-offer
982 RTCSdpType::Offer => {
983 let check = {
984 let last_offer = self.internal.last_offer.lock().await;
985 sd.sdp != *last_offer
986 };
987 if check {
988 Err(new_sdpdoes_not_match_offer)
989 } else {
990 let next_state = check_next_signaling_state(
991 cur,
992 RTCSignalingState::HaveLocalOffer,
993 StateChangeOp::SetLocal,
994 sd.sdp_type,
995 );
996 if next_state.is_ok() {
997 let mut pending_local_description =
998 self.internal.pending_local_description.lock().await;
999 *pending_local_description = Some(sd.clone());
1000 }
1001 next_state
1002 }
1003 }
1004 // have-remote-offer->SetLocal(answer)->stable
1005 // have-local-pranswer->SetLocal(answer)->stable
1006 RTCSdpType::Answer => {
1007 let check = {
1008 let last_answer = self.internal.last_answer.lock().await;
1009 sd.sdp != *last_answer
1010 };
1011 if check {
1012 Err(new_sdpdoes_not_match_answer)
1013 } else {
1014 let next_state = check_next_signaling_state(
1015 cur,
1016 RTCSignalingState::Stable,
1017 StateChangeOp::SetLocal,
1018 sd.sdp_type,
1019 );
1020 if next_state.is_ok() {
1021 let pending_remote_description = {
1022 let mut pending_remote_description =
1023 self.internal.pending_remote_description.lock().await;
1024 pending_remote_description.take()
1025 };
1026 let _pending_local_description = {
1027 let mut pending_local_description =
1028 self.internal.pending_local_description.lock().await;
1029 pending_local_description.take()
1030 };
1031
1032 {
1033 let mut current_local_description =
1034 self.internal.current_local_description.lock().await;
1035 *current_local_description = Some(sd.clone());
1036 }
1037 {
1038 let mut current_remote_description =
1039 self.internal.current_remote_description.lock().await;
1040 *current_remote_description = pending_remote_description;
1041 }
1042 }
1043 next_state
1044 }
1045 }
1046 RTCSdpType::Rollback => {
1047 let next_state = check_next_signaling_state(
1048 cur,
1049 RTCSignalingState::Stable,
1050 StateChangeOp::SetLocal,
1051 sd.sdp_type,
1052 );
1053 if next_state.is_ok() {
1054 let mut pending_local_description =
1055 self.internal.pending_local_description.lock().await;
1056 *pending_local_description = None;
1057 }
1058 next_state
1059 }
1060 // have-remote-offer->SetLocal(pranswer)->have-local-pranswer
1061 RTCSdpType::Pranswer => {
1062 let check = {
1063 let last_answer = self.internal.last_answer.lock().await;
1064 sd.sdp != *last_answer
1065 };
1066 if check {
1067 Err(new_sdpdoes_not_match_answer)
1068 } else {
1069 let next_state = check_next_signaling_state(
1070 cur,
1071 RTCSignalingState::HaveLocalPranswer,
1072 StateChangeOp::SetLocal,
1073 sd.sdp_type,
1074 );
1075 if next_state.is_ok() {
1076 let mut pending_local_description =
1077 self.internal.pending_local_description.lock().await;
1078 *pending_local_description = Some(sd.clone());
1079 }
1080 next_state
1081 }
1082 }
1083 _ => Err(Error::ErrPeerConnStateChangeInvalid),
1084 }
1085 }
1086 StateChangeOp::SetRemote => {
1087 match sd.sdp_type {
1088 // stable->SetRemote(offer)->have-remote-offer
1089 RTCSdpType::Offer => {
1090 let next_state = check_next_signaling_state(
1091 cur,
1092 RTCSignalingState::HaveRemoteOffer,
1093 StateChangeOp::SetRemote,
1094 sd.sdp_type,
1095 );
1096 if next_state.is_ok() {
1097 let mut pending_remote_description =
1098 self.internal.pending_remote_description.lock().await;
1099 *pending_remote_description = Some(sd.clone());
1100 }
1101 next_state
1102 }
1103 // have-local-offer->SetRemote(answer)->stable
1104 // have-remote-pranswer->SetRemote(answer)->stable
1105 RTCSdpType::Answer => {
1106 let next_state = check_next_signaling_state(
1107 cur,
1108 RTCSignalingState::Stable,
1109 StateChangeOp::SetRemote,
1110 sd.sdp_type,
1111 );
1112 if next_state.is_ok() {
1113 let pending_local_description = {
1114 let mut pending_local_description =
1115 self.internal.pending_local_description.lock().await;
1116 pending_local_description.take()
1117 };
1118
1119 let _pending_remote_description = {
1120 let mut pending_remote_description =
1121 self.internal.pending_remote_description.lock().await;
1122 pending_remote_description.take()
1123 };
1124
1125 {
1126 let mut current_remote_description =
1127 self.internal.current_remote_description.lock().await;
1128 *current_remote_description = Some(sd.clone());
1129 }
1130 {
1131 let mut current_local_description =
1132 self.internal.current_local_description.lock().await;
1133 *current_local_description = pending_local_description;
1134 }
1135 }
1136 next_state
1137 }
1138 RTCSdpType::Rollback => {
1139 let next_state = check_next_signaling_state(
1140 cur,
1141 RTCSignalingState::Stable,
1142 StateChangeOp::SetRemote,
1143 sd.sdp_type,
1144 );
1145 if next_state.is_ok() {
1146 let mut pending_remote_description =
1147 self.internal.pending_remote_description.lock().await;
1148 *pending_remote_description = None;
1149 }
1150 next_state
1151 }
1152 // have-local-offer->SetRemote(pranswer)->have-remote-pranswer
1153 RTCSdpType::Pranswer => {
1154 let next_state = check_next_signaling_state(
1155 cur,
1156 RTCSignalingState::HaveRemotePranswer,
1157 StateChangeOp::SetRemote,
1158 sd.sdp_type,
1159 );
1160 if next_state.is_ok() {
1161 let mut pending_remote_description =
1162 self.internal.pending_remote_description.lock().await;
1163 *pending_remote_description = Some(sd.clone());
1164 }
1165 next_state
1166 }
1167 _ => Err(Error::ErrPeerConnStateChangeInvalid),
1168 }
1169 } //_ => Err(Error::ErrPeerConnStateChangeUnhandled.into()),
1170 }
1171 };
1172
1173 match next_state {
1174 Ok(next_state) => {
1175 self.internal
1176 .signaling_state
1177 .store(next_state as u8, Ordering::SeqCst);
1178 if self.signaling_state() == RTCSignalingState::Stable {
1179 self.internal
1180 .is_negotiation_needed
1181 .store(false, Ordering::SeqCst);
1182 self.internal.trigger_negotiation_needed().await;
1183 }
1184 self.do_signaling_state_change(next_state).await;
1185 Ok(())
1186 }
1187 Err(err) => Err(err),
1188 }
1189 }
1190
1191 /// set_local_description sets the SessionDescription of the local peer
set_local_description(&self, mut desc: RTCSessionDescription) -> Result<()>1192 pub async fn set_local_description(&self, mut desc: RTCSessionDescription) -> Result<()> {
1193 if self.internal.is_closed.load(Ordering::SeqCst) {
1194 return Err(Error::ErrConnectionClosed);
1195 }
1196
1197 let have_local_description = {
1198 let current_local_description = self.internal.current_local_description.lock().await;
1199 current_local_description.is_some()
1200 };
1201
1202 // JSEP 5.4
1203 if desc.sdp.is_empty() {
1204 match desc.sdp_type {
1205 RTCSdpType::Answer | RTCSdpType::Pranswer => {
1206 let last_answer = self.internal.last_answer.lock().await;
1207 desc.sdp = last_answer.clone();
1208 }
1209 RTCSdpType::Offer => {
1210 let last_offer = self.internal.last_offer.lock().await;
1211 desc.sdp = last_offer.clone();
1212 }
1213 _ => return Err(Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription),
1214 }
1215 }
1216
1217 desc.parsed = Some(desc.unmarshal()?);
1218 self.set_description(&desc, StateChangeOp::SetLocal).await?;
1219
1220 let we_answer = desc.sdp_type == RTCSdpType::Answer;
1221 let remote_description = self.remote_description().await;
1222 let mut local_transceivers = self.get_transceivers().await;
1223 if we_answer {
1224 if let Some(parsed) = desc.parsed {
1225 // WebRTC Spec 1.0 https://www.w3.org/TR/webrtc/
1226 // Section 4.4.1.5
1227 for media in &parsed.media_descriptions {
1228 if media.media_name.media == MEDIA_SECTION_APPLICATION {
1229 continue;
1230 }
1231
1232 let kind = RTPCodecType::from(media.media_name.media.as_str());
1233 let direction = get_peer_direction(media);
1234 if kind == RTPCodecType::Unspecified
1235 || direction == RTCRtpTransceiverDirection::Unspecified
1236 {
1237 continue;
1238 }
1239
1240 let mid_value = match get_mid_value(media) {
1241 Some(mid) if !mid.is_empty() => mid,
1242 _ => continue,
1243 };
1244
1245 let t = match find_by_mid(mid_value, &mut local_transceivers).await {
1246 Some(t) => t,
1247 None => continue,
1248 };
1249 let previous_direction = t.current_direction();
1250 // 4.9.1.7.3 applying a local answer or pranswer
1251 // Set transceiver.[[CurrentDirection]] and transceiver.[[FiredDirection]] to direction.
1252
1253 // TODO: Also set FiredDirection here.
1254 t.set_current_direction(direction);
1255 t.process_new_current_direction(previous_direction).await?;
1256 }
1257 }
1258
1259 if let Some(remote_desc) = remote_description {
1260 self.start_rtp_senders().await?;
1261
1262 let pci = Arc::clone(&self.internal);
1263 let remote_desc = Arc::new(remote_desc);
1264 self.internal
1265 .ops
1266 .enqueue(Operation::new(
1267 move || {
1268 let pc = Arc::clone(&pci);
1269 let rd = Arc::clone(&remote_desc);
1270 Box::pin(async move {
1271 let _ = pc.start_rtp(have_local_description, rd).await;
1272 false
1273 })
1274 },
1275 "set_local_description",
1276 ))
1277 .await?;
1278 }
1279 }
1280
1281 if self.internal.ice_gatherer.state() == RTCIceGathererState::New {
1282 self.internal.ice_gatherer.gather().await
1283 } else {
1284 Ok(())
1285 }
1286 }
1287
1288 /// local_description returns PendingLocalDescription if it is not null and
1289 /// otherwise it returns CurrentLocalDescription. This property is used to
1290 /// determine if set_local_description has already been called.
1291 /// <https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-localdescription>
local_description(&self) -> Option<RTCSessionDescription>1292 pub async fn local_description(&self) -> Option<RTCSessionDescription> {
1293 if let Some(pending_local_description) = self.pending_local_description().await {
1294 return Some(pending_local_description);
1295 }
1296 self.current_local_description().await
1297 }
1298
1299 /// set_remote_description sets the SessionDescription of the remote peer
set_remote_description(&self, mut desc: RTCSessionDescription) -> Result<()>1300 pub async fn set_remote_description(&self, mut desc: RTCSessionDescription) -> Result<()> {
1301 if self.internal.is_closed.load(Ordering::SeqCst) {
1302 return Err(Error::ErrConnectionClosed);
1303 }
1304
1305 let is_renegotation = {
1306 let current_remote_description = self.internal.current_remote_description.lock().await;
1307 current_remote_description.is_some()
1308 };
1309
1310 desc.parsed = Some(desc.unmarshal()?);
1311 self.set_description(&desc, StateChangeOp::SetRemote)
1312 .await?;
1313
1314 if let Some(parsed) = &desc.parsed {
1315 self.internal
1316 .media_engine
1317 .update_from_remote_description(parsed)
1318 .await?;
1319
1320 let mut local_transceivers = self.get_transceivers().await;
1321 let remote_description = self.remote_description().await;
1322 let we_offer = desc.sdp_type == RTCSdpType::Answer;
1323
1324 if !we_offer {
1325 if let Some(parsed) = remote_description.as_ref().and_then(|r| r.parsed.as_ref()) {
1326 for media in &parsed.media_descriptions {
1327 let mid_value = match get_mid_value(media) {
1328 Some(m) => {
1329 if m.is_empty() {
1330 return Err(Error::ErrPeerConnRemoteDescriptionWithoutMidValue);
1331 } else {
1332 m
1333 }
1334 }
1335 None => continue,
1336 };
1337
1338 if media.media_name.media == MEDIA_SECTION_APPLICATION {
1339 continue;
1340 }
1341
1342 let kind = RTPCodecType::from(media.media_name.media.as_str());
1343 let direction = get_peer_direction(media);
1344 if kind == RTPCodecType::Unspecified
1345 || direction == RTCRtpTransceiverDirection::Unspecified
1346 {
1347 continue;
1348 }
1349
1350 let t = if let Some(t) =
1351 find_by_mid(mid_value, &mut local_transceivers).await
1352 {
1353 Some(t)
1354 } else {
1355 satisfy_type_and_direction(kind, direction, &mut local_transceivers)
1356 .await
1357 };
1358
1359 if let Some(t) = t {
1360 if t.mid().is_none() {
1361 t.set_mid(mid_value.to_owned())?;
1362 }
1363 } else {
1364 let local_direction =
1365 if direction == RTCRtpTransceiverDirection::Recvonly {
1366 RTCRtpTransceiverDirection::Sendonly
1367 } else {
1368 RTCRtpTransceiverDirection::Recvonly
1369 };
1370
1371 let receive_mtu = self.internal.setting_engine.get_receive_mtu();
1372
1373 let receiver = Arc::new(RTCRtpReceiver::new(
1374 receive_mtu,
1375 kind,
1376 Arc::clone(&self.internal.dtls_transport),
1377 Arc::clone(&self.internal.media_engine),
1378 Arc::clone(&self.interceptor),
1379 ));
1380
1381 let sender = Arc::new(
1382 RTCRtpSender::new(
1383 receive_mtu,
1384 None,
1385 Arc::clone(&self.internal.dtls_transport),
1386 Arc::clone(&self.internal.media_engine),
1387 Arc::clone(&self.interceptor),
1388 false,
1389 )
1390 .await,
1391 );
1392
1393 let t = RTCRtpTransceiver::new(
1394 receiver,
1395 sender,
1396 local_direction,
1397 kind,
1398 vec![],
1399 Arc::clone(&self.internal.media_engine),
1400 Some(Box::new(self.internal.make_negotiation_needed_trigger())),
1401 )
1402 .await;
1403
1404 self.internal.add_rtp_transceiver(Arc::clone(&t)).await;
1405
1406 if t.mid().is_none() {
1407 t.set_mid(mid_value.to_owned())?;
1408 }
1409 }
1410 }
1411 }
1412 }
1413
1414 if we_offer {
1415 // WebRTC Spec 1.0 https://www.w3.org/TR/webrtc/
1416 // 4.5.9.2
1417 // This is an answer from the remote.
1418 if let Some(parsed) = remote_description.as_ref().and_then(|r| r.parsed.as_ref()) {
1419 for media in &parsed.media_descriptions {
1420 let mid_value = match get_mid_value(media) {
1421 Some(m) => {
1422 if m.is_empty() {
1423 return Err(Error::ErrPeerConnRemoteDescriptionWithoutMidValue);
1424 } else {
1425 m
1426 }
1427 }
1428 None => continue,
1429 };
1430
1431 if media.media_name.media == MEDIA_SECTION_APPLICATION {
1432 continue;
1433 }
1434 let kind = RTPCodecType::from(media.media_name.media.as_str());
1435 let direction = get_peer_direction(media);
1436 if kind == RTPCodecType::Unspecified
1437 || direction == RTCRtpTransceiverDirection::Unspecified
1438 {
1439 continue;
1440 }
1441
1442 if let Some(t) = find_by_mid(mid_value, &mut local_transceivers).await {
1443 let previous_direction = t.current_direction();
1444
1445 // 4.5.9.2.9
1446 // Let direction be an RTCRtpTransceiverDirection value representing the direction
1447 // from the media description, but with the send and receive directions reversed to
1448 // represent this peer's point of view. If the media description is rejected,
1449 // set direction to "inactive".
1450 let reversed_direction = direction.reverse();
1451
1452 // 4.5.9.2.13.2
1453 // Set transceiver.[[CurrentDirection]] and transceiver.[[Direction]]s to direction.
1454 t.set_current_direction(reversed_direction);
1455 // TODO: According to the specification we should set
1456 // transceiver.[[Direction]] here, however libWebrtc doesn't do this.
1457 // NOTE: After raising this it seems like the specification might
1458 // change to remove the setting of transceiver.[[Direction]].
1459 // See https://github.com/w3c/webrtc-pc/issues/2751#issuecomment-1185901962
1460 // t.set_direction_internal(reversed_direction);
1461 t.process_new_current_direction(previous_direction).await?;
1462 }
1463 }
1464 }
1465 }
1466
1467 let (remote_ufrag, remote_pwd, candidates) = extract_ice_details(parsed).await?;
1468
1469 if is_renegotation
1470 && self
1471 .internal
1472 .ice_transport
1473 .have_remote_credentials_change(&remote_ufrag, &remote_pwd)
1474 .await
1475 {
1476 // An ICE Restart only happens implicitly for a set_remote_description of type offer
1477 if !we_offer {
1478 self.internal.ice_transport.restart().await?;
1479 }
1480
1481 self.internal
1482 .ice_transport
1483 .set_remote_credentials(remote_ufrag.clone(), remote_pwd.clone())
1484 .await?;
1485 }
1486
1487 for candidate in candidates {
1488 self.internal
1489 .ice_transport
1490 .add_remote_candidate(Some(candidate))
1491 .await?;
1492 }
1493
1494 if is_renegotation {
1495 if we_offer {
1496 self.start_rtp_senders().await?;
1497
1498 let pci = Arc::clone(&self.internal);
1499 let remote_desc = Arc::new(desc);
1500 self.internal
1501 .ops
1502 .enqueue(Operation::new(
1503 move || {
1504 let pc = Arc::clone(&pci);
1505 let rd = Arc::clone(&remote_desc);
1506 Box::pin(async move {
1507 let _ = pc.start_rtp(true, rd).await;
1508 false
1509 })
1510 },
1511 "set_remote_description renegotiation",
1512 ))
1513 .await?;
1514 }
1515 return Ok(());
1516 }
1517
1518 let mut remote_is_lite = false;
1519 for a in &parsed.attributes {
1520 if a.key.trim() == ATTR_KEY_ICELITE {
1521 remote_is_lite = true;
1522 break;
1523 }
1524 }
1525
1526 let (fingerprint, fingerprint_hash) = extract_fingerprint(parsed)?;
1527
1528 // If one of the agents is lite and the other one is not, the lite agent must be the controlling agent.
1529 // If both or neither agents are lite the offering agent is controlling.
1530 // RFC 8445 S6.1.1
1531 let ice_role = if (we_offer
1532 && remote_is_lite == self.internal.setting_engine.candidates.ice_lite)
1533 || (remote_is_lite && !self.internal.setting_engine.candidates.ice_lite)
1534 {
1535 RTCIceRole::Controlling
1536 } else {
1537 RTCIceRole::Controlled
1538 };
1539
1540 // Start the networking in a new routine since it will block until
1541 // the connection is actually established.
1542 if we_offer {
1543 self.start_rtp_senders().await?;
1544 }
1545
1546 //log::trace!("start_transports: parsed={:?}", parsed);
1547
1548 let pci = Arc::clone(&self.internal);
1549 let dtls_role = DTLSRole::from(parsed);
1550 let remote_desc = Arc::new(desc);
1551 self.internal
1552 .ops
1553 .enqueue(Operation::new(
1554 move || {
1555 let pc = Arc::clone(&pci);
1556 let rd = Arc::clone(&remote_desc);
1557 let ru = remote_ufrag.clone();
1558 let rp = remote_pwd.clone();
1559 let fp = fingerprint.clone();
1560 let fp_hash = fingerprint_hash.clone();
1561 Box::pin(async move {
1562 log::trace!(
1563 "start_transports: ice_role={}, dtls_role={}",
1564 ice_role,
1565 dtls_role,
1566 );
1567 pc.start_transports(ice_role, dtls_role, ru, rp, fp, fp_hash)
1568 .await;
1569
1570 if we_offer {
1571 let _ = pc.start_rtp(false, rd).await;
1572 }
1573 false
1574 })
1575 },
1576 "set_remote_description",
1577 ))
1578 .await?;
1579 }
1580
1581 Ok(())
1582 }
1583
1584 /// start_rtp_senders starts all outbound RTP streams
start_rtp_senders(&self) -> Result<()>1585 pub(crate) async fn start_rtp_senders(&self) -> Result<()> {
1586 let current_transceivers = self.internal.rtp_transceivers.lock().await;
1587 for transceiver in &*current_transceivers {
1588 let sender = transceiver.sender();
1589 if sender.is_negotiated() && !sender.has_sent() {
1590 sender.send(&sender.get_parameters().await).await?;
1591 }
1592 }
1593
1594 Ok(())
1595 }
1596
1597 /// remote_description returns pending_remote_description if it is not null and
1598 /// otherwise it returns current_remote_description. This property is used to
1599 /// determine if setRemoteDescription has already been called.
1600 /// <https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-remotedescription>
remote_description(&self) -> Option<RTCSessionDescription>1601 pub async fn remote_description(&self) -> Option<RTCSessionDescription> {
1602 self.internal.remote_description().await
1603 }
1604
1605 /// add_ice_candidate accepts an ICE candidate string and adds it
1606 /// to the existing set of candidates.
add_ice_candidate(&self, candidate: RTCIceCandidateInit) -> Result<()>1607 pub async fn add_ice_candidate(&self, candidate: RTCIceCandidateInit) -> Result<()> {
1608 if self.remote_description().await.is_none() {
1609 return Err(Error::ErrNoRemoteDescription);
1610 }
1611
1612 let candidate_value = match candidate.candidate.strip_prefix("candidate:") {
1613 Some(s) => s,
1614 None => candidate.candidate.as_str(),
1615 };
1616
1617 let ice_candidate = if !candidate_value.is_empty() {
1618 let candidate: Arc<dyn Candidate + Send + Sync> =
1619 Arc::new(unmarshal_candidate(candidate_value)?);
1620
1621 Some(RTCIceCandidate::from(&candidate))
1622 } else {
1623 None
1624 };
1625
1626 self.internal
1627 .ice_transport
1628 .add_remote_candidate(ice_candidate)
1629 .await
1630 }
1631
1632 /// ice_connection_state returns the ICE connection state of the
1633 /// PeerConnection instance.
ice_connection_state(&self) -> RTCIceConnectionState1634 pub fn ice_connection_state(&self) -> RTCIceConnectionState {
1635 self.internal
1636 .ice_connection_state
1637 .load(Ordering::SeqCst)
1638 .into()
1639 }
1640
1641 /// get_senders returns the RTPSender that are currently attached to this PeerConnection
get_senders(&self) -> Vec<Arc<RTCRtpSender>>1642 pub async fn get_senders(&self) -> Vec<Arc<RTCRtpSender>> {
1643 let mut senders = vec![];
1644 let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
1645 for transceiver in &*rtp_transceivers {
1646 let sender = transceiver.sender();
1647 senders.push(sender);
1648 }
1649 senders
1650 }
1651
1652 /// get_receivers returns the RTPReceivers that are currently attached to this PeerConnection
get_receivers(&self) -> Vec<Arc<RTCRtpReceiver>>1653 pub async fn get_receivers(&self) -> Vec<Arc<RTCRtpReceiver>> {
1654 let mut receivers = vec![];
1655 let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
1656 for transceiver in &*rtp_transceivers {
1657 receivers.push(transceiver.receiver());
1658 }
1659 receivers
1660 }
1661
1662 /// get_transceivers returns the RtpTransceiver that are currently attached to this PeerConnection
get_transceivers(&self) -> Vec<Arc<RTCRtpTransceiver>>1663 pub async fn get_transceivers(&self) -> Vec<Arc<RTCRtpTransceiver>> {
1664 let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
1665 rtp_transceivers.clone()
1666 }
1667
1668 /// add_track adds a Track to the PeerConnection
add_track( &self, track: Arc<dyn TrackLocal + Send + Sync>, ) -> Result<Arc<RTCRtpSender>>1669 pub async fn add_track(
1670 &self,
1671 track: Arc<dyn TrackLocal + Send + Sync>,
1672 ) -> Result<Arc<RTCRtpSender>> {
1673 if self.internal.is_closed.load(Ordering::SeqCst) {
1674 return Err(Error::ErrConnectionClosed);
1675 }
1676
1677 {
1678 let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
1679 for t in &*rtp_transceivers {
1680 if !t.stopped.load(Ordering::SeqCst) && t.kind == track.kind() {
1681 let sender = t.sender();
1682 if sender.track().await.is_none() {
1683 if let Err(err) = sender.replace_track(Some(track)).await {
1684 let _ = sender.stop().await;
1685 return Err(err);
1686 }
1687
1688 t.set_direction_internal(RTCRtpTransceiverDirection::from_send_recv(
1689 true,
1690 t.direction().has_recv(),
1691 ));
1692
1693 self.internal.trigger_negotiation_needed().await;
1694 return Ok(sender);
1695 }
1696 }
1697 }
1698 }
1699
1700 let transceiver = self
1701 .internal
1702 .new_transceiver_from_track(RTCRtpTransceiverDirection::Sendrecv, track)
1703 .await?;
1704 self.internal
1705 .add_rtp_transceiver(Arc::clone(&transceiver))
1706 .await;
1707
1708 Ok(transceiver.sender())
1709 }
1710
1711 /// remove_track removes a Track from the PeerConnection
remove_track(&self, sender: &Arc<RTCRtpSender>) -> Result<()>1712 pub async fn remove_track(&self, sender: &Arc<RTCRtpSender>) -> Result<()> {
1713 if self.internal.is_closed.load(Ordering::SeqCst) {
1714 return Err(Error::ErrConnectionClosed);
1715 }
1716
1717 let mut transceiver = None;
1718 {
1719 let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
1720 for t in &*rtp_transceivers {
1721 if t.sender().id == sender.id {
1722 if sender.track().await.is_none() {
1723 return Ok(());
1724 }
1725 transceiver = Some(t.clone());
1726 break;
1727 }
1728 }
1729 }
1730
1731 let t = transceiver.ok_or(Error::ErrSenderNotCreatedByConnection)?;
1732
1733 // This also happens in `set_sending_track` but we need to make sure we do this
1734 // before we call sender.stop to avoid a race condition when removing tracks and
1735 // generating offers.
1736 t.set_direction_internal(RTCRtpTransceiverDirection::from_send_recv(
1737 false,
1738 t.direction().has_recv(),
1739 ));
1740 // Stop the sender
1741 let sender_result = sender.stop().await;
1742 // This also updates direction
1743 let sending_track_result = t.set_sending_track(None).await;
1744
1745 if sender_result.is_ok() && sending_track_result.is_ok() {
1746 self.internal.trigger_negotiation_needed().await;
1747 }
1748 Ok(())
1749 }
1750
1751 /// add_transceiver_from_kind Create a new RtpTransceiver and adds it to the set of transceivers.
add_transceiver_from_kind( &self, kind: RTPCodecType, init: Option<RTCRtpTransceiverInit>, ) -> Result<Arc<RTCRtpTransceiver>>1752 pub async fn add_transceiver_from_kind(
1753 &self,
1754 kind: RTPCodecType,
1755 init: Option<RTCRtpTransceiverInit>,
1756 ) -> Result<Arc<RTCRtpTransceiver>> {
1757 self.internal.add_transceiver_from_kind(kind, init).await
1758 }
1759
1760 /// add_transceiver_from_track Create a new RtpTransceiver(SendRecv or SendOnly) and add it to the set of transceivers.
add_transceiver_from_track( &self, track: Arc<dyn TrackLocal + Send + Sync>, init: Option<RTCRtpTransceiverInit>, ) -> Result<Arc<RTCRtpTransceiver>>1761 pub async fn add_transceiver_from_track(
1762 &self,
1763 track: Arc<dyn TrackLocal + Send + Sync>,
1764 init: Option<RTCRtpTransceiverInit>,
1765 ) -> Result<Arc<RTCRtpTransceiver>> {
1766 if self.internal.is_closed.load(Ordering::SeqCst) {
1767 return Err(Error::ErrConnectionClosed);
1768 }
1769
1770 let direction = init
1771 .map(|init| init.direction)
1772 .unwrap_or(RTCRtpTransceiverDirection::Sendrecv);
1773
1774 let t = self
1775 .internal
1776 .new_transceiver_from_track(direction, track)
1777 .await?;
1778
1779 self.internal.add_rtp_transceiver(Arc::clone(&t)).await;
1780
1781 Ok(t)
1782 }
1783
1784 /// create_data_channel creates a new DataChannel object with the given label
1785 /// and optional DataChannelInit used to configure properties of the
1786 /// underlying channel such as data reliability.
create_data_channel( &self, label: &str, options: Option<RTCDataChannelInit>, ) -> Result<Arc<RTCDataChannel>>1787 pub async fn create_data_channel(
1788 &self,
1789 label: &str,
1790 options: Option<RTCDataChannelInit>,
1791 ) -> Result<Arc<RTCDataChannel>> {
1792 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #2)
1793 if self.internal.is_closed.load(Ordering::SeqCst) {
1794 return Err(Error::ErrConnectionClosed);
1795 }
1796
1797 let mut params = DataChannelParameters {
1798 label: label.to_owned(),
1799 ordered: true,
1800 ..Default::default()
1801 };
1802
1803 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #19)
1804 if let Some(options) = options {
1805 // Ordered indicates if data is allowed to be delivered out of order. The
1806 // default value of true, guarantees that data will be delivered in order.
1807 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #9)
1808 if let Some(ordered) = options.ordered {
1809 params.ordered = ordered;
1810 }
1811
1812 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #7)
1813 if let Some(max_packet_life_time) = options.max_packet_life_time {
1814 params.max_packet_life_time = max_packet_life_time;
1815 }
1816
1817 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #8)
1818 if let Some(max_retransmits) = options.max_retransmits {
1819 params.max_retransmits = max_retransmits;
1820 }
1821
1822 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #10)
1823 if let Some(protocol) = options.protocol {
1824 params.protocol = protocol;
1825 }
1826
1827 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #11)
1828 if params.protocol.len() > 65535 {
1829 return Err(Error::ErrProtocolTooLarge);
1830 }
1831
1832 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #12)
1833 params.negotiated = options.negotiated;
1834 }
1835
1836 let d = Arc::new(RTCDataChannel::new(
1837 params,
1838 Arc::clone(&self.internal.setting_engine),
1839 ));
1840
1841 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #16)
1842 if d.max_packet_lifetime != 0 && d.max_retransmits != 0 {
1843 return Err(Error::ErrRetransmitsOrPacketLifeTime);
1844 }
1845
1846 {
1847 let mut data_channels = self.internal.sctp_transport.data_channels.lock().await;
1848 data_channels.push(Arc::clone(&d));
1849 }
1850 self.internal
1851 .sctp_transport
1852 .data_channels_requested
1853 .fetch_add(1, Ordering::SeqCst);
1854
1855 // If SCTP already connected open all the channels
1856 if self.internal.sctp_transport.state() == RTCSctpTransportState::Connected {
1857 d.open(Arc::clone(&self.internal.sctp_transport)).await?;
1858 }
1859
1860 self.internal.trigger_negotiation_needed().await;
1861
1862 Ok(d)
1863 }
1864
1865 /// set_identity_provider is used to configure an identity provider to generate identity assertions
set_identity_provider(&self, _provider: &str) -> Result<()>1866 pub fn set_identity_provider(&self, _provider: &str) -> Result<()> {
1867 Err(Error::ErrPeerConnSetIdentityProviderNotImplemented)
1868 }
1869
1870 /// write_rtcp sends a user provided RTCP packet to the connected peer. If no peer is connected the
1871 /// packet is discarded. It also runs any configured interceptors.
write_rtcp( &self, pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>], ) -> Result<usize>1872 pub async fn write_rtcp(
1873 &self,
1874 pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
1875 ) -> Result<usize> {
1876 let a = Attributes::new();
1877 Ok(self.interceptor_rtcp_writer.write(pkts, &a).await?)
1878 }
1879
1880 /// close ends the PeerConnection
close(&self) -> Result<()>1881 pub async fn close(&self) -> Result<()> {
1882 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #1)
1883 if self.internal.is_closed.load(Ordering::SeqCst) {
1884 return Ok(());
1885 }
1886
1887 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #2)
1888 self.internal.is_closed.store(true, Ordering::SeqCst);
1889
1890 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
1891 self.internal
1892 .signaling_state
1893 .store(RTCSignalingState::Closed as u8, Ordering::SeqCst);
1894
1895 // Try closing everything and collect the errors
1896 // Shutdown strategy:
1897 // 1. All Conn close by closing their underlying Conn.
1898 // 2. A Mux stops this chain. It won't close the underlying
1899 // Conn if one of the endpoints is closed down. To
1900 // continue the chain the Mux has to be closed.
1901 let mut close_errs = vec![];
1902
1903 if let Err(err) = self.interceptor.close().await {
1904 close_errs.push(Error::new(format!("interceptor: {err}")));
1905 }
1906
1907 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #4)
1908 {
1909 let mut rtp_transceivers = self.internal.rtp_transceivers.lock().await;
1910 for t in &*rtp_transceivers {
1911 if let Err(err) = t.stop().await {
1912 close_errs.push(Error::new(format!("rtp_transceivers: {err}")));
1913 }
1914 }
1915 rtp_transceivers.clear();
1916 }
1917
1918 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #5)
1919 {
1920 let mut data_channels = self.internal.sctp_transport.data_channels.lock().await;
1921 for d in &*data_channels {
1922 if let Err(err) = d.close().await {
1923 close_errs.push(Error::new(format!("data_channels: {err}")));
1924 }
1925 }
1926 data_channels.clear();
1927 }
1928
1929 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #6)
1930 if let Err(err) = self.internal.sctp_transport.stop().await {
1931 close_errs.push(Error::new(format!("sctp_transport: {err}")));
1932 }
1933
1934 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #7)
1935 if let Err(err) = self.internal.dtls_transport.stop().await {
1936 close_errs.push(Error::new(format!("dtls_transport: {err}")));
1937 }
1938
1939 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10)
1940 if let Err(err) = self.internal.ice_transport.stop().await {
1941 close_errs.push(Error::new(format!("dtls_transport: {err}")));
1942 }
1943
1944 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11)
1945 RTCPeerConnection::update_connection_state(
1946 &self.internal.on_peer_connection_state_change_handler,
1947 &self.internal.is_closed,
1948 &self.internal.peer_connection_state,
1949 self.ice_connection_state(),
1950 self.internal.dtls_transport.state(),
1951 )
1952 .await;
1953
1954 if let Err(err) = self.internal.ops.close().await {
1955 close_errs.push(Error::new(format!("ops: {err}")));
1956 }
1957
1958 flatten_errs(close_errs)
1959 }
1960
1961 /// CurrentLocalDescription represents the local description that was
1962 /// successfully negotiated the last time the PeerConnection transitioned
1963 /// into the stable state plus any local candidates that have been generated
1964 /// by the ICEAgent since the offer or answer was created.
current_local_description(&self) -> Option<RTCSessionDescription>1965 pub async fn current_local_description(&self) -> Option<RTCSessionDescription> {
1966 let local_description = {
1967 let current_local_description = self.internal.current_local_description.lock().await;
1968 current_local_description.clone()
1969 };
1970 let ice_gather = Some(&self.internal.ice_gatherer);
1971 let ice_gathering_state = self.ice_gathering_state();
1972
1973 populate_local_candidates(local_description.as_ref(), ice_gather, ice_gathering_state).await
1974 }
1975
1976 /// PendingLocalDescription represents a local description that is in the
1977 /// process of being negotiated plus any local candidates that have been
1978 /// generated by the ICEAgent since the offer or answer was created. If the
1979 /// PeerConnection is in the stable state, the value is null.
pending_local_description(&self) -> Option<RTCSessionDescription>1980 pub async fn pending_local_description(&self) -> Option<RTCSessionDescription> {
1981 let local_description = {
1982 let pending_local_description = self.internal.pending_local_description.lock().await;
1983 pending_local_description.clone()
1984 };
1985 let ice_gather = Some(&self.internal.ice_gatherer);
1986 let ice_gathering_state = self.ice_gathering_state();
1987
1988 populate_local_candidates(local_description.as_ref(), ice_gather, ice_gathering_state).await
1989 }
1990
1991 /// current_remote_description represents the last remote description that was
1992 /// successfully negotiated the last time the PeerConnection transitioned
1993 /// into the stable state plus any remote candidates that have been supplied
1994 /// via add_icecandidate() since the offer or answer was created.
current_remote_description(&self) -> Option<RTCSessionDescription>1995 pub async fn current_remote_description(&self) -> Option<RTCSessionDescription> {
1996 let current_remote_description = self.internal.current_remote_description.lock().await;
1997 current_remote_description.clone()
1998 }
1999
2000 /// pending_remote_description represents a remote description that is in the
2001 /// process of being negotiated, complete with any remote candidates that
2002 /// have been supplied via add_icecandidate() since the offer or answer was
2003 /// created. If the PeerConnection is in the stable state, the value is
2004 /// null.
pending_remote_description(&self) -> Option<RTCSessionDescription>2005 pub async fn pending_remote_description(&self) -> Option<RTCSessionDescription> {
2006 let pending_remote_description = self.internal.pending_remote_description.lock().await;
2007 pending_remote_description.clone()
2008 }
2009
2010 /// signaling_state attribute returns the signaling state of the
2011 /// PeerConnection instance.
signaling_state(&self) -> RTCSignalingState2012 pub fn signaling_state(&self) -> RTCSignalingState {
2013 self.internal.signaling_state.load(Ordering::SeqCst).into()
2014 }
2015
2016 /// icegathering_state attribute returns the ICE gathering state of the
2017 /// PeerConnection instance.
ice_gathering_state(&self) -> RTCIceGatheringState2018 pub fn ice_gathering_state(&self) -> RTCIceGatheringState {
2019 self.internal.ice_gathering_state()
2020 }
2021
2022 /// connection_state attribute returns the connection state of the
2023 /// PeerConnection instance.
connection_state(&self) -> RTCPeerConnectionState2024 pub fn connection_state(&self) -> RTCPeerConnectionState {
2025 self.internal
2026 .peer_connection_state
2027 .load(Ordering::SeqCst)
2028 .into()
2029 }
2030
get_stats(&self) -> StatsReport2031 pub async fn get_stats(&self) -> StatsReport {
2032 self.internal
2033 .get_stats(self.get_stats_id().to_owned())
2034 .await
2035 .into()
2036 }
2037
2038 /// sctp returns the SCTPTransport for this PeerConnection
2039 ///
2040 /// The SCTP transport over which SCTP data is sent and received. If SCTP has not been negotiated, the value is nil.
2041 /// <https://www.w3.org/TR/webrtc/#attributes-15>
sctp(&self) -> Arc<RTCSctpTransport>2042 pub fn sctp(&self) -> Arc<RTCSctpTransport> {
2043 Arc::clone(&self.internal.sctp_transport)
2044 }
2045
2046 /// gathering_complete_promise is a Pion specific helper function that returns a channel that is closed when gathering is complete.
2047 /// This function may be helpful in cases where you are unable to trickle your ICE Candidates.
2048 ///
2049 /// It is better to not use this function, and instead trickle candidates. If you use this function you will see longer connection startup times.
2050 /// When the call is connected you will see no impact however.
gathering_complete_promise(&self) -> mpsc::Receiver<()>2051 pub async fn gathering_complete_promise(&self) -> mpsc::Receiver<()> {
2052 let (gathering_complete_tx, gathering_complete_rx) = mpsc::channel(1);
2053
2054 // It's possible to miss the GatherComplete event since setGatherCompleteHandler is an atomic operation and the
2055 // promise might have been created after the gathering is finished. Therefore, we need to check if the ICE gathering
2056 // state has changed to complete so that we don't block the caller forever.
2057 let done = Arc::new(Mutex::new(Some(gathering_complete_tx)));
2058 let done2 = Arc::clone(&done);
2059 self.internal.set_gather_complete_handler(Box::new(move || {
2060 log::trace!("setGatherCompleteHandler");
2061 let done3 = Arc::clone(&done2);
2062 Box::pin(async move {
2063 let mut d = done3.lock().await;
2064 d.take();
2065 })
2066 }));
2067
2068 if self.ice_gathering_state() == RTCIceGatheringState::Complete {
2069 log::trace!("ICEGatheringState::Complete");
2070 let mut d = done.lock().await;
2071 d.take();
2072 }
2073
2074 gathering_complete_rx
2075 }
2076
2077 /// Returns the internal [`RTCDtlsTransport`].
dtls_transport(&self) -> Arc<RTCDtlsTransport>2078 pub fn dtls_transport(&self) -> Arc<RTCDtlsTransport> {
2079 Arc::clone(&self.internal.dtls_transport)
2080 }
2081
2082 /// Adds the specified [`RTCRtpTransceiver`] to this [`RTCPeerConnection`].
add_transceiver(&self, t: Arc<RTCRtpTransceiver>)2083 pub async fn add_transceiver(&self, t: Arc<RTCRtpTransceiver>) {
2084 self.internal.add_rtp_transceiver(t).await
2085 }
2086 }
2087