1 use super::*; 2 3 pub(super) struct ReceiverStream { 4 parent_rtp_reader: Arc<dyn RTPReader + Send + Sync>, 5 hdr_ext_id: u8, 6 ssrc: u32, 7 packet_chan_tx: mpsc::Sender<Packet>, 8 // we use tokio's Instant because it makes testing easier via `tokio::time::advance`. 9 start_time: tokio::time::Instant, 10 } 11 12 impl ReceiverStream { new( parent_rtp_reader: Arc<dyn RTPReader + Send + Sync>, hdr_ext_id: u8, ssrc: u32, packet_chan_tx: mpsc::Sender<Packet>, start_time: tokio::time::Instant, ) -> Self13 pub(super) fn new( 14 parent_rtp_reader: Arc<dyn RTPReader + Send + Sync>, 15 hdr_ext_id: u8, 16 ssrc: u32, 17 packet_chan_tx: mpsc::Sender<Packet>, 18 start_time: tokio::time::Instant, 19 ) -> Self { 20 ReceiverStream { 21 parent_rtp_reader, 22 hdr_ext_id, 23 ssrc, 24 packet_chan_tx, 25 start_time, 26 } 27 } 28 } 29 30 #[async_trait] 31 impl RTPReader for ReceiverStream { 32 /// read a rtp packet read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)>33 async fn read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)> { 34 let (n, attr) = self.parent_rtp_reader.read(buf, attributes).await?; 35 36 let mut b = &buf[..n]; 37 let p = rtp::packet::Packet::unmarshal(&mut b)?; 38 39 if let Some(mut ext) = p.header.get_extension(self.hdr_ext_id) { 40 let tcc_ext = TransportCcExtension::unmarshal(&mut ext)?; 41 42 let _ = self 43 .packet_chan_tx 44 .send(Packet { 45 hdr: p.header, 46 sequence_number: tcc_ext.transport_sequence, 47 arrival_time: (tokio::time::Instant::now() - self.start_time).as_micros() 48 as i64, 49 ssrc: self.ssrc, 50 }) 51 .await; 52 } 53 54 Ok((n, attr)) 55 } 56 } 57