xref: /webrtc/srtp/src/stream.rs (revision 259fddd2)
1 use transport::Buffer;
2 use util::Error;
3 
4 use tokio::sync::mpsc;
5 
6 use std::io::Cursor;
7 
8 // Limit the buffer size to 1MB
9 pub const SRTP_BUFFER_SIZE: usize = 1000 * 1000;
10 
11 // Limit the buffer size to 100KB
12 pub const SRTCP_BUFFER_SIZE: usize = 100 * 1000;
13 
14 // Stream handles decryption for a single RTP/RTCP SSRC
15 pub struct Stream {
16     ssrc: u32,
17     tx: mpsc::Sender<u32>,
18     buffer: Buffer,
19     is_rtp: bool,
20 }
21 
22 impl Stream {
23     pub fn new(ssrc: u32, tx: mpsc::Sender<u32>, is_rtp: bool) -> Self {
24         Stream {
25             ssrc,
26             tx,
27             // Create a buffer with a 1MB limit
28             buffer: Buffer::new(
29                 0,
30                 if is_rtp {
31                     SRTP_BUFFER_SIZE
32                 } else {
33                     SRTCP_BUFFER_SIZE
34                 },
35             ),
36             is_rtp,
37         }
38     }
39 
40     // Get Cloned Buffer
41     pub(crate) fn get_cloned_buffer(&self) -> Buffer {
42         self.buffer.clone()
43     }
44 
45     // GetSSRC returns the SSRC we are demuxing for
46     pub fn get_ssrc(&self) -> u32 {
47         self.ssrc
48     }
49 
50     pub fn is_rtp_stream(&self) -> bool {
51         self.is_rtp
52     }
53 
54     // Read reads and decrypts full RTP packet from the nextConn
55     pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
56         self.buffer.read(buf, None).await
57     }
58 
59     // ReadRTP reads and decrypts full RTP packet and its header from the nextConn
60     pub async fn read_rtp(
61         &mut self,
62         buf: &mut [u8],
63     ) -> Result<(usize, rtp::header::Header), Error> {
64         if !self.is_rtp {
65             return Err(Error::new("this stream is not RTPStream".to_string()));
66         }
67 
68         let n = self.buffer.read(buf, None).await?;
69         let mut reader = Cursor::new(buf);
70         let header = rtp::header::Header::unmarshal(&mut reader)?;
71 
72         Ok((n, header))
73     }
74 
75     // ReadRTCP reads and decrypts full RTP packet and its header from the nextConn
76     pub async fn read_rtcp(
77         &mut self,
78         buf: &mut [u8],
79     ) -> Result<(usize, rtcp::header::Header), Error> {
80         if self.is_rtp {
81             return Err(Error::new("this stream is not RTCPStream".to_string()));
82         }
83 
84         let n = self.buffer.read(buf, None).await?;
85         let mut reader = Cursor::new(buf);
86         let header = rtcp::header::Header::unmarshal(&mut reader)?;
87 
88         Ok((n, header))
89     }
90 
91     // Close removes the ReadStream from the session and cleans up any associated state
92     pub async fn close(&mut self) -> Result<(), Error> {
93         self.buffer.close().await;
94         self.tx.send(self.ssrc).await?;
95 
96         Ok(())
97     }
98 }
99