1 use super::*;
2 use crate::{Attributes, RTPWriter};
3 
4 use async_trait::async_trait;
5 use rtp::extension::abs_send_time_extension::unix2ntp;
6 use std::convert::TryInto;
7 use std::sync::Arc;
8 use std::time::{Duration, SystemTime};
9 use tokio::sync::Mutex;
10 
/// Mutable per-stream state from which sender reports are built.
///
/// `SenderStream` keeps a value of this type behind a mutex; `process_rtp`
/// updates it for every packet and `generate_report` reads it.
struct SenderStreamInternal {
    /// Copied verbatim into the `ssrc` field of every generated report.
    ssrc: u32,
    /// Factor applied to elapsed seconds when extrapolating the RTP timestamp.
    /// Stored as `f64` so it can be multiplied with `Duration::as_secs_f64`.
    clock_rate: f64,

    // data from rtp packets
    /// `header.timestamp` of the most recently processed RTP packet.
    last_rtp_time_rtp: u32,
    /// The `now` value that was passed in when that packet was processed.
    last_rtp_time_time: SystemTime,
    /// Running packet and octet totals reported in sender reports.
    counters: Counters,
}
20 
21 impl SenderStreamInternal {
process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet)22     fn process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet) {
23         // always update time to minimize errors
24         self.last_rtp_time_rtp = pkt.header.timestamp;
25         self.last_rtp_time_time = now;
26 
27         self.counters.increment_packets();
28         self.counters.count_octets(pkt.payload.len());
29     }
30 
generate_report(&mut self, now: SystemTime) -> rtcp::sender_report::SenderReport31     fn generate_report(&mut self, now: SystemTime) -> rtcp::sender_report::SenderReport {
32         rtcp::sender_report::SenderReport {
33             ssrc: self.ssrc,
34             ntp_time: unix2ntp(now),
35             rtp_time: self.last_rtp_time_rtp.wrapping_add(
36                 (now.duration_since(self.last_rtp_time_time)
37                     .unwrap_or_else(|_| Duration::from_secs(0))
38                     .as_secs_f64()
39                     * self.clock_rate) as u32,
40             ),
41             packet_count: self.counters.packet_count(),
42             octet_count: self.counters.octet_count(),
43             ..Default::default()
44         }
45     }
46 }
47 
/// RTP writer wrapper that accounts for every packet it forwards so that
/// sender reports can be generated for the stream.
pub(crate) struct SenderStream {
    /// Writer each packet is handed to once it has been accounted for.
    next_rtp_writer: Arc<dyn RTPWriter + Send + Sync>,
    /// Optional time source; when `None`, `SystemTime::now()` is used instead.
    now: Option<FnTimeGen>,

    /// Per-stream statistics, guarded by an async mutex because they are
    /// accessed through `&self` from both `write` and `generate_report`.
    internal: Mutex<SenderStreamInternal>,
}
54 
55 impl SenderStream {
new( ssrc: u32, clock_rate: u32, writer: Arc<dyn RTPWriter + Send + Sync>, now: Option<FnTimeGen>, ) -> Self56     pub(crate) fn new(
57         ssrc: u32,
58         clock_rate: u32,
59         writer: Arc<dyn RTPWriter + Send + Sync>,
60         now: Option<FnTimeGen>,
61     ) -> Self {
62         SenderStream {
63             next_rtp_writer: writer,
64             now,
65 
66             internal: Mutex::new(SenderStreamInternal {
67                 ssrc,
68                 clock_rate: clock_rate as f64,
69                 last_rtp_time_rtp: 0,
70                 last_rtp_time_time: SystemTime::UNIX_EPOCH,
71                 counters: Default::default(),
72             }),
73         }
74     }
75 
process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet)76     async fn process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet) {
77         let mut internal = self.internal.lock().await;
78         internal.process_rtp(now, pkt);
79     }
80 
generate_report( &self, now: SystemTime, ) -> rtcp::sender_report::SenderReport81     pub(crate) async fn generate_report(
82         &self,
83         now: SystemTime,
84     ) -> rtcp::sender_report::SenderReport {
85         let mut internal = self.internal.lock().await;
86         internal.generate_report(now)
87     }
88 }
89 
90 /// RTPWriter is used by Interceptor.bind_local_stream.
91 #[async_trait]
92 impl RTPWriter for SenderStream {
93     /// write a rtp packet
write(&self, pkt: &rtp::packet::Packet, a: &Attributes) -> Result<usize>94     async fn write(&self, pkt: &rtp::packet::Packet, a: &Attributes) -> Result<usize> {
95         let now = if let Some(f) = &self.now {
96             f()
97         } else {
98             SystemTime::now()
99         };
100         self.process_rtp(now, pkt).await;
101 
102         self.next_rtp_writer.write(pkt, a).await
103     }
104 }
105 
/// Packet and octet totals held as `u32` values.
///
/// The mutating methods use wrapping addition, so both totals roll over
/// instead of overflowing. `Default` yields zeroed counters.
#[derive(Default)]
pub(crate) struct Counters {
    /// Number of packets counted so far.
    packets: u32,
    /// Sum of the payload sizes counted so far, in octets.
    octets: u32,
}
111 
112 /// Wrapping counters used for generating [`rtcp::sender_report::SenderReport`]
113 impl Counters {
increment_packets(&mut self)114     pub fn increment_packets(&mut self) {
115         self.packets = self.packets.wrapping_add(1);
116     }
117 
count_octets(&mut self, octets: usize)118     pub fn count_octets(&mut self, octets: usize) {
119         // account for a payload size of at most `u32::MAX`
120         // and log a message if larger
121         self.octets = self
122             .octets
123             .wrapping_add(octets.try_into().unwrap_or_else(|_| {
124                 log::warn!("packet payload larger than 32 bits");
125                 u32::MAX
126             }));
127     }
128 
packet_count(&self) -> u32129     pub fn packet_count(&self) -> u32 {
130         self.packets
131     }
132 
octet_count(&self) -> u32133     pub fn octet_count(&self) -> u32 {
134         self.octets
135     }
136 
137     #[cfg(test)]
mock(packets: u32, octets: u32) -> Self138     pub fn mock(packets: u32, octets: u32) -> Self {
139         Self { packets, octets }
140     }
141 }
142