1 #[cfg(test)] 2 mod rtp_sender_test; 3 4 use crate::api::media_engine::MediaEngine; 5 use crate::dtls_transport::RTCDtlsTransport; 6 use crate::error::{Error, Result}; 7 use crate::rtp_transceiver::rtp_codec::{RTCRtpCodecParameters, RTPCodecType}; 8 use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection; 9 use crate::rtp_transceiver::srtp_writer_future::SrtpWriterFuture; 10 use crate::rtp_transceiver::{ 11 create_stream_info, PayloadType, RTCRtpEncodingParameters, RTCRtpSendParameters, 12 RTCRtpTransceiver, SSRC, 13 }; 14 use crate::track::track_local::{ 15 InterceptorToTrackLocalWriter, TrackLocal, TrackLocalContext, TrackLocalWriter, 16 }; 17 18 use ice::rand::generate_crypto_random_string; 19 use interceptor::stream_info::StreamInfo; 20 use interceptor::{Attributes, Interceptor, RTCPReader, RTPWriter}; 21 use std::sync::atomic::{AtomicBool, Ordering}; 22 use std::sync::{Arc, Weak}; 23 use tokio::sync::{mpsc, Mutex, Notify}; 24 use util::sync::Mutex as SyncMutex; 25 26 use super::srtp_writer_future::SequenceTransformer; 27 28 pub(crate) struct RTPSenderInternal { 29 pub(crate) send_called_rx: Mutex<mpsc::Receiver<()>>, 30 pub(crate) stop_called_rx: Arc<Notify>, 31 pub(crate) stop_called_signal: Arc<AtomicBool>, 32 pub(crate) rtcp_interceptor: Mutex<Option<Arc<dyn RTCPReader + Send + Sync>>>, 33 } 34 35 impl RTPSenderInternal { 36 /// read reads incoming RTCP for this RTPReceiver read(&self, b: &mut [u8]) -> Result<(usize, Attributes)>37 async fn read(&self, b: &mut [u8]) -> Result<(usize, Attributes)> { 38 let mut send_called_rx = self.send_called_rx.lock().await; 39 40 tokio::select! { 41 _ = send_called_rx.recv() =>{ 42 let rtcp_interceptor = { 43 let rtcp_interceptor = self.rtcp_interceptor.lock().await; 44 rtcp_interceptor.clone() 45 }; 46 if let Some(rtcp_interceptor) = rtcp_interceptor{ 47 let a = Attributes::new(); 48 tokio::select! { 49 _ = self.stop_called_rx.notified() => { 50 Err(Error::ErrClosedPipe) 51 } 52 result = rtcp_interceptor.read(b, &a) => { 53 Ok(result?) 54 } 55 } 56 }else{ 57 Err(Error::ErrInterceptorNotBind) 58 } 59 } 60 _ = self.stop_called_rx.notified() =>{ 61 Err(Error::ErrClosedPipe) 62 } 63 } 64 } 65 66 /// read_rtcp is a convenience method that wraps Read and unmarshals for you. read_rtcp( &self, receive_mtu: usize, ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>67 async fn read_rtcp( 68 &self, 69 receive_mtu: usize, 70 ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> { 71 let mut b = vec![0u8; receive_mtu]; 72 let (n, attributes) = self.read(&mut b).await?; 73 74 let mut buf = &b[..n]; 75 let pkts = rtcp::packet::unmarshal(&mut buf)?; 76 77 Ok((pkts, attributes)) 78 } 79 } 80 81 /// RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer 82 pub struct RTCRtpSender { 83 pub(crate) track: Mutex<Option<Arc<dyn TrackLocal + Send + Sync>>>, 84 85 pub(crate) srtp_stream: Arc<SrtpWriterFuture>, 86 pub(crate) stream_info: Mutex<StreamInfo>, 87 seq_trans: Arc<SequenceTransformer>, 88 89 pub(crate) context: Mutex<TrackLocalContext>, 90 91 pub(crate) transport: Arc<RTCDtlsTransport>, 92 93 pub(crate) payload_type: PayloadType, 94 pub(crate) ssrc: SSRC, 95 receive_mtu: usize, 96 97 /// a transceiver sender since we can just check the 98 /// transceiver negotiation status 99 pub(crate) negotiated: AtomicBool, 100 101 pub(crate) media_engine: Arc<MediaEngine>, 102 pub(crate) interceptor: Arc<dyn Interceptor + Send + Sync>, 103 104 pub(crate) id: String, 105 106 /// The id of the initial track, even if we later change to a different 107 /// track id should be use when negotiating. 108 pub(crate) initial_track_id: std::sync::Mutex<Option<String>>, 109 /// AssociatedMediaStreamIds from the WebRTC specifcations 110 pub(crate) associated_media_stream_ids: std::sync::Mutex<Vec<String>>, 111 112 rtp_transceiver: SyncMutex<Option<Weak<RTCRtpTransceiver>>>, 113 114 send_called_tx: SyncMutex<Option<mpsc::Sender<()>>>, 115 stop_called_tx: Arc<Notify>, 116 stop_called_signal: Arc<AtomicBool>, 117 118 pub(crate) paused: Arc<AtomicBool>, 119 120 internal: Arc<RTPSenderInternal>, 121 } 122 123 impl std::fmt::Debug for RTCRtpSender { fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 125 f.debug_struct("RTCRtpSender") 126 .field("id", &self.id) 127 .finish() 128 } 129 } 130 131 impl RTCRtpSender { new( receive_mtu: usize, track: Option<Arc<dyn TrackLocal + Send + Sync>>, transport: Arc<RTCDtlsTransport>, media_engine: Arc<MediaEngine>, interceptor: Arc<dyn Interceptor + Send + Sync>, start_paused: bool, ) -> Self132 pub async fn new( 133 receive_mtu: usize, 134 track: Option<Arc<dyn TrackLocal + Send + Sync>>, 135 transport: Arc<RTCDtlsTransport>, 136 media_engine: Arc<MediaEngine>, 137 interceptor: Arc<dyn Interceptor + Send + Sync>, 138 start_paused: bool, 139 ) -> Self { 140 let id = generate_crypto_random_string( 141 32, 142 b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ", 143 ); 144 let (send_called_tx, send_called_rx) = mpsc::channel(1); 145 let stop_called_tx = Arc::new(Notify::new()); 146 let stop_called_rx = stop_called_tx.clone(); 147 let ssrc = rand::random::<u32>(); 148 let stop_called_signal = Arc::new(AtomicBool::new(false)); 149 150 let internal = Arc::new(RTPSenderInternal { 151 send_called_rx: Mutex::new(send_called_rx), 152 stop_called_rx, 153 stop_called_signal: Arc::clone(&stop_called_signal), 154 rtcp_interceptor: Mutex::new(None), 155 }); 156 157 let seq_trans = Arc::new(SequenceTransformer::new()); 158 let srtp_stream = Arc::new(SrtpWriterFuture { 159 closed: AtomicBool::new(false), 160 ssrc, 161 rtp_sender: Arc::downgrade(&internal), 162 rtp_transport: Arc::clone(&transport), 163 rtcp_read_stream: Mutex::new(None), 164 rtp_write_session: Mutex::new(None), 165 seq_trans: Arc::clone(&seq_trans), 166 }); 167 168 let srtp_rtcp_reader = Arc::clone(&srtp_stream) as Arc<dyn RTCPReader + Send + Sync>; 169 let rtcp_interceptor = interceptor.bind_rtcp_reader(srtp_rtcp_reader).await; 170 { 171 let mut internal_rtcp_interceptor = internal.rtcp_interceptor.lock().await; 172 *internal_rtcp_interceptor = Some(rtcp_interceptor); 173 } 174 175 let stream_ids = track 176 .as_ref() 177 .map(|track| vec![track.stream_id().to_string()]) 178 .unwrap_or_default(); 179 Self { 180 track: Mutex::new(track), 181 182 srtp_stream, 183 stream_info: Mutex::new(StreamInfo::default()), 184 seq_trans, 185 186 context: Mutex::new(TrackLocalContext::default()), 187 transport, 188 189 payload_type: 0, 190 ssrc, 191 receive_mtu, 192 193 negotiated: AtomicBool::new(false), 194 195 media_engine, 196 interceptor, 197 198 id, 199 initial_track_id: std::sync::Mutex::new(None), 200 associated_media_stream_ids: std::sync::Mutex::new(stream_ids), 201 202 rtp_transceiver: SyncMutex::new(None), 203 204 send_called_tx: SyncMutex::new(Some(send_called_tx)), 205 stop_called_tx, 206 stop_called_signal, 207 208 paused: Arc::new(AtomicBool::new(start_paused)), 209 210 internal, 211 } 212 } 213 is_negotiated(&self) -> bool214 pub(crate) fn is_negotiated(&self) -> bool { 215 self.negotiated.load(Ordering::SeqCst) 216 } 217 set_negotiated(&self)218 pub(crate) fn set_negotiated(&self) { 219 self.negotiated.store(true, Ordering::SeqCst); 220 } 221 set_rtp_transceiver(&self, rtp_transceiver: Option<Weak<RTCRtpTransceiver>>)222 pub(crate) fn set_rtp_transceiver(&self, rtp_transceiver: Option<Weak<RTCRtpTransceiver>>) { 223 if let Some(t) = rtp_transceiver.as_ref().and_then(|t| t.upgrade()) { 224 self.set_paused(!t.direction().has_send()); 225 } 226 let mut tr = self.rtp_transceiver.lock(); 227 *tr = rtp_transceiver; 228 } 229 set_paused(&self, paused: bool)230 pub(crate) fn set_paused(&self, paused: bool) { 231 self.paused.store(paused, Ordering::SeqCst); 232 } 233 234 /// transport returns the currently-configured DTLSTransport 235 /// if one has not yet been configured transport(&self) -> Arc<RTCDtlsTransport>236 pub fn transport(&self) -> Arc<RTCDtlsTransport> { 237 Arc::clone(&self.transport) 238 } 239 240 /// get_parameters describes the current configuration for the encoding and 241 /// transmission of media on the sender's track. get_parameters(&self) -> RTCRtpSendParameters242 pub async fn get_parameters(&self) -> RTCRtpSendParameters { 243 let kind = { 244 let track = self.track.lock().await; 245 if let Some(t) = &*track { 246 t.kind() 247 } else { 248 RTPCodecType::default() 249 } 250 }; 251 252 let mut send_parameters = { 253 RTCRtpSendParameters { 254 rtp_parameters: self 255 .media_engine 256 .get_rtp_parameters_by_kind(kind, RTCRtpTransceiverDirection::Sendonly), 257 encodings: vec![RTCRtpEncodingParameters { 258 ssrc: self.ssrc, 259 payload_type: self.payload_type, 260 ..Default::default() 261 }], 262 } 263 }; 264 265 let codecs = { 266 let tr = self.rtp_transceiver.lock().clone(); 267 if let Some(t) = &tr { 268 if let Some(t) = t.upgrade() { 269 t.get_codecs().await 270 } else { 271 self.media_engine.get_codecs_by_kind(kind) 272 } 273 } else { 274 self.media_engine.get_codecs_by_kind(kind) 275 } 276 }; 277 send_parameters.rtp_parameters.codecs = codecs; 278 279 send_parameters 280 } 281 282 /// track returns the RTCRtpTransceiver track, or nil track(&self) -> Option<Arc<dyn TrackLocal + Send + Sync>>283 pub async fn track(&self) -> Option<Arc<dyn TrackLocal + Send + Sync>> { 284 let track = self.track.lock().await; 285 track.clone() 286 } 287 288 /// replace_track replaces the track currently being used as the sender's source with a new TrackLocal. 289 /// The new track must be of the same media kind (audio, video, etc) and switching the track should not 290 /// require negotiation. replace_track( &self, track: Option<Arc<dyn TrackLocal + Send + Sync>>, ) -> Result<()>291 pub async fn replace_track( 292 &self, 293 track: Option<Arc<dyn TrackLocal + Send + Sync>>, 294 ) -> Result<()> { 295 if let Some(t) = &track { 296 let tr = self.rtp_transceiver.lock(); 297 if let Some(r) = &*tr { 298 if let Some(r) = r.upgrade() { 299 if r.kind != t.kind() { 300 return Err(Error::ErrRTPSenderNewTrackHasIncorrectKind); 301 } 302 } else { 303 //TODO: what about None arc? 304 } 305 } else { 306 //TODO: what about None tr? 307 } 308 } 309 310 if self.has_sent() { 311 let t = { 312 let t = self.track.lock().await; 313 t.clone() 314 }; 315 if let Some(t) = t { 316 let context = self.context.lock().await; 317 t.unbind(&context).await?; 318 } 319 } 320 321 if !self.has_sent() || track.is_none() { 322 let mut t = self.track.lock().await; 323 *t = track; 324 return Ok(()); 325 } 326 327 let context = { 328 let context = self.context.lock().await; 329 context.clone() 330 }; 331 332 let result = if let Some(t) = &track { 333 self.seq_trans.reset_offset(); 334 335 let new_context = TrackLocalContext { 336 id: context.id.clone(), 337 params: self 338 .media_engine 339 .get_rtp_parameters_by_kind(t.kind(), RTCRtpTransceiverDirection::Sendonly), 340 ssrc: context.ssrc, 341 write_stream: context.write_stream.clone(), 342 paused: self.paused.clone(), 343 }; 344 345 t.bind(&new_context).await 346 } else { 347 Err(Error::ErrRTPSenderTrackNil) 348 }; 349 350 match result { 351 Err(err) => { 352 // Re-bind the original track 353 let track = self.track.lock().await; 354 if let Some(t) = &*track { 355 t.bind(&context).await?; 356 } 357 358 Err(err) 359 } 360 Ok(codec) => { 361 // Codec has changed 362 if self.payload_type != codec.payload_type { 363 let mut context = self.context.lock().await; 364 context.params.codecs = vec![codec]; 365 } 366 367 { 368 let mut t = self.track.lock().await; 369 *t = track; 370 } 371 372 Ok(()) 373 } 374 } 375 } 376 377 /// send Attempts to set the parameters controlling the sending of media. send(&self, parameters: &RTCRtpSendParameters) -> Result<()>378 pub async fn send(&self, parameters: &RTCRtpSendParameters) -> Result<()> { 379 if self.has_sent() { 380 return Err(Error::ErrRTPSenderSendAlreadyCalled); 381 } 382 383 let write_stream = Arc::new(InterceptorToTrackLocalWriter::new(self.paused.clone())); 384 let (context, stream_info) = { 385 let track = self.track.lock().await; 386 let mut context = TrackLocalContext { 387 id: self.id.clone(), 388 params: self.media_engine.get_rtp_parameters_by_kind( 389 if let Some(t) = &*track { 390 t.kind() 391 } else { 392 RTPCodecType::default() 393 }, 394 RTCRtpTransceiverDirection::Sendonly, 395 ), 396 ssrc: parameters.encodings[0].ssrc, 397 write_stream: Some( 398 Arc::clone(&write_stream) as Arc<dyn TrackLocalWriter + Send + Sync> 399 ), 400 paused: self.paused.clone(), 401 }; 402 403 let codec = if let Some(t) = &*track { 404 t.bind(&context).await? 405 } else { 406 RTCRtpCodecParameters::default() 407 }; 408 let payload_type = codec.payload_type; 409 let capability = codec.capability.clone(); 410 context.params.codecs = vec![codec]; 411 let stream_info = create_stream_info( 412 self.id.clone(), 413 parameters.encodings[0].ssrc, 414 payload_type, 415 capability, 416 ¶meters.rtp_parameters.header_extensions, 417 ); 418 419 (context, stream_info) 420 }; 421 422 let srtp_rtp_writer = Arc::clone(&self.srtp_stream) as Arc<dyn RTPWriter + Send + Sync>; 423 let rtp_interceptor = self 424 .interceptor 425 .bind_local_stream(&stream_info, srtp_rtp_writer) 426 .await; 427 { 428 let mut interceptor_rtp_writer = write_stream.interceptor_rtp_writer.lock().await; 429 *interceptor_rtp_writer = Some(rtp_interceptor); 430 } 431 432 { 433 let mut ctx = self.context.lock().await; 434 *ctx = context; 435 } 436 { 437 let mut si = self.stream_info.lock().await; 438 *si = stream_info; 439 } 440 441 { 442 let mut send_called_tx = self.send_called_tx.lock(); 443 send_called_tx.take(); 444 } 445 446 Ok(()) 447 } 448 449 /// stop irreversibly stops the RTPSender stop(&self) -> Result<()>450 pub async fn stop(&self) -> Result<()> { 451 if self.stop_called_signal.load(Ordering::SeqCst) { 452 return Ok(()); 453 } 454 self.stop_called_signal.store(true, Ordering::SeqCst); 455 self.stop_called_tx.notify_waiters(); 456 457 if !self.has_sent() { 458 return Ok(()); 459 } 460 461 self.replace_track(None).await?; 462 463 { 464 let stream_info = self.stream_info.lock().await; 465 self.interceptor.unbind_local_stream(&stream_info).await; 466 } 467 468 self.srtp_stream.close().await 469 } 470 471 /// read reads incoming RTCP for this RTPReceiver read(&self, b: &mut [u8]) -> Result<(usize, Attributes)>472 pub async fn read(&self, b: &mut [u8]) -> Result<(usize, Attributes)> { 473 self.internal.read(b).await 474 } 475 476 /// read_rtcp is a convenience method that wraps Read and unmarshals for you. read_rtcp( &self, ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>477 pub async fn read_rtcp( 478 &self, 479 ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> { 480 self.internal.read_rtcp(self.receive_mtu).await 481 } 482 483 /// Enables overriding outgoing `RTP` packets' `sequence number`s. 484 /// 485 /// Must be called once before any data sent or never called at all. 486 /// 487 /// # Errors 488 /// 489 /// Errors if this [`RTCRtpSender`] has started to send data or sequence 490 /// transforming has been already enabled. enable_seq_transformer(&self) -> Result<()>491 pub fn enable_seq_transformer(&self) -> Result<()> { 492 self.seq_trans.enable() 493 } 494 495 /// has_sent tells if data has been ever sent for this instance has_sent(&self) -> bool496 pub(crate) fn has_sent(&self) -> bool { 497 let send_called_tx = self.send_called_tx.lock(); 498 send_called_tx.is_none() 499 } 500 501 /// has_stopped tells if stop has been called has_stopped(&self) -> bool502 pub(crate) async fn has_stopped(&self) -> bool { 503 self.stop_called_signal.load(Ordering::SeqCst) 504 } 505 initial_track_id(&self) -> Option<String>506 pub(crate) fn initial_track_id(&self) -> Option<String> { 507 let lock = self.initial_track_id.lock().unwrap(); 508 509 lock.clone() 510 } 511 set_initial_track_id(&self, id: String) -> Result<()>512 pub(crate) fn set_initial_track_id(&self, id: String) -> Result<()> { 513 let mut lock = self.initial_track_id.lock().unwrap(); 514 515 if lock.is_some() { 516 return Err(Error::ErrSenderInitialTrackIdAlreadySet); 517 } 518 519 *lock = Some(id); 520 521 Ok(()) 522 } 523 associate_media_stream_id(&self, id: String) -> bool524 pub(crate) fn associate_media_stream_id(&self, id: String) -> bool { 525 let mut lock = self.associated_media_stream_ids.lock().unwrap(); 526 527 if lock.contains(&id) { 528 return false; 529 } 530 531 lock.push(id); 532 533 true 534 } 535 associated_media_stream_ids(&self) -> Vec<String>536 pub(crate) fn associated_media_stream_ids(&self) -> Vec<String> { 537 let lock = self.associated_media_stream_ids.lock().unwrap(); 538 539 lock.clone() 540 } 541 } 542