xref: /webrtc/interceptor/src/nack/responder/mod.rs (revision bc5c52e6)
1 mod responder_stream;
2 #[cfg(test)]
3 mod responder_test;
4 
5 use crate::stream_info::StreamInfo;
6 use crate::{
7     Attributes, Interceptor, InterceptorBuilder, RTCPReader, RTCPWriter, RTPReader, RTPWriter,
8 };
9 use responder_stream::ResponderStream;
10 
11 use crate::error::Result;
12 use crate::nack::stream_support_nack;
13 
14 use async_trait::async_trait;
15 use rtcp::transport_feedbacks::transport_layer_nack::TransportLayerNack;
16 use std::collections::HashMap;
17 use std::future::Future;
18 use std::pin::Pin;
19 use std::sync::Arc;
20 use tokio::sync::Mutex;
21 
/// ResponderBuilder can be used to configure a Responder interceptor.
///
/// The only setting is an optional `log2_size`. When it is left unset, the
/// builder's `build` implementation falls back to 13 (8192 = 1 << 13).
#[derive(Default)]
pub struct ResponderBuilder {
    /// Base-2 logarithm of the requested size; `None` means "use the default".
    log2_size: Option<u8>,
}

impl ResponderBuilder {
    /// with_log2_size sets the size of the interceptor, expressed as its
    /// base-2 logarithm: the resulting size is `1 << log2_size`
    /// (for example, 13 gives 8192).
    ///
    /// The resulting size must be one of: 1, 2, 4, 8, 16, 32, 64, 128, 256,
    /// 512, 1024, 2048, 4096, 8192, 16384, 32768, i.e. `log2_size` is expected
    /// to be in the range 0 through 15.
    ///
    /// This method stores the value as given and performs no validation.
    /// NOTE(review): where (or whether) the range is enforced is not visible
    /// from this module; confirm against the stream implementation.
    ///
    /// Consumes the builder and returns it so that calls can be chained.
    pub fn with_log2_size(mut self, log2_size: u8) -> Self {
        self.log2_size = Some(log2_size);
        self
    }
}
36 
37 impl InterceptorBuilder for ResponderBuilder {
build(&self, _id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>>38     fn build(&self, _id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>> {
39         Ok(Arc::new(Responder {
40             internal: Arc::new(ResponderInternal {
41                 log2_size: if let Some(log2_size) = self.log2_size {
42                     log2_size
43                 } else {
44                     13 // 8192 = 1 << 13
45                 },
46                 streams: Arc::new(Mutex::new(HashMap::new())),
47             }),
48         }))
49     }
50 }
51 
52 pub struct ResponderInternal {
53     log2_size: u8,
54     streams: Arc<Mutex<HashMap<u32, Arc<ResponderStream>>>>,
55 }
56 
57 impl ResponderInternal {
resend_packets( streams: Arc<Mutex<HashMap<u32, Arc<ResponderStream>>>>, nack: TransportLayerNack, )58     async fn resend_packets(
59         streams: Arc<Mutex<HashMap<u32, Arc<ResponderStream>>>>,
60         nack: TransportLayerNack,
61     ) {
62         let stream = {
63             let m = streams.lock().await;
64             if let Some(stream) = m.get(&nack.media_ssrc) {
65                 stream.clone()
66             } else {
67                 return;
68             }
69         };
70 
71         for n in &nack.nacks {
72             // can't use n.range() since this callback is async fn,
73             // instead, use NackPair into_iter()
74             let stream2 = Arc::clone(&stream);
75             let f = Box::new(
76                 move |seq: u16| -> Pin<Box<dyn Future<Output = bool> + Send + 'static>> {
77                     let stream3 = Arc::clone(&stream2);
78                     Box::pin(async move {
79                         if let Some(p) = stream3.get(seq).await {
80                             let a = Attributes::new();
81                             if let Err(err) = stream3.next_rtp_writer.write(&p, &a).await {
82                                 log::warn!("failed resending nacked packet: {}", err);
83                             }
84                         }
85                         true
86                     })
87                 },
88             );
89             for packet_id in n.into_iter() {
90                 if !f(packet_id).await {
91                     return;
92                 }
93             }
94         }
95     }
96 }
97 
98 pub struct ResponderRtcpReader {
99     parent_rtcp_reader: Arc<dyn RTCPReader + Send + Sync>,
100     internal: Arc<ResponderInternal>,
101 }
102 
103 #[async_trait]
104 impl RTCPReader for ResponderRtcpReader {
read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)>105     async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
106         let (n, attr) = { self.parent_rtcp_reader.read(buf, a).await? };
107 
108         let mut b = &buf[..n];
109         let pkts = rtcp::packet::unmarshal(&mut b)?;
110         for p in &pkts {
111             if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() {
112                 let nack = nack.clone();
113                 let streams = Arc::clone(&self.internal.streams);
114                 tokio::spawn(async move {
115                     ResponderInternal::resend_packets(streams, nack).await;
116                 });
117             }
118         }
119 
120         Ok((n, attr))
121     }
122 }
123 
124 /// Responder responds to nack feedback messages
125 pub struct Responder {
126     internal: Arc<ResponderInternal>,
127 }
128 
129 impl Responder {
130     /// builder returns a new ResponderBuilder.
builder() -> ResponderBuilder131     pub fn builder() -> ResponderBuilder {
132         ResponderBuilder::default()
133     }
134 }
135 
136 #[async_trait]
137 impl Interceptor for Responder {
138     /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
139     /// change in the future. The returned method will be called once per packet batch.
bind_rtcp_reader( &self, reader: Arc<dyn RTCPReader + Send + Sync>, ) -> Arc<dyn RTCPReader + Send + Sync>140     async fn bind_rtcp_reader(
141         &self,
142         reader: Arc<dyn RTCPReader + Send + Sync>,
143     ) -> Arc<dyn RTCPReader + Send + Sync> {
144         Arc::new(ResponderRtcpReader {
145             internal: Arc::clone(&self.internal),
146             parent_rtcp_reader: reader,
147         }) as Arc<dyn RTCPReader + Send + Sync>
148     }
149 
150     /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
151     /// will be called once per packet batch.
bind_rtcp_writer( &self, writer: Arc<dyn RTCPWriter + Send + Sync>, ) -> Arc<dyn RTCPWriter + Send + Sync>152     async fn bind_rtcp_writer(
153         &self,
154         writer: Arc<dyn RTCPWriter + Send + Sync>,
155     ) -> Arc<dyn RTCPWriter + Send + Sync> {
156         writer
157     }
158 
159     /// bind_local_stream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
160     /// will be called once per rtp packet.
bind_local_stream( &self, info: &StreamInfo, writer: Arc<dyn RTPWriter + Send + Sync>, ) -> Arc<dyn RTPWriter + Send + Sync>161     async fn bind_local_stream(
162         &self,
163         info: &StreamInfo,
164         writer: Arc<dyn RTPWriter + Send + Sync>,
165     ) -> Arc<dyn RTPWriter + Send + Sync> {
166         if !stream_support_nack(info) {
167             return writer;
168         }
169 
170         let stream = Arc::new(ResponderStream::new(self.internal.log2_size, writer));
171         {
172             let mut streams = self.internal.streams.lock().await;
173             streams.insert(info.ssrc, Arc::clone(&stream));
174         }
175 
176         stream
177     }
178 
179     /// unbind_local_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
unbind_local_stream(&self, info: &StreamInfo)180     async fn unbind_local_stream(&self, info: &StreamInfo) {
181         let mut streams = self.internal.streams.lock().await;
182         streams.remove(&info.ssrc);
183     }
184 
185     /// bind_remote_stream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
186     /// will be called once per rtp packet.
bind_remote_stream( &self, _info: &StreamInfo, reader: Arc<dyn RTPReader + Send + Sync>, ) -> Arc<dyn RTPReader + Send + Sync>187     async fn bind_remote_stream(
188         &self,
189         _info: &StreamInfo,
190         reader: Arc<dyn RTPReader + Send + Sync>,
191     ) -> Arc<dyn RTPReader + Send + Sync> {
192         reader
193     }
194 
195     /// unbind_remote_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
unbind_remote_stream(&self, _info: &StreamInfo)196     async fn unbind_remote_stream(&self, _info: &StreamInfo) {}
197 
198     /// close closes the Interceptor, cleaning up any data if necessary.
close(&self) -> Result<()>199     async fn close(&self) -> Result<()> {
200         Ok(())
201     }
202 }
203