xref: /webrtc/webrtc/src/track/track_remote/mod.rs (revision ab3a4f44)
1 use crate::api::media_engine::MediaEngine;
2 use crate::error::{Error, Result};
3 use crate::rtp_transceiver::rtp_codec::{RTCRtpCodecParameters, RTCRtpParameters, RTPCodecType};
4 use crate::rtp_transceiver::{PayloadType, SSRC};
5 
6 use crate::rtp_transceiver::rtp_receiver::RTPReceiverInternal;
7 
8 use crate::track::RTP_PAYLOAD_TYPE_BITMASK;
9 use arc_swap::ArcSwapOption;
10 use bytes::{Bytes, BytesMut};
11 use interceptor::{Attributes, Interceptor};
12 use std::collections::VecDeque;
13 use std::future::Future;
14 use std::pin::Pin;
15 use std::sync::atomic::{AtomicU32, AtomicU8, AtomicUsize, Ordering};
16 use std::sync::{Arc, Weak};
17 use tokio::sync::Mutex;
18 use util::sync::Mutex as SyncMutex;
19 
20 use util::Unmarshal;
21 
/// Monotonic counter that hands out the `tid` of every `TrackRemote` created in this process.
///
/// `AtomicUsize::new` is a `const fn`, so a plain `static` suffices and no lazy initialisation
/// is needed.
static TRACK_REMOTE_UNIQUE_ID: AtomicUsize = AtomicUsize::new(0);
/// Boxed callback type shared by the mute and unmute handlers.
///
/// The callback takes no arguments and returns a pinned, boxed, `Send` future that resolves
/// to `()`. The callback itself must be `Send + Sync + 'static`.
pub type OnMuteHdlrFn = Box<
    dyn (FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) + Send + Sync + 'static,
>;
28 
/// Optional event callbacks attached to a `TrackRemote`.
///
/// Each slot starts out empty (`Default`) and is replaced atomically when a handler is
/// registered through `TrackRemote::onmute` / `TrackRemote::onunmute`.
#[derive(Default)]
struct Handlers {
    /// Callback invoked by `TrackRemote::fire_onmute`, if one has been registered.
    on_mute: ArcSwapOption<Mutex<OnMuteHdlrFn>>,
    /// Callback invoked by `TrackRemote::fire_onunmute`, if one has been registered.
    on_unmute: ArcSwapOption<Mutex<OnMuteHdlrFn>>,
}
34 
/// Mutable state of a `TrackRemote` that is guarded by its async `internal` mutex.
#[derive(Default)]
struct TrackRemoteInternal {
    /// Packets (raw bytes plus interceptor attributes) that were already pulled from the
    /// receiver but not yet consumed. `TrackRemote::read` drains this queue from the front
    /// before asking the receiver for new data.
    peeked: VecDeque<(Bytes, Attributes)>,
}
39 
/// TrackRemote represents a single inbound source of media
pub struct TrackRemote {
    /// Identifier taken from the `TRACK_REMOTE_UNIQUE_ID` counter in `new`; it is handed to
    /// the receiver on every read.
    tid: usize,

    /// Track identifier; replaceable through `set_id`.
    id: SyncMutex<String>,
    /// Identifier of the stream this track belongs to; replaceable through `set_stream_id`.
    stream_id: SyncMutex<String>,

    /// Size in bytes of the scratch buffer that `read_rtp` allocates for each packet.
    receive_mtu: usize,
    /// Last observed payload type; refreshed by `check_and_update_track`.
    payload_type: AtomicU8, //PayloadType,
    /// `RTPCodecType` stored as its `u8` representation.
    kind: AtomicU8,         //RTPCodecType,
    /// Synchronisation source identifier of the track.
    ssrc: AtomicU32,        //SSRC,
    /// Codec parameters matching the current payload type.
    codec: SyncMutex<RTCRtpCodecParameters>,
    /// Full RTP parameters matching the current payload type.
    pub(crate) params: SyncMutex<RTCRtpParameters>,
    /// RTP stream id supplied at construction; there is no setter for it.
    rid: String,

    /// Used to look up RTP parameters when a packet arrives with a new payload type.
    media_engine: Arc<MediaEngine>,
    /// Interceptor supplied at construction; it is not referenced by the methods in this file.
    interceptor: Arc<dyn Interceptor + Send + Sync>,

    /// Registered mute/unmute callbacks.
    handlers: Arc<Handlers>,

    /// Weak handle to the receiver that produces this track's packets. `read` fails with
    /// `Error::ErrRTPReceiverNil` when it can no longer be upgraded.
    receiver: Option<Weak<RTPReceiverInternal>>,
    /// Async-locked state holding packets that were peeked but not yet read.
    internal: Mutex<TrackRemoteInternal>,
}
63 
64 impl std::fmt::Debug for TrackRemote {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result65     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66         f.debug_struct("TrackRemote")
67             .field("id", &self.id)
68             .field("stream_id", &self.stream_id)
69             .field("payload_type", &self.payload_type)
70             .field("kind", &self.kind)
71             .field("ssrc", &self.ssrc)
72             .field("codec", &self.codec)
73             .field("params", &self.params)
74             .field("rid", &self.rid)
75             .finish()
76     }
77 }
78 
79 impl TrackRemote {
new( receive_mtu: usize, kind: RTPCodecType, ssrc: SSRC, rid: String, receiver: Weak<RTPReceiverInternal>, media_engine: Arc<MediaEngine>, interceptor: Arc<dyn Interceptor + Send + Sync>, ) -> Self80     pub(crate) fn new(
81         receive_mtu: usize,
82         kind: RTPCodecType,
83         ssrc: SSRC,
84         rid: String,
85         receiver: Weak<RTPReceiverInternal>,
86         media_engine: Arc<MediaEngine>,
87         interceptor: Arc<dyn Interceptor + Send + Sync>,
88     ) -> Self {
89         TrackRemote {
90             tid: TRACK_REMOTE_UNIQUE_ID.fetch_add(1, Ordering::SeqCst),
91             id: Default::default(),
92             stream_id: Default::default(),
93             receive_mtu,
94             payload_type: Default::default(),
95             kind: AtomicU8::new(kind as u8),
96             ssrc: AtomicU32::new(ssrc),
97             codec: Default::default(),
98             params: Default::default(),
99             rid,
100             receiver: Some(receiver),
101             media_engine,
102             interceptor,
103             handlers: Default::default(),
104 
105             internal: Default::default(),
106         }
107     }
108 
tid(&self) -> usize109     pub fn tid(&self) -> usize {
110         self.tid
111     }
112 
113     /// id is the unique identifier for this Track. This should be unique for the
114     /// stream, but doesn't have to globally unique. A common example would be 'audio' or 'video'
115     /// and StreamID would be 'desktop' or 'webcam'
id(&self) -> String116     pub fn id(&self) -> String {
117         let id = self.id.lock();
118         id.clone()
119     }
120 
set_id(&self, s: String)121     pub fn set_id(&self, s: String) {
122         let mut id = self.id.lock();
123         *id = s;
124     }
125 
126     /// stream_id is the group this track belongs too. This must be unique
stream_id(&self) -> String127     pub fn stream_id(&self) -> String {
128         let stream_id = self.stream_id.lock();
129         stream_id.clone()
130     }
131 
set_stream_id(&self, s: String)132     pub fn set_stream_id(&self, s: String) {
133         let mut stream_id = self.stream_id.lock();
134         *stream_id = s;
135     }
136 
137     /// rid gets the RTP Stream ID of this Track
138     /// With Simulcast you will have multiple tracks with the same ID, but different RID values.
139     /// In many cases a TrackRemote will not have an RID, so it is important to assert it is non-zero
rid(&self) -> &str140     pub fn rid(&self) -> &str {
141         self.rid.as_str()
142     }
143 
144     /// payload_type gets the PayloadType of the track
payload_type(&self) -> PayloadType145     pub fn payload_type(&self) -> PayloadType {
146         self.payload_type.load(Ordering::SeqCst)
147     }
148 
set_payload_type(&self, payload_type: PayloadType)149     pub fn set_payload_type(&self, payload_type: PayloadType) {
150         self.payload_type.store(payload_type, Ordering::SeqCst);
151     }
152 
153     /// kind gets the Kind of the track
kind(&self) -> RTPCodecType154     pub fn kind(&self) -> RTPCodecType {
155         self.kind.load(Ordering::SeqCst).into()
156     }
157 
set_kind(&self, kind: RTPCodecType)158     pub fn set_kind(&self, kind: RTPCodecType) {
159         self.kind.store(kind as u8, Ordering::SeqCst);
160     }
161 
162     /// ssrc gets the SSRC of the track
ssrc(&self) -> SSRC163     pub fn ssrc(&self) -> SSRC {
164         self.ssrc.load(Ordering::SeqCst)
165     }
166 
set_ssrc(&self, ssrc: SSRC)167     pub fn set_ssrc(&self, ssrc: SSRC) {
168         self.ssrc.store(ssrc, Ordering::SeqCst);
169     }
170 
171     /// msid gets the Msid of the track
msid(&self) -> String172     pub fn msid(&self) -> String {
173         format!("{} {}", self.stream_id(), self.id())
174     }
175 
176     /// codec gets the Codec of the track
codec(&self) -> RTCRtpCodecParameters177     pub fn codec(&self) -> RTCRtpCodecParameters {
178         let codec = self.codec.lock();
179         codec.clone()
180     }
181 
set_codec(&self, codec: RTCRtpCodecParameters)182     pub fn set_codec(&self, codec: RTCRtpCodecParameters) {
183         let mut c = self.codec.lock();
184         *c = codec;
185     }
186 
params(&self) -> RTCRtpParameters187     pub fn params(&self) -> RTCRtpParameters {
188         let p = self.params.lock();
189         p.clone()
190     }
191 
set_params(&self, params: RTCRtpParameters)192     pub fn set_params(&self, params: RTCRtpParameters) {
193         let mut p = self.params.lock();
194         *p = params;
195     }
196 
onmute<F>(&self, handler: F) where F: FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static + Sync,197     pub fn onmute<F>(&self, handler: F)
198     where
199         F: FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static + Sync,
200     {
201         self.handlers
202             .on_mute
203             .store(Some(Arc::new(Mutex::new(Box::new(handler)))));
204     }
205 
onunmute<F>(&self, handler: F) where F: FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static + Sync,206     pub fn onunmute<F>(&self, handler: F)
207     where
208         F: FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static + Sync,
209     {
210         self.handlers
211             .on_unmute
212             .store(Some(Arc::new(Mutex::new(Box::new(handler)))));
213     }
214 
215     /// Reads data from the track.
216     ///
217     /// **Cancel Safety:** This method is not cancel safe. Dropping the resulting [`Future`] before
218     /// it returns [`Poll::Ready`] will cause data loss.
read(&self, b: &mut [u8]) -> Result<(usize, Attributes)>219     pub async fn read(&self, b: &mut [u8]) -> Result<(usize, Attributes)> {
220         {
221             // Internal lock scope
222             let mut internal = self.internal.lock().await;
223             if let Some((data, attributes)) = internal.peeked.pop_front() {
224                 let n = std::cmp::min(b.len(), data.len());
225                 b[..n].copy_from_slice(&data[..n]);
226                 self.check_and_update_track(&b[..n]).await?;
227 
228                 return Ok((n, attributes));
229             }
230         };
231 
232         let receiver = match self.receiver.as_ref().and_then(|r| r.upgrade()) {
233             Some(r) => r,
234             None => return Err(Error::ErrRTPReceiverNil),
235         };
236 
237         let (n, attributes) = receiver.read_rtp(b, self.tid).await?;
238         self.check_and_update_track(&b[..n]).await?;
239         Ok((n, attributes))
240     }
241 
242     /// check_and_update_track checks payloadType for every incoming packet
243     /// once a different payloadType is detected the track will be updated
check_and_update_track(&self, b: &[u8]) -> Result<()>244     pub(crate) async fn check_and_update_track(&self, b: &[u8]) -> Result<()> {
245         // NOTE: This method MUST not attempt to lock `Self::internal`, doing so will deadlock.
246         if b.len() < 2 {
247             return Err(Error::ErrRTPTooShort);
248         }
249 
250         let payload_type = b[1] & RTP_PAYLOAD_TYPE_BITMASK;
251         if payload_type != self.payload_type() {
252             let p = self
253                 .media_engine
254                 .get_rtp_parameters_by_payload_type(payload_type)
255                 .await?;
256 
257             if let Some(receiver) = &self.receiver {
258                 if let Some(receiver) = receiver.upgrade() {
259                     self.kind.store(receiver.kind as u8, Ordering::SeqCst);
260                 }
261             }
262             self.payload_type.store(payload_type, Ordering::SeqCst);
263             {
264                 let mut codec = self.codec.lock();
265                 *codec = if let Some(codec) = p.codecs.first() {
266                     codec.clone()
267                 } else {
268                     return Err(Error::ErrCodecNotFound);
269                 };
270             }
271             {
272                 let mut params = self.params.lock();
273                 *params = p;
274             }
275         }
276 
277         Ok(())
278     }
279 
280     /// read_rtp is a convenience method that wraps Read and unmarshals for you.
read_rtp(&self) -> Result<(rtp::packet::Packet, Attributes)>281     pub async fn read_rtp(&self) -> Result<(rtp::packet::Packet, Attributes)> {
282         let mut b = vec![0u8; self.receive_mtu];
283         let (n, attributes) = self.read(&mut b).await?;
284 
285         let mut buf = &b[..n];
286         let r = rtp::packet::Packet::unmarshal(&mut buf)?;
287         Ok((r, attributes))
288     }
289 
290     /// peek is like Read, but it doesn't discard the packet read
peek(&self, b: &mut [u8]) -> Result<(usize, Attributes)>291     pub(crate) async fn peek(&self, b: &mut [u8]) -> Result<(usize, Attributes)> {
292         let (n, a) = self.read(b).await?;
293 
294         // this might overwrite data if somebody peeked between the Read
295         // and us getting the lock.  Oh well, we'll just drop a packet in
296         // that case.
297         let mut data = BytesMut::new();
298         data.extend(b[..n].to_vec());
299         {
300             let mut internal = self.internal.lock().await;
301             internal.peeked.push_back((data.freeze(), a.clone()));
302         }
303         Ok((n, a))
304     }
305 
306     /// Set the initially peeked data for this track.
307     ///
308     /// This is useful when a track is first created to populate data read from the track in the
309     /// process of identifying the track as part of simulcast probing. Using this during other
310     /// parts of the track's lifecycle is probably an error.
prepopulate_peeked_data(&self, data: VecDeque<(Bytes, Attributes)>)311     pub(crate) async fn prepopulate_peeked_data(&self, data: VecDeque<(Bytes, Attributes)>) {
312         let mut internal = self.internal.lock().await;
313         internal.peeked = data;
314     }
315 
fire_onmute(&self)316     pub(crate) async fn fire_onmute(&self) {
317         let on_mute = self.handlers.on_mute.load();
318 
319         if let Some(f) = on_mute.as_ref() {
320             (f.lock().await)().await
321         };
322     }
323 
fire_onunmute(&self)324     pub(crate) async fn fire_onunmute(&self) {
325         let on_unmute = self.handlers.on_unmute.load();
326 
327         if let Some(f) = on_unmute.as_ref() {
328             (f.lock().await)().await
329         };
330     }
331 }
332