1 mod sender_stream; 2 #[cfg(test)] 3 mod sender_test; 4 5 use crate::*; 6 use crate::{Attributes, RTPWriter}; 7 use sender_stream::SenderStream; 8 9 use rtp::extension::transport_cc_extension::TransportCcExtension; 10 use std::sync::atomic::{AtomicU32, Ordering}; 11 use std::sync::Arc; 12 use tokio::sync::Mutex; 13 use util::Marshal; 14 15 pub(crate) const TRANSPORT_CC_URI: &str = 16 "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"; 17 18 /// HeaderExtensionBuilder is a InterceptorBuilder for a HeaderExtension Interceptor 19 #[derive(Default)] 20 pub struct SenderBuilder { 21 init_sequence_nr: u32, 22 } 23 24 impl SenderBuilder { 25 /// with_init_sequence_nr sets the init sequence number of the interceptor. with_init_sequence_nr(mut self, init_sequence_nr: u32) -> SenderBuilder26 pub fn with_init_sequence_nr(mut self, init_sequence_nr: u32) -> SenderBuilder { 27 self.init_sequence_nr = init_sequence_nr; 28 self 29 } 30 } 31 32 impl InterceptorBuilder for SenderBuilder { 33 /// build constructs a new SenderInterceptor build(&self, _id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>>34 fn build(&self, _id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>> { 35 Ok(Arc::new(Sender { 36 next_sequence_nr: Arc::new(AtomicU32::new(self.init_sequence_nr)), 37 streams: Mutex::new(HashMap::new()), 38 })) 39 } 40 } 41 42 /// Sender adds transport wide sequence numbers as header extension to each RTP packet 43 pub struct Sender { 44 next_sequence_nr: Arc<AtomicU32>, 45 streams: Mutex<HashMap<u32, Arc<SenderStream>>>, 46 } 47 48 impl Sender { 49 /// builder returns a new SenderBuilder. builder() -> SenderBuilder50 pub fn builder() -> SenderBuilder { 51 SenderBuilder::default() 52 } 53 } 54 55 #[async_trait] 56 impl Interceptor for Sender { 57 /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might 58 /// change in the future. The returned method will be called once per packet batch. bind_rtcp_reader( &self, reader: Arc<dyn RTCPReader + Send + Sync>, ) -> Arc<dyn RTCPReader + Send + Sync>59 async fn bind_rtcp_reader( 60 &self, 61 reader: Arc<dyn RTCPReader + Send + Sync>, 62 ) -> Arc<dyn RTCPReader + Send + Sync> { 63 reader 64 } 65 66 /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method 67 /// will be called once per packet batch. bind_rtcp_writer( &self, writer: Arc<dyn RTCPWriter + Send + Sync>, ) -> Arc<dyn RTCPWriter + Send + Sync>68 async fn bind_rtcp_writer( 69 &self, 70 writer: Arc<dyn RTCPWriter + Send + Sync>, 71 ) -> Arc<dyn RTCPWriter + Send + Sync> { 72 writer 73 } 74 75 /// bind_local_stream returns a writer that adds a rtp TransportCCExtension 76 /// header with increasing sequence numbers to each outgoing packet. bind_local_stream( &self, info: &StreamInfo, writer: Arc<dyn RTPWriter + Send + Sync>, ) -> Arc<dyn RTPWriter + Send + Sync>77 async fn bind_local_stream( 78 &self, 79 info: &StreamInfo, 80 writer: Arc<dyn RTPWriter + Send + Sync>, 81 ) -> Arc<dyn RTPWriter + Send + Sync> { 82 let mut hdr_ext_id = 0u8; 83 for e in &info.rtp_header_extensions { 84 if e.uri == TRANSPORT_CC_URI { 85 hdr_ext_id = e.id as u8; 86 break; 87 } 88 } 89 if hdr_ext_id == 0 { 90 // Don't add header extension if ID is 0, because 0 is an invalid extension ID 91 return writer; 92 } 93 94 let stream = Arc::new(SenderStream::new( 95 writer, 96 Arc::clone(&self.next_sequence_nr), 97 hdr_ext_id, 98 )); 99 100 { 101 let mut streams = self.streams.lock().await; 102 streams.insert(info.ssrc, Arc::clone(&stream)); 103 } 104 105 stream 106 } 107 108 /// unbind_local_stream is called when the Stream is removed. It can be used to clean up any data related to that track. unbind_local_stream(&self, info: &StreamInfo)109 async fn unbind_local_stream(&self, info: &StreamInfo) { 110 let mut streams = self.streams.lock().await; 111 streams.remove(&info.ssrc); 112 } 113 114 /// bind_remote_stream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method 115 /// will be called once per rtp packet. bind_remote_stream( &self, _info: &StreamInfo, reader: Arc<dyn RTPReader + Send + Sync>, ) -> Arc<dyn RTPReader + Send + Sync>116 async fn bind_remote_stream( 117 &self, 118 _info: &StreamInfo, 119 reader: Arc<dyn RTPReader + Send + Sync>, 120 ) -> Arc<dyn RTPReader + Send + Sync> { 121 reader 122 } 123 124 /// unbind_remote_stream is called when the Stream is removed. It can be used to clean up any data related to that track. unbind_remote_stream(&self, _info: &StreamInfo)125 async fn unbind_remote_stream(&self, _info: &StreamInfo) {} 126 127 /// close closes the Interceptor, cleaning up any data if necessary. close(&self) -> Result<()>128 async fn close(&self) -> Result<()> { 129 Ok(()) 130 } 131 } 132