1 use super::*;
2 
3 pub(super) struct ReceiverStream {
4     parent_rtp_reader: Arc<dyn RTPReader + Send + Sync>,
5     hdr_ext_id: u8,
6     ssrc: u32,
7     packet_chan_tx: mpsc::Sender<Packet>,
8     // we use tokio's Instant because it makes testing easier via `tokio::time::advance`.
9     start_time: tokio::time::Instant,
10 }
11 
12 impl ReceiverStream {
new( parent_rtp_reader: Arc<dyn RTPReader + Send + Sync>, hdr_ext_id: u8, ssrc: u32, packet_chan_tx: mpsc::Sender<Packet>, start_time: tokio::time::Instant, ) -> Self13     pub(super) fn new(
14         parent_rtp_reader: Arc<dyn RTPReader + Send + Sync>,
15         hdr_ext_id: u8,
16         ssrc: u32,
17         packet_chan_tx: mpsc::Sender<Packet>,
18         start_time: tokio::time::Instant,
19     ) -> Self {
20         ReceiverStream {
21             parent_rtp_reader,
22             hdr_ext_id,
23             ssrc,
24             packet_chan_tx,
25             start_time,
26         }
27     }
28 }
29 
30 #[async_trait]
31 impl RTPReader for ReceiverStream {
32     /// read a rtp packet
read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)>33     async fn read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)> {
34         let (n, attr) = self.parent_rtp_reader.read(buf, attributes).await?;
35 
36         let mut b = &buf[..n];
37         let p = rtp::packet::Packet::unmarshal(&mut b)?;
38 
39         if let Some(mut ext) = p.header.get_extension(self.hdr_ext_id) {
40             let tcc_ext = TransportCcExtension::unmarshal(&mut ext)?;
41 
42             let _ = self
43                 .packet_chan_tx
44                 .send(Packet {
45                     hdr: p.header,
46                     sequence_number: tcc_ext.transport_sequence,
47                     arrival_time: (tokio::time::Instant::now() - self.start_time).as_micros()
48                         as i64,
49                     ssrc: self.ssrc,
50                 })
51                 .await;
52         }
53 
54         Ok((n, attr))
55     }
56 }
57