1 use crate::api::media_engine::MediaEngine; 2 use crate::error::{Error, Result}; 3 use crate::rtp_transceiver::rtp_codec::{RTCRtpCodecParameters, RTCRtpParameters, RTPCodecType}; 4 use crate::rtp_transceiver::{PayloadType, SSRC}; 5 6 use crate::rtp_transceiver::rtp_receiver::RTPReceiverInternal; 7 8 use crate::track::RTP_PAYLOAD_TYPE_BITMASK; 9 use arc_swap::ArcSwapOption; 10 use bytes::{Bytes, BytesMut}; 11 use interceptor::{Attributes, Interceptor}; 12 use std::collections::VecDeque; 13 use std::future::Future; 14 use std::pin::Pin; 15 use std::sync::atomic::{AtomicU32, AtomicU8, AtomicUsize, Ordering}; 16 use std::sync::{Arc, Weak}; 17 use tokio::sync::Mutex; 18 use util::sync::Mutex as SyncMutex; 19 20 use util::Unmarshal; 21 22 lazy_static! { 23 static ref TRACK_REMOTE_UNIQUE_ID: AtomicUsize = AtomicUsize::new(0); 24 } 25 pub type OnMuteHdlrFn = Box< 26 dyn (FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) + Send + Sync + 'static, 27 >; 28 29 #[derive(Default)] 30 struct Handlers { 31 on_mute: ArcSwapOption<Mutex<OnMuteHdlrFn>>, 32 on_unmute: ArcSwapOption<Mutex<OnMuteHdlrFn>>, 33 } 34 35 #[derive(Default)] 36 struct TrackRemoteInternal { 37 peeked: VecDeque<(Bytes, Attributes)>, 38 } 39 40 /// TrackRemote represents a single inbound source of media 41 pub struct TrackRemote { 42 tid: usize, 43 44 id: SyncMutex<String>, 45 stream_id: SyncMutex<String>, 46 47 receive_mtu: usize, 48 payload_type: AtomicU8, //PayloadType, 49 kind: AtomicU8, //RTPCodecType, 50 ssrc: AtomicU32, //SSRC, 51 codec: SyncMutex<RTCRtpCodecParameters>, 52 pub(crate) params: SyncMutex<RTCRtpParameters>, 53 rid: String, 54 55 media_engine: Arc<MediaEngine>, 56 interceptor: Arc<dyn Interceptor + Send + Sync>, 57 58 handlers: Arc<Handlers>, 59 60 receiver: Option<Weak<RTPReceiverInternal>>, 61 internal: Mutex<TrackRemoteInternal>, 62 } 63 64 impl std::fmt::Debug for TrackRemote { fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 66 f.debug_struct("TrackRemote") 67 .field("id", &self.id) 68 .field("stream_id", &self.stream_id) 69 .field("payload_type", &self.payload_type) 70 .field("kind", &self.kind) 71 .field("ssrc", &self.ssrc) 72 .field("codec", &self.codec) 73 .field("params", &self.params) 74 .field("rid", &self.rid) 75 .finish() 76 } 77 } 78 79 impl TrackRemote { new( receive_mtu: usize, kind: RTPCodecType, ssrc: SSRC, rid: String, receiver: Weak<RTPReceiverInternal>, media_engine: Arc<MediaEngine>, interceptor: Arc<dyn Interceptor + Send + Sync>, ) -> Self80 pub(crate) fn new( 81 receive_mtu: usize, 82 kind: RTPCodecType, 83 ssrc: SSRC, 84 rid: String, 85 receiver: Weak<RTPReceiverInternal>, 86 media_engine: Arc<MediaEngine>, 87 interceptor: Arc<dyn Interceptor + Send + Sync>, 88 ) -> Self { 89 TrackRemote { 90 tid: TRACK_REMOTE_UNIQUE_ID.fetch_add(1, Ordering::SeqCst), 91 id: Default::default(), 92 stream_id: Default::default(), 93 receive_mtu, 94 payload_type: Default::default(), 95 kind: AtomicU8::new(kind as u8), 96 ssrc: AtomicU32::new(ssrc), 97 codec: Default::default(), 98 params: Default::default(), 99 rid, 100 receiver: Some(receiver), 101 media_engine, 102 interceptor, 103 handlers: Default::default(), 104 105 internal: Default::default(), 106 } 107 } 108 tid(&self) -> usize109 pub fn tid(&self) -> usize { 110 self.tid 111 } 112 113 /// id is the unique identifier for this Track. This should be unique for the 114 /// stream, but doesn't have to globally unique. A common example would be 'audio' or 'video' 115 /// and StreamID would be 'desktop' or 'webcam' id(&self) -> String116 pub fn id(&self) -> String { 117 let id = self.id.lock(); 118 id.clone() 119 } 120 set_id(&self, s: String)121 pub fn set_id(&self, s: String) { 122 let mut id = self.id.lock(); 123 *id = s; 124 } 125 126 /// stream_id is the group this track belongs too. This must be unique stream_id(&self) -> String127 pub fn stream_id(&self) -> String { 128 let stream_id = self.stream_id.lock(); 129 stream_id.clone() 130 } 131 set_stream_id(&self, s: String)132 pub fn set_stream_id(&self, s: String) { 133 let mut stream_id = self.stream_id.lock(); 134 *stream_id = s; 135 } 136 137 /// rid gets the RTP Stream ID of this Track 138 /// With Simulcast you will have multiple tracks with the same ID, but different RID values. 139 /// In many cases a TrackRemote will not have an RID, so it is important to assert it is non-zero rid(&self) -> &str140 pub fn rid(&self) -> &str { 141 self.rid.as_str() 142 } 143 144 /// payload_type gets the PayloadType of the track payload_type(&self) -> PayloadType145 pub fn payload_type(&self) -> PayloadType { 146 self.payload_type.load(Ordering::SeqCst) 147 } 148 set_payload_type(&self, payload_type: PayloadType)149 pub fn set_payload_type(&self, payload_type: PayloadType) { 150 self.payload_type.store(payload_type, Ordering::SeqCst); 151 } 152 153 /// kind gets the Kind of the track kind(&self) -> RTPCodecType154 pub fn kind(&self) -> RTPCodecType { 155 self.kind.load(Ordering::SeqCst).into() 156 } 157 set_kind(&self, kind: RTPCodecType)158 pub fn set_kind(&self, kind: RTPCodecType) { 159 self.kind.store(kind as u8, Ordering::SeqCst); 160 } 161 162 /// ssrc gets the SSRC of the track ssrc(&self) -> SSRC163 pub fn ssrc(&self) -> SSRC { 164 self.ssrc.load(Ordering::SeqCst) 165 } 166 set_ssrc(&self, ssrc: SSRC)167 pub fn set_ssrc(&self, ssrc: SSRC) { 168 self.ssrc.store(ssrc, Ordering::SeqCst); 169 } 170 171 /// msid gets the Msid of the track msid(&self) -> String172 pub fn msid(&self) -> String { 173 format!("{} {}", self.stream_id(), self.id()) 174 } 175 176 /// codec gets the Codec of the track codec(&self) -> RTCRtpCodecParameters177 pub fn codec(&self) -> RTCRtpCodecParameters { 178 let codec = self.codec.lock(); 179 codec.clone() 180 } 181 set_codec(&self, codec: RTCRtpCodecParameters)182 pub fn set_codec(&self, codec: RTCRtpCodecParameters) { 183 let mut c = self.codec.lock(); 184 *c = codec; 185 } 186 params(&self) -> RTCRtpParameters187 pub fn params(&self) -> RTCRtpParameters { 188 let p = self.params.lock(); 189 p.clone() 190 } 191 set_params(&self, params: RTCRtpParameters)192 pub fn set_params(&self, params: RTCRtpParameters) { 193 let mut p = self.params.lock(); 194 *p = params; 195 } 196 onmute<F>(&self, handler: F) where F: FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static + Sync,197 pub fn onmute<F>(&self, handler: F) 198 where 199 F: FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static + Sync, 200 { 201 self.handlers 202 .on_mute 203 .store(Some(Arc::new(Mutex::new(Box::new(handler))))); 204 } 205 onunmute<F>(&self, handler: F) where F: FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static + Sync,206 pub fn onunmute<F>(&self, handler: F) 207 where 208 F: FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static + Sync, 209 { 210 self.handlers 211 .on_unmute 212 .store(Some(Arc::new(Mutex::new(Box::new(handler))))); 213 } 214 215 /// Reads data from the track. 216 /// 217 /// **Cancel Safety:** This method is not cancel safe. Dropping the resulting [`Future`] before 218 /// it returns [`Poll::Ready`] will cause data loss. read(&self, b: &mut [u8]) -> Result<(usize, Attributes)>219 pub async fn read(&self, b: &mut [u8]) -> Result<(usize, Attributes)> { 220 { 221 // Internal lock scope 222 let mut internal = self.internal.lock().await; 223 if let Some((data, attributes)) = internal.peeked.pop_front() { 224 let n = std::cmp::min(b.len(), data.len()); 225 b[..n].copy_from_slice(&data[..n]); 226 self.check_and_update_track(&b[..n]).await?; 227 228 return Ok((n, attributes)); 229 } 230 }; 231 232 let receiver = match self.receiver.as_ref().and_then(|r| r.upgrade()) { 233 Some(r) => r, 234 None => return Err(Error::ErrRTPReceiverNil), 235 }; 236 237 let (n, attributes) = receiver.read_rtp(b, self.tid).await?; 238 self.check_and_update_track(&b[..n]).await?; 239 Ok((n, attributes)) 240 } 241 242 /// check_and_update_track checks payloadType for every incoming packet 243 /// once a different payloadType is detected the track will be updated check_and_update_track(&self, b: &[u8]) -> Result<()>244 pub(crate) async fn check_and_update_track(&self, b: &[u8]) -> Result<()> { 245 // NOTE: This method MUST not attempt to lock `Self::internal`, doing so will deadlock. 246 if b.len() < 2 { 247 return Err(Error::ErrRTPTooShort); 248 } 249 250 let payload_type = b[1] & RTP_PAYLOAD_TYPE_BITMASK; 251 if payload_type != self.payload_type() { 252 let p = self 253 .media_engine 254 .get_rtp_parameters_by_payload_type(payload_type) 255 .await?; 256 257 if let Some(receiver) = &self.receiver { 258 if let Some(receiver) = receiver.upgrade() { 259 self.kind.store(receiver.kind as u8, Ordering::SeqCst); 260 } 261 } 262 self.payload_type.store(payload_type, Ordering::SeqCst); 263 { 264 let mut codec = self.codec.lock(); 265 *codec = if let Some(codec) = p.codecs.first() { 266 codec.clone() 267 } else { 268 return Err(Error::ErrCodecNotFound); 269 }; 270 } 271 { 272 let mut params = self.params.lock(); 273 *params = p; 274 } 275 } 276 277 Ok(()) 278 } 279 280 /// read_rtp is a convenience method that wraps Read and unmarshals for you. read_rtp(&self) -> Result<(rtp::packet::Packet, Attributes)>281 pub async fn read_rtp(&self) -> Result<(rtp::packet::Packet, Attributes)> { 282 let mut b = vec![0u8; self.receive_mtu]; 283 let (n, attributes) = self.read(&mut b).await?; 284 285 let mut buf = &b[..n]; 286 let r = rtp::packet::Packet::unmarshal(&mut buf)?; 287 Ok((r, attributes)) 288 } 289 290 /// peek is like Read, but it doesn't discard the packet read peek(&self, b: &mut [u8]) -> Result<(usize, Attributes)>291 pub(crate) async fn peek(&self, b: &mut [u8]) -> Result<(usize, Attributes)> { 292 let (n, a) = self.read(b).await?; 293 294 // this might overwrite data if somebody peeked between the Read 295 // and us getting the lock. Oh well, we'll just drop a packet in 296 // that case. 297 let mut data = BytesMut::new(); 298 data.extend(b[..n].to_vec()); 299 { 300 let mut internal = self.internal.lock().await; 301 internal.peeked.push_back((data.freeze(), a.clone())); 302 } 303 Ok((n, a)) 304 } 305 306 /// Set the initially peeked data for this track. 307 /// 308 /// This is useful when a track is first created to populate data read from the track in the 309 /// process of identifying the track as part of simulcast probing. Using this during other 310 /// parts of the track's lifecycle is probably an error. prepopulate_peeked_data(&self, data: VecDeque<(Bytes, Attributes)>)311 pub(crate) async fn prepopulate_peeked_data(&self, data: VecDeque<(Bytes, Attributes)>) { 312 let mut internal = self.internal.lock().await; 313 internal.peeked = data; 314 } 315 fire_onmute(&self)316 pub(crate) async fn fire_onmute(&self) { 317 let on_mute = self.handlers.on_mute.load(); 318 319 if let Some(f) = on_mute.as_ref() { 320 (f.lock().await)().await 321 }; 322 } 323 fire_onunmute(&self)324 pub(crate) async fn fire_onunmute(&self) { 325 let on_unmute = self.handlers.on_unmute.load(); 326 327 if let Some(f) = on_unmute.as_ref() { 328 (f.lock().await)().await 329 }; 330 } 331 } 332