xref: /webrtc/interceptor/src/twcc/sender/mod.rs (revision ffe74184)
1 mod sender_stream;
2 #[cfg(test)]
3 mod sender_test;
4 
5 use crate::*;
6 use crate::{Attributes, RTPWriter};
7 use sender_stream::SenderStream;
8 
9 use rtp::extension::transport_cc_extension::TransportCcExtension;
10 use std::sync::atomic::{AtomicU32, Ordering};
11 use std::sync::Arc;
12 use tokio::sync::Mutex;
13 use util::Marshal;
14 
/// URI that identifies the transport-wide congestion control RTP header extension.
///
/// A local stream's negotiated header extensions are compared against this value
/// to find the extension ID the sender should write.
pub(crate) const TRANSPORT_CC_URI: &str =
    "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01";
17 
/// SenderBuilder is an InterceptorBuilder that produces [`Sender`] interceptors.
///
/// The default builder starts the sequence counter at 0.
#[derive(Debug, Clone, Default)]
pub struct SenderBuilder {
    /// Value the built interceptor's sequence counter is initialised with.
    init_sequence_nr: u32,
}
23 
24 impl SenderBuilder {
25     /// with_init_sequence_nr sets the init sequence number of the interceptor.
with_init_sequence_nr(mut self, init_sequence_nr: u32) -> SenderBuilder26     pub fn with_init_sequence_nr(mut self, init_sequence_nr: u32) -> SenderBuilder {
27         self.init_sequence_nr = init_sequence_nr;
28         self
29     }
30 }
31 
32 impl InterceptorBuilder for SenderBuilder {
33     /// build constructs a new SenderInterceptor
build(&self, _id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>>34     fn build(&self, _id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>> {
35         Ok(Arc::new(Sender {
36             next_sequence_nr: Arc::new(AtomicU32::new(self.init_sequence_nr)),
37             streams: Mutex::new(HashMap::new()),
38         }))
39     }
40 }
41 
42 /// Sender adds transport wide sequence numbers as header extension to each RTP packet
43 pub struct Sender {
44     next_sequence_nr: Arc<AtomicU32>,
45     streams: Mutex<HashMap<u32, Arc<SenderStream>>>,
46 }
47 
48 impl Sender {
49     /// builder returns a new SenderBuilder.
builder() -> SenderBuilder50     pub fn builder() -> SenderBuilder {
51         SenderBuilder::default()
52     }
53 }
54 
55 #[async_trait]
56 impl Interceptor for Sender {
57     /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
58     /// change in the future. The returned method will be called once per packet batch.
bind_rtcp_reader( &self, reader: Arc<dyn RTCPReader + Send + Sync>, ) -> Arc<dyn RTCPReader + Send + Sync>59     async fn bind_rtcp_reader(
60         &self,
61         reader: Arc<dyn RTCPReader + Send + Sync>,
62     ) -> Arc<dyn RTCPReader + Send + Sync> {
63         reader
64     }
65 
66     /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
67     /// will be called once per packet batch.
bind_rtcp_writer( &self, writer: Arc<dyn RTCPWriter + Send + Sync>, ) -> Arc<dyn RTCPWriter + Send + Sync>68     async fn bind_rtcp_writer(
69         &self,
70         writer: Arc<dyn RTCPWriter + Send + Sync>,
71     ) -> Arc<dyn RTCPWriter + Send + Sync> {
72         writer
73     }
74 
75     /// bind_local_stream returns a writer that adds a rtp TransportCCExtension
76     /// header with increasing sequence numbers to each outgoing packet.
bind_local_stream( &self, info: &StreamInfo, writer: Arc<dyn RTPWriter + Send + Sync>, ) -> Arc<dyn RTPWriter + Send + Sync>77     async fn bind_local_stream(
78         &self,
79         info: &StreamInfo,
80         writer: Arc<dyn RTPWriter + Send + Sync>,
81     ) -> Arc<dyn RTPWriter + Send + Sync> {
82         let mut hdr_ext_id = 0u8;
83         for e in &info.rtp_header_extensions {
84             if e.uri == TRANSPORT_CC_URI {
85                 hdr_ext_id = e.id as u8;
86                 break;
87             }
88         }
89         if hdr_ext_id == 0 {
90             // Don't add header extension if ID is 0, because 0 is an invalid extension ID
91             return writer;
92         }
93 
94         let stream = Arc::new(SenderStream::new(
95             writer,
96             Arc::clone(&self.next_sequence_nr),
97             hdr_ext_id,
98         ));
99 
100         {
101             let mut streams = self.streams.lock().await;
102             streams.insert(info.ssrc, Arc::clone(&stream));
103         }
104 
105         stream
106     }
107 
108     /// unbind_local_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
unbind_local_stream(&self, info: &StreamInfo)109     async fn unbind_local_stream(&self, info: &StreamInfo) {
110         let mut streams = self.streams.lock().await;
111         streams.remove(&info.ssrc);
112     }
113 
114     /// bind_remote_stream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
115     /// will be called once per rtp packet.
bind_remote_stream( &self, _info: &StreamInfo, reader: Arc<dyn RTPReader + Send + Sync>, ) -> Arc<dyn RTPReader + Send + Sync>116     async fn bind_remote_stream(
117         &self,
118         _info: &StreamInfo,
119         reader: Arc<dyn RTPReader + Send + Sync>,
120     ) -> Arc<dyn RTPReader + Send + Sync> {
121         reader
122     }
123 
124     /// unbind_remote_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
unbind_remote_stream(&self, _info: &StreamInfo)125     async fn unbind_remote_stream(&self, _info: &StreamInfo) {}
126 
127     /// close closes the Interceptor, cleaning up any data if necessary.
close(&self) -> Result<()>128     async fn close(&self) -> Result<()> {
129         Ok(())
130     }
131 }
132