1 #[cfg(test)] 2 mod rtp_receiver_test; 3 4 use crate::api::media_engine::MediaEngine; 5 use crate::dtls_transport::RTCDtlsTransport; 6 use crate::error::{flatten_errs, Error, Result}; 7 use crate::peer_connection::sdp::TrackDetails; 8 use crate::rtp_transceiver::rtp_codec::{ 9 codec_parameters_fuzzy_search, CodecMatch, RTCRtpCodecCapability, RTCRtpCodecParameters, 10 RTCRtpParameters, RTPCodecType, 11 }; 12 use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection; 13 use crate::rtp_transceiver::{ 14 create_stream_info, RTCRtpDecodingParameters, RTCRtpReceiveParameters, SSRC, 15 }; 16 use crate::track::track_remote::TrackRemote; 17 use crate::track::{TrackStream, TrackStreams}; 18 19 use arc_swap::ArcSwapOption; 20 use interceptor::stream_info::RTPHeaderExtension; 21 use interceptor::{Attributes, Interceptor}; 22 use log::trace; 23 use std::fmt; 24 25 use std::sync::Arc; 26 use tokio::sync::{watch, Mutex, RwLock}; 27 28 #[derive(Debug, Copy, Clone, PartialEq, Eq)] 29 #[repr(u8)] 30 pub enum State { 31 /// We haven't started yet. 32 Unstarted = 0, 33 /// We haven't started yet and additionally we've been paused. 34 UnstartedPaused = 1, 35 36 /// We have started and are running. 37 Started = 2, 38 39 /// We have been paused after starting. 40 Paused = 3, 41 42 /// We have been stopped. 43 Stopped = 4, 44 } 45 46 impl From<u8> for State { from(value: u8) -> Self47 fn from(value: u8) -> Self { 48 match value { 49 v if v == State::Unstarted as u8 => State::Unstarted, 50 v if v == State::UnstartedPaused as u8 => State::UnstartedPaused, 51 v if v == State::Started as u8 => State::Started, 52 v if v == State::Paused as u8 => State::Paused, 53 v if v == State::Stopped as u8 => State::Stopped, 54 _ => unreachable!( 55 "Invalid serialization of {}: {}", 56 std::any::type_name::<Self>(), 57 value 58 ), 59 } 60 } 61 } 62 63 impl fmt::Display for State { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 65 match self { 66 State::Unstarted => write!(f, "Unstarted"), 67 State::UnstartedPaused => write!(f, "UnstartedPaused"), 68 State::Started => write!(f, "Running"), 69 State::Paused => write!(f, "Paused"), 70 State::Stopped => write!(f, "Closed"), 71 } 72 } 73 } 74 75 impl State { transition(to: Self, tx: &watch::Sender<State>) -> Result<()>76 fn transition(to: Self, tx: &watch::Sender<State>) -> Result<()> { 77 let current = *tx.borrow(); 78 if current == to { 79 // Already in this state 80 return Ok(()); 81 } 82 83 match current { 84 Self::Unstarted 85 if matches!(to, Self::Started | Self::Stopped | Self::UnstartedPaused) => 86 { 87 let _ = tx.send(to); 88 return Ok(()); 89 } 90 Self::UnstartedPaused 91 if matches!(to, Self::Unstarted | Self::Stopped | Self::Paused) => 92 { 93 let _ = tx.send(to); 94 return Ok(()); 95 } 96 State::Started if matches!(to, Self::Paused | Self::Stopped) => { 97 let _ = tx.send(to); 98 return Ok(()); 99 } 100 State::Paused if matches!(to, Self::Started | Self::Stopped) => { 101 let _ = tx.send(to); 102 return Ok(()); 103 } 104 _ => {} 105 } 106 107 Err(Error::ErrRTPReceiverStateChangeInvalid { from: current, to }) 108 } 109 wait_for(rx: &mut watch::Receiver<State>, states: &[State]) -> Result<()>110 async fn wait_for(rx: &mut watch::Receiver<State>, states: &[State]) -> Result<()> { 111 loop { 112 let state = *rx.borrow(); 113 114 match state { 115 _ if states.contains(&state) => return Ok(()), 116 State::Stopped => { 117 return Err(Error::ErrClosedPipe); 118 } 119 _ => {} 120 } 121 122 if rx.changed().await.is_err() { 123 return Err(Error::ErrClosedPipe); 124 } 125 } 126 } 127 error_on_close(rx: &mut watch::Receiver<State>) -> Result<()>128 async fn error_on_close(rx: &mut watch::Receiver<State>) -> Result<()> { 129 if rx.changed().await.is_err() { 130 return Err(Error::ErrClosedPipe); 131 } 132 133 let state = *rx.borrow(); 134 if state == State::Stopped { 135 return Err(Error::ErrClosedPipe); 136 } 137 138 Ok(()) 139 } 140 is_started(&self) -> bool141 fn is_started(&self) -> bool { 142 matches!(self, Self::Started | Self::Paused) 143 } 144 } 145 146 pub struct RTPReceiverInternal { 147 pub(crate) kind: RTPCodecType, 148 149 // State is stored within the channel 150 state_tx: watch::Sender<State>, 151 state_rx: watch::Receiver<State>, 152 153 tracks: RwLock<Vec<TrackStreams>>, 154 155 transceiver_codecs: ArcSwapOption<Mutex<Vec<RTCRtpCodecParameters>>>, 156 157 transport: Arc<RTCDtlsTransport>, 158 media_engine: Arc<MediaEngine>, 159 interceptor: Arc<dyn Interceptor + Send + Sync>, 160 } 161 162 impl RTPReceiverInternal { 163 /// read reads incoming RTCP for this RTPReceiver read(&self, b: &mut [u8]) -> Result<(usize, Attributes)>164 async fn read(&self, b: &mut [u8]) -> Result<(usize, Attributes)> { 165 let mut state_watch_rx = self.state_tx.subscribe(); 166 // Ensure we are running or paused. When paused we still receive RTCP even if RTP traffic 167 // isn't flowing. 168 State::wait_for(&mut state_watch_rx, &[State::Started, State::Paused]).await?; 169 170 let tracks = self.tracks.read().await; 171 if let Some(t) = tracks.first() { 172 if let Some(rtcp_interceptor) = &t.stream.rtcp_interceptor { 173 let a = Attributes::new(); 174 loop { 175 tokio::select! { 176 res = State::error_on_close(&mut state_watch_rx) => { 177 res? 178 } 179 result = rtcp_interceptor.read(b, &a) => { 180 return Ok(result?) 181 } 182 } 183 } 184 } else { 185 Err(Error::ErrInterceptorNotBind) 186 } 187 } else { 188 Err(Error::ErrExistingTrack) 189 } 190 } 191 192 /// read_simulcast reads incoming RTCP for this RTPReceiver for given rid read_simulcast(&self, b: &mut [u8], rid: &str) -> Result<(usize, Attributes)>193 async fn read_simulcast(&self, b: &mut [u8], rid: &str) -> Result<(usize, Attributes)> { 194 let mut state_watch_rx = self.state_tx.subscribe(); 195 196 // Ensure we are running or paused. When paused we still recevie RTCP even if RTP traffic 197 // isn't flowing. 198 State::wait_for(&mut state_watch_rx, &[State::Started, State::Paused]).await?; 199 200 let tracks = self.tracks.read().await; 201 for t in &*tracks { 202 if t.track.rid() == rid { 203 if let Some(rtcp_interceptor) = &t.stream.rtcp_interceptor { 204 let a = Attributes::new(); 205 206 loop { 207 tokio::select! { 208 res = State::error_on_close(&mut state_watch_rx) => { 209 res? 210 } 211 result = rtcp_interceptor.read(b, &a) => { 212 return Ok(result?); 213 } 214 } 215 } 216 } else { 217 return Err(Error::ErrInterceptorNotBind); 218 } 219 } 220 } 221 Err(Error::ErrRTPReceiverForRIDTrackStreamNotFound) 222 } 223 224 /// read_rtcp is a convenience method that wraps Read and unmarshal for you. 225 /// It also runs any configured interceptors. read_rtcp( &self, receive_mtu: usize, ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>226 async fn read_rtcp( 227 &self, 228 receive_mtu: usize, 229 ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> { 230 let mut b = vec![0u8; receive_mtu]; 231 let (n, attributes) = self.read(&mut b).await?; 232 233 let mut buf = &b[..n]; 234 let pkts = rtcp::packet::unmarshal(&mut buf)?; 235 236 Ok((pkts, attributes)) 237 } 238 239 /// read_simulcast_rtcp is a convenience method that wraps ReadSimulcast and unmarshal for you read_simulcast_rtcp( &self, rid: &str, receive_mtu: usize, ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>240 async fn read_simulcast_rtcp( 241 &self, 242 rid: &str, 243 receive_mtu: usize, 244 ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> { 245 let mut b = vec![0u8; receive_mtu]; 246 let (n, attributes) = self.read_simulcast(&mut b, rid).await?; 247 248 let mut buf = &b[..n]; 249 let pkts = rtcp::packet::unmarshal(&mut buf)?; 250 251 Ok((pkts, attributes)) 252 } 253 read_rtp(&self, b: &mut [u8], tid: usize) -> Result<(usize, Attributes)>254 pub(crate) async fn read_rtp(&self, b: &mut [u8], tid: usize) -> Result<(usize, Attributes)> { 255 let mut state_watch_rx = self.state_tx.subscribe(); 256 257 // Ensure we are running. 258 State::wait_for(&mut state_watch_rx, &[State::Started]).await?; 259 260 //log::debug!("read_rtp enter tracks tid {}", tid); 261 let mut rtp_interceptor = None; 262 //let mut ssrc = 0; 263 { 264 let tracks = self.tracks.read().await; 265 for t in &*tracks { 266 if t.track.tid() == tid { 267 rtp_interceptor = t.stream.rtp_interceptor.clone(); 268 //ssrc = t.track.ssrc(); 269 break; 270 } 271 } 272 }; 273 /*log::debug!( 274 "read_rtp exit tracks with rtp_interceptor {} with tid {}", 275 rtp_interceptor.is_some(), 276 tid, 277 );*/ 278 279 if let Some(rtp_interceptor) = rtp_interceptor { 280 let a = Attributes::new(); 281 //println!( 282 // "read_rtp rtp_interceptor.read enter with tid {} ssrc {}", 283 // tid, ssrc 284 //); 285 let mut current_state = *state_watch_rx.borrow(); 286 loop { 287 tokio::select! { 288 _ = state_watch_rx.changed() => { 289 let new_state = *state_watch_rx.borrow(); 290 291 if new_state == State::Stopped { 292 return Err(Error::ErrClosedPipe); 293 } 294 current_state = new_state; 295 } 296 result = rtp_interceptor.read(b, &a) => { 297 let result = result?; 298 299 if current_state == State::Paused { 300 trace!("Dropping {} read bytes received while RTPReceiver was paused", result.0); 301 continue; 302 } 303 return Ok(result); 304 } 305 } 306 } 307 } else { 308 //log::debug!("read_rtp exit tracks with ErrRTPReceiverWithSSRCTrackStreamNotFound"); 309 Err(Error::ErrRTPReceiverWithSSRCTrackStreamNotFound) 310 } 311 } 312 get_parameters(&self) -> RTCRtpParameters313 async fn get_parameters(&self) -> RTCRtpParameters { 314 let mut parameters = self 315 .media_engine 316 .get_rtp_parameters_by_kind(self.kind, RTCRtpTransceiverDirection::Recvonly); 317 318 let transceiver_codecs = self.transceiver_codecs.load(); 319 if let Some(codecs) = &*transceiver_codecs { 320 let mut c = codecs.lock().await; 321 parameters.codecs = 322 RTPReceiverInternal::get_codecs(&mut c, self.kind, &self.media_engine); 323 } 324 325 parameters 326 } 327 get_codecs( codecs: &mut [RTCRtpCodecParameters], kind: RTPCodecType, media_engine: &Arc<MediaEngine>, ) -> Vec<RTCRtpCodecParameters>328 pub(crate) fn get_codecs( 329 codecs: &mut [RTCRtpCodecParameters], 330 kind: RTPCodecType, 331 media_engine: &Arc<MediaEngine>, 332 ) -> Vec<RTCRtpCodecParameters> { 333 let media_engine_codecs = media_engine.get_codecs_by_kind(kind); 334 if codecs.is_empty() { 335 return media_engine_codecs; 336 } 337 let mut filtered_codecs = vec![]; 338 for codec in codecs { 339 let (c, match_type) = codec_parameters_fuzzy_search(codec, &media_engine_codecs); 340 if match_type != CodecMatch::None { 341 if codec.payload_type == 0 { 342 codec.payload_type = c.payload_type; 343 } 344 filtered_codecs.push(codec.clone()); 345 } 346 } 347 348 filtered_codecs 349 } 350 351 // State 352 353 /// Get the current state and a receiver for the next state change. current_state(&self) -> State354 pub(crate) fn current_state(&self) -> State { 355 *self.state_rx.borrow() 356 } 357 start(&self) -> Result<()>358 pub(crate) fn start(&self) -> Result<()> { 359 State::transition(State::Started, &self.state_tx) 360 } 361 pause(&self) -> Result<()>362 pub(crate) fn pause(&self) -> Result<()> { 363 let current = self.current_state(); 364 365 match current { 366 State::Unstarted => State::transition(State::UnstartedPaused, &self.state_tx), 367 State::Started => State::transition(State::Paused, &self.state_tx), 368 _ => Ok(()), 369 } 370 } 371 resume(&self) -> Result<()>372 pub(crate) fn resume(&self) -> Result<()> { 373 let current = self.current_state(); 374 375 match current { 376 State::UnstartedPaused => State::transition(State::Unstarted, &self.state_tx), 377 State::Paused => State::transition(State::Started, &self.state_tx), 378 _ => Ok(()), 379 } 380 } 381 close(&self) -> Result<()>382 pub(crate) fn close(&self) -> Result<()> { 383 State::transition(State::Stopped, &self.state_tx) 384 } 385 } 386 387 /// RTPReceiver allows an application to inspect the receipt of a TrackRemote 388 pub struct RTCRtpReceiver { 389 receive_mtu: usize, 390 kind: RTPCodecType, 391 transport: Arc<RTCDtlsTransport>, 392 393 pub internal: Arc<RTPReceiverInternal>, 394 } 395 396 impl std::fmt::Debug for RTCRtpReceiver { fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result397 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 398 f.debug_struct("RTCRtpReceiver") 399 .field("kind", &self.kind) 400 .finish() 401 } 402 } 403 404 impl RTCRtpReceiver { new( receive_mtu: usize, kind: RTPCodecType, transport: Arc<RTCDtlsTransport>, media_engine: Arc<MediaEngine>, interceptor: Arc<dyn Interceptor + Send + Sync>, ) -> Self405 pub fn new( 406 receive_mtu: usize, 407 kind: RTPCodecType, 408 transport: Arc<RTCDtlsTransport>, 409 media_engine: Arc<MediaEngine>, 410 interceptor: Arc<dyn Interceptor + Send + Sync>, 411 ) -> Self { 412 let (state_tx, state_rx) = watch::channel(State::Unstarted); 413 414 RTCRtpReceiver { 415 receive_mtu, 416 kind, 417 transport: Arc::clone(&transport), 418 419 internal: Arc::new(RTPReceiverInternal { 420 kind, 421 422 tracks: RwLock::new(vec![]), 423 transport, 424 media_engine, 425 interceptor, 426 427 state_tx, 428 state_rx, 429 430 transceiver_codecs: ArcSwapOption::new(None), 431 }), 432 } 433 } 434 kind(&self) -> RTPCodecType435 pub fn kind(&self) -> RTPCodecType { 436 self.kind 437 } 438 set_transceiver_codecs( &self, codecs: Option<Arc<Mutex<Vec<RTCRtpCodecParameters>>>>, )439 pub(crate) fn set_transceiver_codecs( 440 &self, 441 codecs: Option<Arc<Mutex<Vec<RTCRtpCodecParameters>>>>, 442 ) { 443 self.internal.transceiver_codecs.store(codecs); 444 } 445 446 /// transport returns the currently-configured *DTLSTransport or nil 447 /// if one has not yet been configured transport(&self) -> Arc<RTCDtlsTransport>448 pub fn transport(&self) -> Arc<RTCDtlsTransport> { 449 Arc::clone(&self.transport) 450 } 451 452 /// get_parameters describes the current configuration for the encoding and 453 /// transmission of media on the receiver's track. get_parameters(&self) -> RTCRtpParameters454 pub async fn get_parameters(&self) -> RTCRtpParameters { 455 self.internal.get_parameters().await 456 } 457 458 /// SetRTPParameters applies provided RTPParameters the RTPReceiver's tracks. 459 /// This method is part of the ORTC API. It is not 460 /// meant to be used together with the basic WebRTC API. 461 /// The amount of provided codecs must match the number of tracks on the receiver. set_rtp_parameters(&self, params: RTCRtpParameters)462 pub async fn set_rtp_parameters(&self, params: RTCRtpParameters) { 463 let mut header_extensions = vec![]; 464 for h in ¶ms.header_extensions { 465 header_extensions.push(RTPHeaderExtension { 466 id: h.id, 467 uri: h.uri.clone(), 468 }); 469 } 470 471 let mut tracks = self.internal.tracks.write().await; 472 for (idx, codec) in params.codecs.iter().enumerate() { 473 let t = &mut tracks[idx]; 474 if let Some(stream_info) = &mut t.stream.stream_info { 475 stream_info.rtp_header_extensions = header_extensions.clone(); 476 } 477 478 let current_track = &t.track; 479 current_track.set_codec(codec.clone()); 480 current_track.set_params(params.clone()); 481 } 482 } 483 484 /// track returns the RtpTransceiver TrackRemote track(&self) -> Option<Arc<TrackRemote>>485 pub async fn track(&self) -> Option<Arc<TrackRemote>> { 486 let tracks = self.internal.tracks.read().await; 487 if tracks.len() != 1 { 488 None 489 } else { 490 tracks.first().map(|t| Arc::clone(&t.track)) 491 } 492 } 493 494 /// tracks returns the RtpTransceiver traclockks 495 /// A RTPReceiver to support Simulcast may now have multiple tracks tracks(&self) -> Vec<Arc<TrackRemote>>496 pub async fn tracks(&self) -> Vec<Arc<TrackRemote>> { 497 let tracks = self.internal.tracks.read().await; 498 tracks.iter().map(|t| Arc::clone(&t.track)).collect() 499 } 500 501 /// receive initialize the track and starts all the transports receive(&self, parameters: &RTCRtpReceiveParameters) -> Result<()>502 pub async fn receive(&self, parameters: &RTCRtpReceiveParameters) -> Result<()> { 503 let receiver = Arc::downgrade(&self.internal); 504 505 let current_state = self.internal.current_state(); 506 if current_state.is_started() { 507 return Err(Error::ErrRTPReceiverReceiveAlreadyCalled); 508 } 509 self.internal.start()?; 510 511 let (global_params, interceptor, media_engine) = { 512 ( 513 self.internal.get_parameters().await, 514 Arc::clone(&self.internal.interceptor), 515 Arc::clone(&self.internal.media_engine), 516 ) 517 }; 518 519 let codec = if let Some(codec) = global_params.codecs.first() { 520 codec.capability.clone() 521 } else { 522 RTCRtpCodecCapability::default() 523 }; 524 525 for encoding in ¶meters.encodings { 526 let (stream_info, rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = 527 if encoding.ssrc != 0 { 528 let stream_info = create_stream_info( 529 "".to_owned(), 530 encoding.ssrc, 531 0, 532 codec.clone(), 533 &global_params.header_extensions, 534 ); 535 let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = 536 self.transport 537 .streams_for_ssrc(encoding.ssrc, &stream_info, &interceptor) 538 .await?; 539 540 ( 541 Some(stream_info), 542 Some(rtp_read_stream), 543 Some(rtp_interceptor), 544 Some(rtcp_read_stream), 545 Some(rtcp_interceptor), 546 ) 547 } else { 548 (None, None, None, None, None) 549 }; 550 551 let t = TrackStreams { 552 track: Arc::new(TrackRemote::new( 553 self.receive_mtu, 554 self.kind, 555 encoding.ssrc, 556 encoding.rid.clone(), 557 receiver.clone(), 558 Arc::clone(&media_engine), 559 Arc::clone(&interceptor), 560 )), 561 stream: TrackStream { 562 stream_info, 563 rtp_read_stream, 564 rtp_interceptor, 565 rtcp_read_stream, 566 rtcp_interceptor, 567 }, 568 569 repair_stream: TrackStream { 570 stream_info: None, 571 rtp_read_stream: None, 572 rtp_interceptor: None, 573 rtcp_read_stream: None, 574 rtcp_interceptor: None, 575 }, 576 }; 577 578 { 579 let mut tracks = self.internal.tracks.write().await; 580 tracks.push(t); 581 }; 582 583 let rtx_ssrc = encoding.rtx.ssrc; 584 if rtx_ssrc != 0 { 585 let stream_info = create_stream_info( 586 "".to_owned(), 587 rtx_ssrc, 588 0, 589 codec.clone(), 590 &global_params.header_extensions, 591 ); 592 let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = self 593 .transport 594 .streams_for_ssrc(rtx_ssrc, &stream_info, &interceptor) 595 .await?; 596 597 self.receive_for_rtx( 598 rtx_ssrc, 599 "".to_owned(), 600 TrackStream { 601 stream_info: Some(stream_info), 602 rtp_read_stream: Some(rtp_read_stream), 603 rtp_interceptor: Some(rtp_interceptor), 604 rtcp_read_stream: Some(rtcp_read_stream), 605 rtcp_interceptor: Some(rtcp_interceptor), 606 }, 607 ) 608 .await?; 609 } 610 } 611 612 Ok(()) 613 } 614 615 /// read reads incoming RTCP for this RTPReceiver read(&self, b: &mut [u8]) -> Result<(usize, Attributes)>616 pub async fn read(&self, b: &mut [u8]) -> Result<(usize, Attributes)> { 617 self.internal.read(b).await 618 } 619 620 /// read_simulcast reads incoming RTCP for this RTPReceiver for given rid read_simulcast(&self, b: &mut [u8], rid: &str) -> Result<(usize, Attributes)>621 pub async fn read_simulcast(&self, b: &mut [u8], rid: &str) -> Result<(usize, Attributes)> { 622 self.internal.read_simulcast(b, rid).await 623 } 624 625 /// read_rtcp is a convenience method that wraps Read and unmarshal for you. 626 /// It also runs any configured interceptors. read_rtcp( &self, ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>627 pub async fn read_rtcp( 628 &self, 629 ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> { 630 self.internal.read_rtcp(self.receive_mtu).await 631 } 632 633 /// read_simulcast_rtcp is a convenience method that wraps ReadSimulcast and unmarshal for you read_simulcast_rtcp( &self, rid: &str, ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>634 pub async fn read_simulcast_rtcp( 635 &self, 636 rid: &str, 637 ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> { 638 self.internal 639 .read_simulcast_rtcp(rid, self.receive_mtu) 640 .await 641 } 642 have_received(&self) -> bool643 pub(crate) async fn have_received(&self) -> bool { 644 self.internal.current_state().is_started() 645 } 646 start(&self, incoming: &TrackDetails)647 pub(crate) async fn start(&self, incoming: &TrackDetails) { 648 let mut encoding_size = incoming.ssrcs.len(); 649 if incoming.rids.len() >= encoding_size { 650 encoding_size = incoming.rids.len(); 651 }; 652 653 let mut encodings = vec![RTCRtpDecodingParameters::default(); encoding_size]; 654 for (i, encoding) in encodings.iter_mut().enumerate() { 655 if incoming.rids.len() > i { 656 encoding.rid = incoming.rids[i].clone(); 657 } 658 if incoming.ssrcs.len() > i { 659 encoding.ssrc = incoming.ssrcs[i]; 660 } 661 662 encoding.rtx.ssrc = incoming.repair_ssrc; 663 } 664 665 if let Err(err) = self.receive(&RTCRtpReceiveParameters { encodings }).await { 666 log::warn!("RTPReceiver Receive failed {}", err); 667 return; 668 } 669 670 // set track id and label early so they can be set as new track information 671 // is received from the SDP. 672 let is_unpaused = self.current_state() == State::Started; 673 for track_remote in &self.tracks().await { 674 track_remote.set_id(incoming.id.clone()); 675 track_remote.set_stream_id(incoming.stream_id.clone()); 676 677 if is_unpaused { 678 track_remote.fire_onunmute().await; 679 } 680 } 681 } 682 683 /// Stop irreversibly stops the RTPReceiver stop(&self) -> Result<()>684 pub async fn stop(&self) -> Result<()> { 685 let previous_state = self.internal.current_state(); 686 self.internal.close()?; 687 688 let mut errs = vec![]; 689 let was_ever_started = previous_state.is_started(); 690 if was_ever_started { 691 let tracks = self.internal.tracks.write().await; 692 for t in &*tracks { 693 if let Some(rtcp_read_stream) = &t.stream.rtcp_read_stream { 694 if let Err(err) = rtcp_read_stream.close().await { 695 errs.push(err); 696 } 697 } 698 699 if let Some(rtp_read_stream) = &t.stream.rtp_read_stream { 700 if let Err(err) = rtp_read_stream.close().await { 701 errs.push(err); 702 } 703 } 704 705 if let Some(repair_rtcp_read_stream) = &t.repair_stream.rtcp_read_stream { 706 if let Err(err) = repair_rtcp_read_stream.close().await { 707 errs.push(err); 708 } 709 } 710 711 if let Some(repair_rtp_read_stream) = &t.repair_stream.rtp_read_stream { 712 if let Err(err) = repair_rtp_read_stream.close().await { 713 errs.push(err); 714 } 715 } 716 717 if let Some(stream_info) = &t.stream.stream_info { 718 self.internal 719 .interceptor 720 .unbind_remote_stream(stream_info) 721 .await; 722 } 723 724 if let Some(repair_stream_info) = &t.repair_stream.stream_info { 725 self.internal 726 .interceptor 727 .unbind_remote_stream(repair_stream_info) 728 .await; 729 } 730 } 731 } 732 733 flatten_errs(errs) 734 } 735 736 /// read_rtp should only be called by a track, this only exists so we can keep state in one place read_rtp(&self, b: &mut [u8], tid: usize) -> Result<(usize, Attributes)>737 pub(crate) async fn read_rtp(&self, b: &mut [u8], tid: usize) -> Result<(usize, Attributes)> { 738 self.internal.read_rtp(b, tid).await 739 } 740 741 /// receive_for_rid is the sibling of Receive expect for RIDs instead of SSRCs 742 /// It populates all the internal state for the given RID receive_for_rid( &self, rid: String, params: RTCRtpParameters, stream: TrackStream, ) -> Result<Arc<TrackRemote>>743 pub(crate) async fn receive_for_rid( 744 &self, 745 rid: String, 746 params: RTCRtpParameters, 747 stream: TrackStream, 748 ) -> Result<Arc<TrackRemote>> { 749 let mut tracks = self.internal.tracks.write().await; 750 for t in &mut *tracks { 751 if t.track.rid() == rid { 752 t.track.set_kind(self.kind); 753 if let Some(codec) = params.codecs.first() { 754 t.track.set_codec(codec.clone()); 755 } 756 t.track.set_params(params.clone()); 757 t.track 758 .set_ssrc(stream.stream_info.as_ref().map_or(0, |s| s.ssrc)); 759 t.stream = stream; 760 return Ok(Arc::clone(&t.track)); 761 } 762 } 763 764 Err(Error::ErrRTPReceiverForRIDTrackStreamNotFound) 765 } 766 767 /// receiveForRtx starts a routine that processes the repair stream 768 /// These packets aren't exposed to the user yet, but we need to process them for 769 /// TWCC receive_for_rtx( &self, ssrc: SSRC, rsid: String, repair_stream: TrackStream, ) -> Result<()>770 pub(crate) async fn receive_for_rtx( 771 &self, 772 ssrc: SSRC, 773 rsid: String, 774 repair_stream: TrackStream, 775 ) -> Result<()> { 776 let mut tracks = self.internal.tracks.write().await; 777 let l = tracks.len(); 778 for t in &mut *tracks { 779 if (ssrc != 0 && l == 1) || t.track.rid() == rsid { 780 t.repair_stream = repair_stream; 781 782 let receive_mtu = self.receive_mtu; 783 let track = t.clone(); 784 tokio::spawn(async move { 785 let a = Attributes::new(); 786 let mut b = vec![0u8; receive_mtu]; 787 while let Some(repair_rtp_interceptor) = &track.repair_stream.rtp_interceptor { 788 //TODO: cancel repair_rtp_interceptor.read gracefully 789 //println!("repair_rtp_interceptor read begin with ssrc={}", ssrc); 790 if repair_rtp_interceptor.read(&mut b, &a).await.is_err() { 791 break; 792 } 793 } 794 }); 795 796 return Ok(()); 797 } 798 } 799 800 Err(Error::ErrRTPReceiverForRIDTrackStreamNotFound) 801 } 802 803 // State 804 current_state(&self) -> State805 pub(crate) fn current_state(&self) -> State { 806 self.internal.current_state() 807 } 808 pause(&self) -> Result<()>809 pub(crate) async fn pause(&self) -> Result<()> { 810 self.internal.pause()?; 811 812 if !self.internal.current_state().is_started() { 813 return Ok(()); 814 } 815 816 let streams = self.internal.tracks.read().await; 817 818 for stream in streams.iter() { 819 // TODO: If we introduce futures as a direct dependency this and other futures could be 820 // ran concurrently with [`join_all`](https://docs.rs/futures/0.3.21/futures/future/fn.join_all.html) 821 stream.track.fire_onmute().await; 822 } 823 824 Ok(()) 825 } 826 resume(&self) -> Result<()>827 pub(crate) async fn resume(&self) -> Result<()> { 828 self.internal.resume()?; 829 830 if !self.internal.current_state().is_started() { 831 return Ok(()); 832 } 833 834 let streams = self.internal.tracks.read().await; 835 836 for stream in streams.iter() { 837 // TODO: If we introduce futures as a direct dependency this and other futures could be 838 // ran concurrently with [`join_all`](https://docs.rs/futures/0.3.21/futures/future/fn.join_all.html) 839 stream.track.fire_onunmute().await; 840 } 841 842 Ok(()) 843 } 844 } 845