1 mod responder_stream; 2 #[cfg(test)] 3 mod responder_test; 4 5 use crate::stream_info::StreamInfo; 6 use crate::{ 7 Attributes, Interceptor, InterceptorBuilder, RTCPReader, RTCPWriter, RTPReader, RTPWriter, 8 }; 9 use responder_stream::ResponderStream; 10 11 use crate::error::Result; 12 use crate::nack::stream_support_nack; 13 14 use async_trait::async_trait; 15 use rtcp::transport_feedbacks::transport_layer_nack::TransportLayerNack; 16 use std::collections::HashMap; 17 use std::future::Future; 18 use std::pin::Pin; 19 use std::sync::Arc; 20 use tokio::sync::Mutex; 21 22 /// GeneratorBuilder can be used to configure Responder Interceptor 23 #[derive(Default)] 24 pub struct ResponderBuilder { 25 log2_size: Option<u8>, 26 } 27 28 impl ResponderBuilder { 29 /// with_log2_size sets the size of the interceptor. 30 /// Size must be one of: 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768 with_log2_size(mut self, log2_size: u8) -> ResponderBuilder31 pub fn with_log2_size(mut self, log2_size: u8) -> ResponderBuilder { 32 self.log2_size = Some(log2_size); 33 self 34 } 35 } 36 37 impl InterceptorBuilder for ResponderBuilder { build(&self, _id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>>38 fn build(&self, _id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>> { 39 Ok(Arc::new(Responder { 40 internal: Arc::new(ResponderInternal { 41 log2_size: if let Some(log2_size) = self.log2_size { 42 log2_size 43 } else { 44 13 // 8192 = 1 << 13 45 }, 46 streams: Arc::new(Mutex::new(HashMap::new())), 47 }), 48 })) 49 } 50 } 51 52 pub struct ResponderInternal { 53 log2_size: u8, 54 streams: Arc<Mutex<HashMap<u32, Arc<ResponderStream>>>>, 55 } 56 57 impl ResponderInternal { resend_packets( streams: Arc<Mutex<HashMap<u32, Arc<ResponderStream>>>>, nack: TransportLayerNack, )58 async fn resend_packets( 59 streams: Arc<Mutex<HashMap<u32, Arc<ResponderStream>>>>, 60 nack: TransportLayerNack, 61 ) { 62 let stream = { 63 let m = streams.lock().await; 64 if let Some(stream) = m.get(&nack.media_ssrc) { 65 stream.clone() 66 } else { 67 return; 68 } 69 }; 70 71 for n in &nack.nacks { 72 // can't use n.range() since this callback is async fn, 73 // instead, use NackPair into_iter() 74 let stream2 = Arc::clone(&stream); 75 let f = Box::new( 76 move |seq: u16| -> Pin<Box<dyn Future<Output = bool> + Send + 'static>> { 77 let stream3 = Arc::clone(&stream2); 78 Box::pin(async move { 79 if let Some(p) = stream3.get(seq).await { 80 let a = Attributes::new(); 81 if let Err(err) = stream3.next_rtp_writer.write(&p, &a).await { 82 log::warn!("failed resending nacked packet: {}", err); 83 } 84 } 85 true 86 }) 87 }, 88 ); 89 for packet_id in n.into_iter() { 90 if !f(packet_id).await { 91 return; 92 } 93 } 94 } 95 } 96 } 97 98 pub struct ResponderRtcpReader { 99 parent_rtcp_reader: Arc<dyn RTCPReader + Send + Sync>, 100 internal: Arc<ResponderInternal>, 101 } 102 103 #[async_trait] 104 impl RTCPReader for ResponderRtcpReader { read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)>105 async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> { 106 let (n, attr) = { self.parent_rtcp_reader.read(buf, a).await? }; 107 108 let mut b = &buf[..n]; 109 let pkts = rtcp::packet::unmarshal(&mut b)?; 110 for p in &pkts { 111 if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() { 112 let nack = nack.clone(); 113 let streams = Arc::clone(&self.internal.streams); 114 tokio::spawn(async move { 115 ResponderInternal::resend_packets(streams, nack).await; 116 }); 117 } 118 } 119 120 Ok((n, attr)) 121 } 122 } 123 124 /// Responder responds to nack feedback messages 125 pub struct Responder { 126 internal: Arc<ResponderInternal>, 127 } 128 129 impl Responder { 130 /// builder returns a new ResponderBuilder. builder() -> ResponderBuilder131 pub fn builder() -> ResponderBuilder { 132 ResponderBuilder::default() 133 } 134 } 135 136 #[async_trait] 137 impl Interceptor for Responder { 138 /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might 139 /// change in the future. The returned method will be called once per packet batch. bind_rtcp_reader( &self, reader: Arc<dyn RTCPReader + Send + Sync>, ) -> Arc<dyn RTCPReader + Send + Sync>140 async fn bind_rtcp_reader( 141 &self, 142 reader: Arc<dyn RTCPReader + Send + Sync>, 143 ) -> Arc<dyn RTCPReader + Send + Sync> { 144 Arc::new(ResponderRtcpReader { 145 internal: Arc::clone(&self.internal), 146 parent_rtcp_reader: reader, 147 }) as Arc<dyn RTCPReader + Send + Sync> 148 } 149 150 /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method 151 /// will be called once per packet batch. bind_rtcp_writer( &self, writer: Arc<dyn RTCPWriter + Send + Sync>, ) -> Arc<dyn RTCPWriter + Send + Sync>152 async fn bind_rtcp_writer( 153 &self, 154 writer: Arc<dyn RTCPWriter + Send + Sync>, 155 ) -> Arc<dyn RTCPWriter + Send + Sync> { 156 writer 157 } 158 159 /// bind_local_stream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method 160 /// will be called once per rtp packet. bind_local_stream( &self, info: &StreamInfo, writer: Arc<dyn RTPWriter + Send + Sync>, ) -> Arc<dyn RTPWriter + Send + Sync>161 async fn bind_local_stream( 162 &self, 163 info: &StreamInfo, 164 writer: Arc<dyn RTPWriter + Send + Sync>, 165 ) -> Arc<dyn RTPWriter + Send + Sync> { 166 if !stream_support_nack(info) { 167 return writer; 168 } 169 170 let stream = Arc::new(ResponderStream::new(self.internal.log2_size, writer)); 171 { 172 let mut streams = self.internal.streams.lock().await; 173 streams.insert(info.ssrc, Arc::clone(&stream)); 174 } 175 176 stream 177 } 178 179 /// unbind_local_stream is called when the Stream is removed. It can be used to clean up any data related to that track. unbind_local_stream(&self, info: &StreamInfo)180 async fn unbind_local_stream(&self, info: &StreamInfo) { 181 let mut streams = self.internal.streams.lock().await; 182 streams.remove(&info.ssrc); 183 } 184 185 /// bind_remote_stream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method 186 /// will be called once per rtp packet. bind_remote_stream( &self, _info: &StreamInfo, reader: Arc<dyn RTPReader + Send + Sync>, ) -> Arc<dyn RTPReader + Send + Sync>187 async fn bind_remote_stream( 188 &self, 189 _info: &StreamInfo, 190 reader: Arc<dyn RTPReader + Send + Sync>, 191 ) -> Arc<dyn RTPReader + Send + Sync> { 192 reader 193 } 194 195 /// unbind_remote_stream is called when the Stream is removed. It can be used to clean up any data related to that track. unbind_remote_stream(&self, _info: &StreamInfo)196 async fn unbind_remote_stream(&self, _info: &StreamInfo) {} 197 198 /// close closes the Interceptor, cleaning up any data if necessary. close(&self) -> Result<()>199 async fn close(&self) -> Result<()> { 200 Ok(()) 201 } 202 } 203