1 #[cfg(test)] 2 mod track_local_static_test; 3 4 pub mod track_local_static_rtp; 5 pub mod track_local_static_sample; 6 7 use crate::error::{Error, Result}; 8 use crate::rtp_transceiver::rtp_codec::*; 9 use crate::rtp_transceiver::*; 10 11 use async_trait::async_trait; 12 use interceptor::{Attributes, RTPWriter}; 13 use std::any::Any; 14 use std::fmt; 15 use std::sync::atomic::{AtomicBool, Ordering}; 16 use std::sync::Arc; 17 use tokio::sync::Mutex; 18 use util::Unmarshal; 19 20 /// TrackLocalWriter is the Writer for outbound RTP Packets 21 #[async_trait] 22 pub trait TrackLocalWriter: fmt::Debug { 23 /// write_rtp encrypts a RTP packet and writes to the connection write_rtp(&self, p: &rtp::packet::Packet) -> Result<usize>24 async fn write_rtp(&self, p: &rtp::packet::Packet) -> Result<usize>; 25 26 /// write encrypts and writes a full RTP packet write(&self, b: &[u8]) -> Result<usize>27 async fn write(&self, b: &[u8]) -> Result<usize>; 28 } 29 30 /// TrackLocalContext is the Context passed when a TrackLocal has been Binded/Unbinded from a PeerConnection, and used 31 /// in Interceptors. 32 #[derive(Default, Debug, Clone)] 33 pub struct TrackLocalContext { 34 pub(crate) id: String, 35 pub(crate) params: RTCRtpParameters, 36 pub(crate) ssrc: SSRC, 37 pub(crate) write_stream: Option<Arc<dyn TrackLocalWriter + Send + Sync>>, 38 pub(crate) paused: Arc<AtomicBool>, 39 } 40 41 impl TrackLocalContext { 42 /// codec_parameters returns the negotiated RTPCodecParameters. These are the codecs supported by both 43 /// PeerConnections and the SSRC/PayloadTypes codec_parameters(&self) -> &[RTCRtpCodecParameters]44 pub fn codec_parameters(&self) -> &[RTCRtpCodecParameters] { 45 &self.params.codecs 46 } 47 48 /// header_extensions returns the negotiated RTPHeaderExtensionParameters. These are the header extensions supported by 49 /// both PeerConnections and the SSRC/PayloadTypes header_extensions(&self) -> &[RTCRtpHeaderExtensionParameters]50 pub fn header_extensions(&self) -> &[RTCRtpHeaderExtensionParameters] { 51 &self.params.header_extensions 52 } 53 54 /// ssrc requires the negotiated SSRC of this track 55 /// This track may have multiple if RTX is enabled ssrc(&self) -> SSRC56 pub fn ssrc(&self) -> SSRC { 57 self.ssrc 58 } 59 60 /// write_stream returns the write_stream for this TrackLocal. The implementer writes the outbound 61 /// media packets to it write_stream(&self) -> Option<Arc<dyn TrackLocalWriter + Send + Sync>>62 pub fn write_stream(&self) -> Option<Arc<dyn TrackLocalWriter + Send + Sync>> { 63 self.write_stream.clone() 64 } 65 66 /// id is a unique identifier that is used for both bind/unbind id(&self) -> String67 pub fn id(&self) -> String { 68 self.id.clone() 69 } 70 } 71 /// TrackLocal is an interface that controls how the user can send media 72 /// The user can provide their own TrackLocal implementations, or use 73 /// the implementations in pkg/media 74 #[async_trait] 75 pub trait TrackLocal { 76 /// bind should implement the way how the media data flows from the Track to the PeerConnection 77 /// This will be called internally after signaling is complete and the list of available 78 /// codecs has been determined bind(&self, t: &TrackLocalContext) -> Result<RTCRtpCodecParameters>79 async fn bind(&self, t: &TrackLocalContext) -> Result<RTCRtpCodecParameters>; 80 81 /// unbind should implement the teardown logic when the track is no longer needed. This happens 82 /// because a track has been stopped. unbind(&self, t: &TrackLocalContext) -> Result<()>83 async fn unbind(&self, t: &TrackLocalContext) -> Result<()>; 84 85 /// id is the unique identifier for this Track. This should be unique for the 86 /// stream, but doesn't have to globally unique. A common example would be 'audio' or 'video' 87 /// and stream_id would be 'desktop' or 'webcam' id(&self) -> &str88 fn id(&self) -> &str; 89 90 /// stream_id is the group this track belongs too. This must be unique stream_id(&self) -> &str91 fn stream_id(&self) -> &str; 92 93 /// kind controls if this TrackLocal is audio or video kind(&self) -> RTPCodecType94 fn kind(&self) -> RTPCodecType; 95 as_any(&self) -> &dyn Any96 fn as_any(&self) -> &dyn Any; 97 } 98 99 /// TrackBinding is a single bind for a Track 100 /// Bind can be called multiple times, this stores the 101 /// result for a single bind call so that it can be used when writing 102 #[derive(Default, Debug)] 103 pub(crate) struct TrackBinding { 104 id: String, 105 ssrc: SSRC, 106 payload_type: PayloadType, 107 params: RTCRtpParameters, 108 write_stream: Option<Arc<dyn TrackLocalWriter + Send + Sync>>, 109 sender_paused: Arc<AtomicBool>, 110 } 111 112 impl TrackBinding { is_sender_paused(&self) -> bool113 pub fn is_sender_paused(&self) -> bool { 114 self.sender_paused.load(Ordering::SeqCst) 115 } 116 } 117 118 pub(crate) struct InterceptorToTrackLocalWriter { 119 pub(crate) interceptor_rtp_writer: Mutex<Option<Arc<dyn RTPWriter + Send + Sync>>>, 120 sender_paused: Arc<AtomicBool>, 121 } 122 123 impl InterceptorToTrackLocalWriter { new(paused: Arc<AtomicBool>) -> Self124 pub(crate) fn new(paused: Arc<AtomicBool>) -> Self { 125 InterceptorToTrackLocalWriter { 126 interceptor_rtp_writer: Mutex::new(None), 127 sender_paused: paused, 128 } 129 } 130 is_sender_paused(&self) -> bool131 fn is_sender_paused(&self) -> bool { 132 self.sender_paused.load(Ordering::SeqCst) 133 } 134 } 135 136 impl std::fmt::Debug for InterceptorToTrackLocalWriter { fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 138 f.debug_struct("InterceptorToTrackLocalWriter").finish() 139 } 140 } 141 142 #[async_trait] 143 impl TrackLocalWriter for InterceptorToTrackLocalWriter { write_rtp(&self, pkt: &rtp::packet::Packet) -> Result<usize>144 async fn write_rtp(&self, pkt: &rtp::packet::Packet) -> Result<usize> { 145 if self.is_sender_paused() { 146 return Ok(0); 147 } 148 149 let interceptor_rtp_writer = self.interceptor_rtp_writer.lock().await; 150 if let Some(writer) = &*interceptor_rtp_writer { 151 let a = Attributes::new(); 152 Ok(writer.write(pkt, &a).await?) 153 } else { 154 Ok(0) 155 } 156 } 157 write(&self, mut b: &[u8]) -> Result<usize>158 async fn write(&self, mut b: &[u8]) -> Result<usize> { 159 let pkt = rtp::packet::Packet::unmarshal(&mut b)?; 160 self.write_rtp(&pkt).await 161 } 162 } 163