1 #[cfg(test)]
2 mod rtp_receiver_test;
3 
4 use crate::api::media_engine::MediaEngine;
5 use crate::dtls_transport::RTCDtlsTransport;
6 use crate::error::{flatten_errs, Error, Result};
7 use crate::peer_connection::sdp::TrackDetails;
8 use crate::rtp_transceiver::rtp_codec::{
9     codec_parameters_fuzzy_search, CodecMatch, RTCRtpCodecCapability, RTCRtpCodecParameters,
10     RTCRtpParameters, RTPCodecType,
11 };
12 use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
13 use crate::rtp_transceiver::{
14     create_stream_info, RTCRtpDecodingParameters, RTCRtpReceiveParameters, SSRC,
15 };
16 use crate::track::track_remote::TrackRemote;
17 use crate::track::{TrackStream, TrackStreams};
18 
19 use arc_swap::ArcSwapOption;
20 use interceptor::stream_info::RTPHeaderExtension;
21 use interceptor::{Attributes, Interceptor};
22 use log::trace;
23 use std::fmt;
24 
25 use std::sync::Arc;
26 use tokio::sync::{watch, Mutex, RwLock};
27 
28 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
29 #[repr(u8)]
30 pub enum State {
31     /// We haven't started yet.
32     Unstarted = 0,
33     /// We haven't started yet and additionally we've been paused.
34     UnstartedPaused = 1,
35 
36     /// We have started and are running.
37     Started = 2,
38 
39     /// We have been paused after starting.
40     Paused = 3,
41 
42     /// We have been stopped.
43     Stopped = 4,
44 }
45 
46 impl From<u8> for State {
from(value: u8) -> Self47     fn from(value: u8) -> Self {
48         match value {
49             v if v == State::Unstarted as u8 => State::Unstarted,
50             v if v == State::UnstartedPaused as u8 => State::UnstartedPaused,
51             v if v == State::Started as u8 => State::Started,
52             v if v == State::Paused as u8 => State::Paused,
53             v if v == State::Stopped as u8 => State::Stopped,
54             _ => unreachable!(
55                 "Invalid serialization of {}: {}",
56                 std::any::type_name::<Self>(),
57                 value
58             ),
59         }
60     }
61 }
62 
63 impl fmt::Display for State {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result64     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65         match self {
66             State::Unstarted => write!(f, "Unstarted"),
67             State::UnstartedPaused => write!(f, "UnstartedPaused"),
68             State::Started => write!(f, "Running"),
69             State::Paused => write!(f, "Paused"),
70             State::Stopped => write!(f, "Closed"),
71         }
72     }
73 }
74 
75 impl State {
transition(to: Self, tx: &watch::Sender<State>) -> Result<()>76     fn transition(to: Self, tx: &watch::Sender<State>) -> Result<()> {
77         let current = *tx.borrow();
78         if current == to {
79             // Already in this state
80             return Ok(());
81         }
82 
83         match current {
84             Self::Unstarted
85                 if matches!(to, Self::Started | Self::Stopped | Self::UnstartedPaused) =>
86             {
87                 let _ = tx.send(to);
88                 return Ok(());
89             }
90             Self::UnstartedPaused
91                 if matches!(to, Self::Unstarted | Self::Stopped | Self::Paused) =>
92             {
93                 let _ = tx.send(to);
94                 return Ok(());
95             }
96             State::Started if matches!(to, Self::Paused | Self::Stopped) => {
97                 let _ = tx.send(to);
98                 return Ok(());
99             }
100             State::Paused if matches!(to, Self::Started | Self::Stopped) => {
101                 let _ = tx.send(to);
102                 return Ok(());
103             }
104             _ => {}
105         }
106 
107         Err(Error::ErrRTPReceiverStateChangeInvalid { from: current, to })
108     }
109 
wait_for(rx: &mut watch::Receiver<State>, states: &[State]) -> Result<()>110     async fn wait_for(rx: &mut watch::Receiver<State>, states: &[State]) -> Result<()> {
111         loop {
112             let state = *rx.borrow();
113 
114             match state {
115                 _ if states.contains(&state) => return Ok(()),
116                 State::Stopped => {
117                     return Err(Error::ErrClosedPipe);
118                 }
119                 _ => {}
120             }
121 
122             if rx.changed().await.is_err() {
123                 return Err(Error::ErrClosedPipe);
124             }
125         }
126     }
127 
error_on_close(rx: &mut watch::Receiver<State>) -> Result<()>128     async fn error_on_close(rx: &mut watch::Receiver<State>) -> Result<()> {
129         if rx.changed().await.is_err() {
130             return Err(Error::ErrClosedPipe);
131         }
132 
133         let state = *rx.borrow();
134         if state == State::Stopped {
135             return Err(Error::ErrClosedPipe);
136         }
137 
138         Ok(())
139     }
140 
is_started(&self) -> bool141     fn is_started(&self) -> bool {
142         matches!(self, Self::Started | Self::Paused)
143     }
144 }
145 
146 pub struct RTPReceiverInternal {
147     pub(crate) kind: RTPCodecType,
148 
149     // State is stored within the channel
150     state_tx: watch::Sender<State>,
151     state_rx: watch::Receiver<State>,
152 
153     tracks: RwLock<Vec<TrackStreams>>,
154 
155     transceiver_codecs: ArcSwapOption<Mutex<Vec<RTCRtpCodecParameters>>>,
156 
157     transport: Arc<RTCDtlsTransport>,
158     media_engine: Arc<MediaEngine>,
159     interceptor: Arc<dyn Interceptor + Send + Sync>,
160 }
161 
162 impl RTPReceiverInternal {
163     /// read reads incoming RTCP for this RTPReceiver
read(&self, b: &mut [u8]) -> Result<(usize, Attributes)>164     async fn read(&self, b: &mut [u8]) -> Result<(usize, Attributes)> {
165         let mut state_watch_rx = self.state_tx.subscribe();
166         // Ensure we are running or paused. When paused we still receive RTCP even if RTP traffic
167         // isn't flowing.
168         State::wait_for(&mut state_watch_rx, &[State::Started, State::Paused]).await?;
169 
170         let tracks = self.tracks.read().await;
171         if let Some(t) = tracks.first() {
172             if let Some(rtcp_interceptor) = &t.stream.rtcp_interceptor {
173                 let a = Attributes::new();
174                 loop {
175                     tokio::select! {
176                         res = State::error_on_close(&mut state_watch_rx) => {
177                             res?
178                         }
179                         result = rtcp_interceptor.read(b, &a) => {
180                             return Ok(result?)
181                         }
182                     }
183                 }
184             } else {
185                 Err(Error::ErrInterceptorNotBind)
186             }
187         } else {
188             Err(Error::ErrExistingTrack)
189         }
190     }
191 
192     /// read_simulcast reads incoming RTCP for this RTPReceiver for given rid
read_simulcast(&self, b: &mut [u8], rid: &str) -> Result<(usize, Attributes)>193     async fn read_simulcast(&self, b: &mut [u8], rid: &str) -> Result<(usize, Attributes)> {
194         let mut state_watch_rx = self.state_tx.subscribe();
195 
196         // Ensure we are running or paused. When paused we still recevie RTCP even if RTP traffic
197         // isn't flowing.
198         State::wait_for(&mut state_watch_rx, &[State::Started, State::Paused]).await?;
199 
200         let tracks = self.tracks.read().await;
201         for t in &*tracks {
202             if t.track.rid() == rid {
203                 if let Some(rtcp_interceptor) = &t.stream.rtcp_interceptor {
204                     let a = Attributes::new();
205 
206                     loop {
207                         tokio::select! {
208                             res = State::error_on_close(&mut state_watch_rx) => {
209                                 res?
210                             }
211                             result = rtcp_interceptor.read(b, &a) => {
212                                 return Ok(result?);
213                             }
214                         }
215                     }
216                 } else {
217                     return Err(Error::ErrInterceptorNotBind);
218                 }
219             }
220         }
221         Err(Error::ErrRTPReceiverForRIDTrackStreamNotFound)
222     }
223 
224     /// read_rtcp is a convenience method that wraps Read and unmarshal for you.
225     /// It also runs any configured interceptors.
read_rtcp( &self, receive_mtu: usize, ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>226     async fn read_rtcp(
227         &self,
228         receive_mtu: usize,
229     ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
230         let mut b = vec![0u8; receive_mtu];
231         let (n, attributes) = self.read(&mut b).await?;
232 
233         let mut buf = &b[..n];
234         let pkts = rtcp::packet::unmarshal(&mut buf)?;
235 
236         Ok((pkts, attributes))
237     }
238 
239     /// read_simulcast_rtcp is a convenience method that wraps ReadSimulcast and unmarshal for you
read_simulcast_rtcp( &self, rid: &str, receive_mtu: usize, ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>240     async fn read_simulcast_rtcp(
241         &self,
242         rid: &str,
243         receive_mtu: usize,
244     ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
245         let mut b = vec![0u8; receive_mtu];
246         let (n, attributes) = self.read_simulcast(&mut b, rid).await?;
247 
248         let mut buf = &b[..n];
249         let pkts = rtcp::packet::unmarshal(&mut buf)?;
250 
251         Ok((pkts, attributes))
252     }
253 
read_rtp(&self, b: &mut [u8], tid: usize) -> Result<(usize, Attributes)>254     pub(crate) async fn read_rtp(&self, b: &mut [u8], tid: usize) -> Result<(usize, Attributes)> {
255         let mut state_watch_rx = self.state_tx.subscribe();
256 
257         // Ensure we are running.
258         State::wait_for(&mut state_watch_rx, &[State::Started]).await?;
259 
260         //log::debug!("read_rtp enter tracks tid {}", tid);
261         let mut rtp_interceptor = None;
262         //let mut ssrc = 0;
263         {
264             let tracks = self.tracks.read().await;
265             for t in &*tracks {
266                 if t.track.tid() == tid {
267                     rtp_interceptor = t.stream.rtp_interceptor.clone();
268                     //ssrc = t.track.ssrc();
269                     break;
270                 }
271             }
272         };
273         /*log::debug!(
274             "read_rtp exit tracks with rtp_interceptor {} with tid {}",
275             rtp_interceptor.is_some(),
276             tid,
277         );*/
278 
279         if let Some(rtp_interceptor) = rtp_interceptor {
280             let a = Attributes::new();
281             //println!(
282             //    "read_rtp rtp_interceptor.read enter with tid {} ssrc {}",
283             //    tid, ssrc
284             //);
285             let mut current_state = *state_watch_rx.borrow();
286             loop {
287                 tokio::select! {
288                     _ = state_watch_rx.changed() => {
289                         let new_state = *state_watch_rx.borrow();
290 
291                         if new_state == State::Stopped {
292                             return Err(Error::ErrClosedPipe);
293                         }
294                         current_state = new_state;
295                     }
296                     result = rtp_interceptor.read(b, &a) => {
297                         let result = result?;
298 
299                         if current_state == State::Paused {
300                             trace!("Dropping {} read bytes received while RTPReceiver was paused", result.0);
301                             continue;
302                         }
303                         return Ok(result);
304                     }
305                 }
306             }
307         } else {
308             //log::debug!("read_rtp exit tracks with ErrRTPReceiverWithSSRCTrackStreamNotFound");
309             Err(Error::ErrRTPReceiverWithSSRCTrackStreamNotFound)
310         }
311     }
312 
get_parameters(&self) -> RTCRtpParameters313     async fn get_parameters(&self) -> RTCRtpParameters {
314         let mut parameters = self
315             .media_engine
316             .get_rtp_parameters_by_kind(self.kind, RTCRtpTransceiverDirection::Recvonly);
317 
318         let transceiver_codecs = self.transceiver_codecs.load();
319         if let Some(codecs) = &*transceiver_codecs {
320             let mut c = codecs.lock().await;
321             parameters.codecs =
322                 RTPReceiverInternal::get_codecs(&mut c, self.kind, &self.media_engine);
323         }
324 
325         parameters
326     }
327 
get_codecs( codecs: &mut [RTCRtpCodecParameters], kind: RTPCodecType, media_engine: &Arc<MediaEngine>, ) -> Vec<RTCRtpCodecParameters>328     pub(crate) fn get_codecs(
329         codecs: &mut [RTCRtpCodecParameters],
330         kind: RTPCodecType,
331         media_engine: &Arc<MediaEngine>,
332     ) -> Vec<RTCRtpCodecParameters> {
333         let media_engine_codecs = media_engine.get_codecs_by_kind(kind);
334         if codecs.is_empty() {
335             return media_engine_codecs;
336         }
337         let mut filtered_codecs = vec![];
338         for codec in codecs {
339             let (c, match_type) = codec_parameters_fuzzy_search(codec, &media_engine_codecs);
340             if match_type != CodecMatch::None {
341                 if codec.payload_type == 0 {
342                     codec.payload_type = c.payload_type;
343                 }
344                 filtered_codecs.push(codec.clone());
345             }
346         }
347 
348         filtered_codecs
349     }
350 
351     // State
352 
353     /// Get the current state and a receiver for the next state change.
current_state(&self) -> State354     pub(crate) fn current_state(&self) -> State {
355         *self.state_rx.borrow()
356     }
357 
start(&self) -> Result<()>358     pub(crate) fn start(&self) -> Result<()> {
359         State::transition(State::Started, &self.state_tx)
360     }
361 
pause(&self) -> Result<()>362     pub(crate) fn pause(&self) -> Result<()> {
363         let current = self.current_state();
364 
365         match current {
366             State::Unstarted => State::transition(State::UnstartedPaused, &self.state_tx),
367             State::Started => State::transition(State::Paused, &self.state_tx),
368             _ => Ok(()),
369         }
370     }
371 
resume(&self) -> Result<()>372     pub(crate) fn resume(&self) -> Result<()> {
373         let current = self.current_state();
374 
375         match current {
376             State::UnstartedPaused => State::transition(State::Unstarted, &self.state_tx),
377             State::Paused => State::transition(State::Started, &self.state_tx),
378             _ => Ok(()),
379         }
380     }
381 
close(&self) -> Result<()>382     pub(crate) fn close(&self) -> Result<()> {
383         State::transition(State::Stopped, &self.state_tx)
384     }
385 }
386 
387 /// RTPReceiver allows an application to inspect the receipt of a TrackRemote
388 pub struct RTCRtpReceiver {
389     receive_mtu: usize,
390     kind: RTPCodecType,
391     transport: Arc<RTCDtlsTransport>,
392 
393     pub internal: Arc<RTPReceiverInternal>,
394 }
395 
396 impl std::fmt::Debug for RTCRtpReceiver {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result397     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
398         f.debug_struct("RTCRtpReceiver")
399             .field("kind", &self.kind)
400             .finish()
401     }
402 }
403 
404 impl RTCRtpReceiver {
new( receive_mtu: usize, kind: RTPCodecType, transport: Arc<RTCDtlsTransport>, media_engine: Arc<MediaEngine>, interceptor: Arc<dyn Interceptor + Send + Sync>, ) -> Self405     pub fn new(
406         receive_mtu: usize,
407         kind: RTPCodecType,
408         transport: Arc<RTCDtlsTransport>,
409         media_engine: Arc<MediaEngine>,
410         interceptor: Arc<dyn Interceptor + Send + Sync>,
411     ) -> Self {
412         let (state_tx, state_rx) = watch::channel(State::Unstarted);
413 
414         RTCRtpReceiver {
415             receive_mtu,
416             kind,
417             transport: Arc::clone(&transport),
418 
419             internal: Arc::new(RTPReceiverInternal {
420                 kind,
421 
422                 tracks: RwLock::new(vec![]),
423                 transport,
424                 media_engine,
425                 interceptor,
426 
427                 state_tx,
428                 state_rx,
429 
430                 transceiver_codecs: ArcSwapOption::new(None),
431             }),
432         }
433     }
434 
kind(&self) -> RTPCodecType435     pub fn kind(&self) -> RTPCodecType {
436         self.kind
437     }
438 
set_transceiver_codecs( &self, codecs: Option<Arc<Mutex<Vec<RTCRtpCodecParameters>>>>, )439     pub(crate) fn set_transceiver_codecs(
440         &self,
441         codecs: Option<Arc<Mutex<Vec<RTCRtpCodecParameters>>>>,
442     ) {
443         self.internal.transceiver_codecs.store(codecs);
444     }
445 
446     /// transport returns the currently-configured *DTLSTransport or nil
447     /// if one has not yet been configured
transport(&self) -> Arc<RTCDtlsTransport>448     pub fn transport(&self) -> Arc<RTCDtlsTransport> {
449         Arc::clone(&self.transport)
450     }
451 
452     /// get_parameters describes the current configuration for the encoding and
453     /// transmission of media on the receiver's track.
get_parameters(&self) -> RTCRtpParameters454     pub async fn get_parameters(&self) -> RTCRtpParameters {
455         self.internal.get_parameters().await
456     }
457 
458     /// SetRTPParameters applies provided RTPParameters the RTPReceiver's tracks.
459     /// This method is part of the ORTC API. It is not
460     /// meant to be used together with the basic WebRTC API.
461     /// The amount of provided codecs must match the number of tracks on the receiver.
set_rtp_parameters(&self, params: RTCRtpParameters)462     pub async fn set_rtp_parameters(&self, params: RTCRtpParameters) {
463         let mut header_extensions = vec![];
464         for h in &params.header_extensions {
465             header_extensions.push(RTPHeaderExtension {
466                 id: h.id,
467                 uri: h.uri.clone(),
468             });
469         }
470 
471         let mut tracks = self.internal.tracks.write().await;
472         for (idx, codec) in params.codecs.iter().enumerate() {
473             let t = &mut tracks[idx];
474             if let Some(stream_info) = &mut t.stream.stream_info {
475                 stream_info.rtp_header_extensions = header_extensions.clone();
476             }
477 
478             let current_track = &t.track;
479             current_track.set_codec(codec.clone());
480             current_track.set_params(params.clone());
481         }
482     }
483 
484     /// track returns the RtpTransceiver TrackRemote
track(&self) -> Option<Arc<TrackRemote>>485     pub async fn track(&self) -> Option<Arc<TrackRemote>> {
486         let tracks = self.internal.tracks.read().await;
487         if tracks.len() != 1 {
488             None
489         } else {
490             tracks.first().map(|t| Arc::clone(&t.track))
491         }
492     }
493 
494     /// tracks returns the RtpTransceiver traclockks
495     /// A RTPReceiver to support Simulcast may now have multiple tracks
tracks(&self) -> Vec<Arc<TrackRemote>>496     pub async fn tracks(&self) -> Vec<Arc<TrackRemote>> {
497         let tracks = self.internal.tracks.read().await;
498         tracks.iter().map(|t| Arc::clone(&t.track)).collect()
499     }
500 
501     /// receive initialize the track and starts all the transports
receive(&self, parameters: &RTCRtpReceiveParameters) -> Result<()>502     pub async fn receive(&self, parameters: &RTCRtpReceiveParameters) -> Result<()> {
503         let receiver = Arc::downgrade(&self.internal);
504 
505         let current_state = self.internal.current_state();
506         if current_state.is_started() {
507             return Err(Error::ErrRTPReceiverReceiveAlreadyCalled);
508         }
509         self.internal.start()?;
510 
511         let (global_params, interceptor, media_engine) = {
512             (
513                 self.internal.get_parameters().await,
514                 Arc::clone(&self.internal.interceptor),
515                 Arc::clone(&self.internal.media_engine),
516             )
517         };
518 
519         let codec = if let Some(codec) = global_params.codecs.first() {
520             codec.capability.clone()
521         } else {
522             RTCRtpCodecCapability::default()
523         };
524 
525         for encoding in &parameters.encodings {
526             let (stream_info, rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) =
527                 if encoding.ssrc != 0 {
528                     let stream_info = create_stream_info(
529                         "".to_owned(),
530                         encoding.ssrc,
531                         0,
532                         codec.clone(),
533                         &global_params.header_extensions,
534                     );
535                     let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) =
536                         self.transport
537                             .streams_for_ssrc(encoding.ssrc, &stream_info, &interceptor)
538                             .await?;
539 
540                     (
541                         Some(stream_info),
542                         Some(rtp_read_stream),
543                         Some(rtp_interceptor),
544                         Some(rtcp_read_stream),
545                         Some(rtcp_interceptor),
546                     )
547                 } else {
548                     (None, None, None, None, None)
549                 };
550 
551             let t = TrackStreams {
552                 track: Arc::new(TrackRemote::new(
553                     self.receive_mtu,
554                     self.kind,
555                     encoding.ssrc,
556                     encoding.rid.clone(),
557                     receiver.clone(),
558                     Arc::clone(&media_engine),
559                     Arc::clone(&interceptor),
560                 )),
561                 stream: TrackStream {
562                     stream_info,
563                     rtp_read_stream,
564                     rtp_interceptor,
565                     rtcp_read_stream,
566                     rtcp_interceptor,
567                 },
568 
569                 repair_stream: TrackStream {
570                     stream_info: None,
571                     rtp_read_stream: None,
572                     rtp_interceptor: None,
573                     rtcp_read_stream: None,
574                     rtcp_interceptor: None,
575                 },
576             };
577 
578             {
579                 let mut tracks = self.internal.tracks.write().await;
580                 tracks.push(t);
581             };
582 
583             let rtx_ssrc = encoding.rtx.ssrc;
584             if rtx_ssrc != 0 {
585                 let stream_info = create_stream_info(
586                     "".to_owned(),
587                     rtx_ssrc,
588                     0,
589                     codec.clone(),
590                     &global_params.header_extensions,
591                 );
592                 let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = self
593                     .transport
594                     .streams_for_ssrc(rtx_ssrc, &stream_info, &interceptor)
595                     .await?;
596 
597                 self.receive_for_rtx(
598                     rtx_ssrc,
599                     "".to_owned(),
600                     TrackStream {
601                         stream_info: Some(stream_info),
602                         rtp_read_stream: Some(rtp_read_stream),
603                         rtp_interceptor: Some(rtp_interceptor),
604                         rtcp_read_stream: Some(rtcp_read_stream),
605                         rtcp_interceptor: Some(rtcp_interceptor),
606                     },
607                 )
608                 .await?;
609             }
610         }
611 
612         Ok(())
613     }
614 
615     /// read reads incoming RTCP for this RTPReceiver
read(&self, b: &mut [u8]) -> Result<(usize, Attributes)>616     pub async fn read(&self, b: &mut [u8]) -> Result<(usize, Attributes)> {
617         self.internal.read(b).await
618     }
619 
620     /// read_simulcast reads incoming RTCP for this RTPReceiver for given rid
read_simulcast(&self, b: &mut [u8], rid: &str) -> Result<(usize, Attributes)>621     pub async fn read_simulcast(&self, b: &mut [u8], rid: &str) -> Result<(usize, Attributes)> {
622         self.internal.read_simulcast(b, rid).await
623     }
624 
625     /// read_rtcp is a convenience method that wraps Read and unmarshal for you.
626     /// It also runs any configured interceptors.
read_rtcp( &self, ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>627     pub async fn read_rtcp(
628         &self,
629     ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
630         self.internal.read_rtcp(self.receive_mtu).await
631     }
632 
633     /// read_simulcast_rtcp is a convenience method that wraps ReadSimulcast and unmarshal for you
read_simulcast_rtcp( &self, rid: &str, ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>634     pub async fn read_simulcast_rtcp(
635         &self,
636         rid: &str,
637     ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
638         self.internal
639             .read_simulcast_rtcp(rid, self.receive_mtu)
640             .await
641     }
642 
have_received(&self) -> bool643     pub(crate) async fn have_received(&self) -> bool {
644         self.internal.current_state().is_started()
645     }
646 
start(&self, incoming: &TrackDetails)647     pub(crate) async fn start(&self, incoming: &TrackDetails) {
648         let mut encoding_size = incoming.ssrcs.len();
649         if incoming.rids.len() >= encoding_size {
650             encoding_size = incoming.rids.len();
651         };
652 
653         let mut encodings = vec![RTCRtpDecodingParameters::default(); encoding_size];
654         for (i, encoding) in encodings.iter_mut().enumerate() {
655             if incoming.rids.len() > i {
656                 encoding.rid = incoming.rids[i].clone();
657             }
658             if incoming.ssrcs.len() > i {
659                 encoding.ssrc = incoming.ssrcs[i];
660             }
661 
662             encoding.rtx.ssrc = incoming.repair_ssrc;
663         }
664 
665         if let Err(err) = self.receive(&RTCRtpReceiveParameters { encodings }).await {
666             log::warn!("RTPReceiver Receive failed {}", err);
667             return;
668         }
669 
670         // set track id and label early so they can be set as new track information
671         // is received from the SDP.
672         let is_unpaused = self.current_state() == State::Started;
673         for track_remote in &self.tracks().await {
674             track_remote.set_id(incoming.id.clone());
675             track_remote.set_stream_id(incoming.stream_id.clone());
676 
677             if is_unpaused {
678                 track_remote.fire_onunmute().await;
679             }
680         }
681     }
682 
683     /// Stop irreversibly stops the RTPReceiver
stop(&self) -> Result<()>684     pub async fn stop(&self) -> Result<()> {
685         let previous_state = self.internal.current_state();
686         self.internal.close()?;
687 
688         let mut errs = vec![];
689         let was_ever_started = previous_state.is_started();
690         if was_ever_started {
691             let tracks = self.internal.tracks.write().await;
692             for t in &*tracks {
693                 if let Some(rtcp_read_stream) = &t.stream.rtcp_read_stream {
694                     if let Err(err) = rtcp_read_stream.close().await {
695                         errs.push(err);
696                     }
697                 }
698 
699                 if let Some(rtp_read_stream) = &t.stream.rtp_read_stream {
700                     if let Err(err) = rtp_read_stream.close().await {
701                         errs.push(err);
702                     }
703                 }
704 
705                 if let Some(repair_rtcp_read_stream) = &t.repair_stream.rtcp_read_stream {
706                     if let Err(err) = repair_rtcp_read_stream.close().await {
707                         errs.push(err);
708                     }
709                 }
710 
711                 if let Some(repair_rtp_read_stream) = &t.repair_stream.rtp_read_stream {
712                     if let Err(err) = repair_rtp_read_stream.close().await {
713                         errs.push(err);
714                     }
715                 }
716 
717                 if let Some(stream_info) = &t.stream.stream_info {
718                     self.internal
719                         .interceptor
720                         .unbind_remote_stream(stream_info)
721                         .await;
722                 }
723 
724                 if let Some(repair_stream_info) = &t.repair_stream.stream_info {
725                     self.internal
726                         .interceptor
727                         .unbind_remote_stream(repair_stream_info)
728                         .await;
729                 }
730             }
731         }
732 
733         flatten_errs(errs)
734     }
735 
736     /// read_rtp should only be called by a track, this only exists so we can keep state in one place
read_rtp(&self, b: &mut [u8], tid: usize) -> Result<(usize, Attributes)>737     pub(crate) async fn read_rtp(&self, b: &mut [u8], tid: usize) -> Result<(usize, Attributes)> {
738         self.internal.read_rtp(b, tid).await
739     }
740 
741     /// receive_for_rid is the sibling of Receive expect for RIDs instead of SSRCs
742     /// It populates all the internal state for the given RID
receive_for_rid( &self, rid: String, params: RTCRtpParameters, stream: TrackStream, ) -> Result<Arc<TrackRemote>>743     pub(crate) async fn receive_for_rid(
744         &self,
745         rid: String,
746         params: RTCRtpParameters,
747         stream: TrackStream,
748     ) -> Result<Arc<TrackRemote>> {
749         let mut tracks = self.internal.tracks.write().await;
750         for t in &mut *tracks {
751             if t.track.rid() == rid {
752                 t.track.set_kind(self.kind);
753                 if let Some(codec) = params.codecs.first() {
754                     t.track.set_codec(codec.clone());
755                 }
756                 t.track.set_params(params.clone());
757                 t.track
758                     .set_ssrc(stream.stream_info.as_ref().map_or(0, |s| s.ssrc));
759                 t.stream = stream;
760                 return Ok(Arc::clone(&t.track));
761             }
762         }
763 
764         Err(Error::ErrRTPReceiverForRIDTrackStreamNotFound)
765     }
766 
767     /// receiveForRtx starts a routine that processes the repair stream
768     /// These packets aren't exposed to the user yet, but we need to process them for
769     /// TWCC
receive_for_rtx( &self, ssrc: SSRC, rsid: String, repair_stream: TrackStream, ) -> Result<()>770     pub(crate) async fn receive_for_rtx(
771         &self,
772         ssrc: SSRC,
773         rsid: String,
774         repair_stream: TrackStream,
775     ) -> Result<()> {
776         let mut tracks = self.internal.tracks.write().await;
777         let l = tracks.len();
778         for t in &mut *tracks {
779             if (ssrc != 0 && l == 1) || t.track.rid() == rsid {
780                 t.repair_stream = repair_stream;
781 
782                 let receive_mtu = self.receive_mtu;
783                 let track = t.clone();
784                 tokio::spawn(async move {
785                     let a = Attributes::new();
786                     let mut b = vec![0u8; receive_mtu];
787                     while let Some(repair_rtp_interceptor) = &track.repair_stream.rtp_interceptor {
788                         //TODO: cancel repair_rtp_interceptor.read gracefully
789                         //println!("repair_rtp_interceptor read begin with ssrc={}", ssrc);
790                         if repair_rtp_interceptor.read(&mut b, &a).await.is_err() {
791                             break;
792                         }
793                     }
794                 });
795 
796                 return Ok(());
797             }
798         }
799 
800         Err(Error::ErrRTPReceiverForRIDTrackStreamNotFound)
801     }
802 
803     // State
804 
current_state(&self) -> State805     pub(crate) fn current_state(&self) -> State {
806         self.internal.current_state()
807     }
808 
pause(&self) -> Result<()>809     pub(crate) async fn pause(&self) -> Result<()> {
810         self.internal.pause()?;
811 
812         if !self.internal.current_state().is_started() {
813             return Ok(());
814         }
815 
816         let streams = self.internal.tracks.read().await;
817 
818         for stream in streams.iter() {
819             // TODO: If we introduce futures as a direct dependency this and other futures could be
820             // ran concurrently with [`join_all`](https://docs.rs/futures/0.3.21/futures/future/fn.join_all.html)
821             stream.track.fire_onmute().await;
822         }
823 
824         Ok(())
825     }
826 
resume(&self) -> Result<()>827     pub(crate) async fn resume(&self) -> Result<()> {
828         self.internal.resume()?;
829 
830         if !self.internal.current_state().is_started() {
831             return Ok(());
832         }
833 
834         let streams = self.internal.tracks.read().await;
835 
836         for stream in streams.iter() {
837             // TODO: If we introduce futures as a direct dependency this and other futures could be
838             // ran concurrently with [`join_all`](https://docs.rs/futures/0.3.21/futures/future/fn.join_all.html)
839             stream.track.fire_onunmute().await;
840         }
841 
842         Ok(())
843     }
844 }
845