xref: /webrtc/srtp/src/stream.rs (revision ffe74184)
1 use crate::error::{Error, Result};
2 use util::{marshal::*, Buffer};
3 
4 use tokio::sync::mpsc;
5 
/// Buffer size limit used for RTP streams: 1 MB (1,000,000 bytes).
///
/// `Stream::new` passes this value to `Buffer::new` when `is_rtp` is `true`.
pub const SRTP_BUFFER_SIZE: usize = 1000 * 1000;
8 
/// Buffer size limit used for RTCP streams: 100 KB (100,000 bytes).
///
/// `Stream::new` passes this value to `Buffer::new` when `is_rtp` is `false`.
pub const SRTCP_BUFFER_SIZE: usize = 100 * 1000;
11 
/// Stream exposes the packets of a single RTP or RTCP SSRC, read out of an
/// internal buffer.
///
/// NOTE(review): `buffer` is `pub(crate)`, so it is presumably filled (with
/// already-decrypted packets) elsewhere in the crate — confirm against the
/// session code.
#[derive(Debug)]
pub struct Stream {
    /// SSRC this stream was created for.
    ssrc: u32,
    /// Channel on which the SSRC is sent when the stream is closed.
    tx: mpsc::Sender<u32>,
    /// Buffer the `read*` methods pull packet data from.
    pub(crate) buffer: Buffer,
    /// `true` for an RTP stream, `false` for an RTCP stream.
    is_rtp: bool,
}
20 
21 impl Stream {
22     /// Create a new stream
new(ssrc: u32, tx: mpsc::Sender<u32>, is_rtp: bool) -> Self23     pub fn new(ssrc: u32, tx: mpsc::Sender<u32>, is_rtp: bool) -> Self {
24         Stream {
25             ssrc,
26             tx,
27             // Create a buffer with a 1MB limit
28             buffer: Buffer::new(
29                 0,
30                 if is_rtp {
31                     SRTP_BUFFER_SIZE
32                 } else {
33                     SRTCP_BUFFER_SIZE
34                 },
35             ),
36             is_rtp,
37         }
38     }
39 
40     /// GetSSRC returns the SSRC we are demuxing for
get_ssrc(&self) -> u3241     pub fn get_ssrc(&self) -> u32 {
42         self.ssrc
43     }
44 
45     /// Check if RTP is a stream.
is_rtp_stream(&self) -> bool46     pub fn is_rtp_stream(&self) -> bool {
47         self.is_rtp
48     }
49 
50     /// Read reads and decrypts full RTP packet from the nextConn
read(&self, buf: &mut [u8]) -> Result<usize>51     pub async fn read(&self, buf: &mut [u8]) -> Result<usize> {
52         Ok(self.buffer.read(buf, None).await?)
53     }
54 
55     /// ReadRTP reads and decrypts full RTP packet and its header from the nextConn
read_rtp(&self, buf: &mut [u8]) -> Result<(usize, rtp::header::Header)>56     pub async fn read_rtp(&self, buf: &mut [u8]) -> Result<(usize, rtp::header::Header)> {
57         if !self.is_rtp {
58             return Err(Error::InvalidRtpStream);
59         }
60 
61         let n = self.buffer.read(buf, None).await?;
62         let mut b = &buf[..n];
63         let header = rtp::header::Header::unmarshal(&mut b)?;
64 
65         Ok((n, header))
66     }
67 
68     /// read_rtcp reads and decrypts full RTP packet and its header from the nextConn
read_rtcp(&self, buf: &mut [u8]) -> Result<(usize, rtcp::header::Header)>69     pub async fn read_rtcp(&self, buf: &mut [u8]) -> Result<(usize, rtcp::header::Header)> {
70         if self.is_rtp {
71             return Err(Error::InvalidRtcpStream);
72         }
73 
74         let n = self.buffer.read(buf, None).await?;
75         let mut b = &buf[..n];
76         let header = rtcp::header::Header::unmarshal(&mut b)?;
77 
78         Ok((n, header))
79     }
80 
81     /// Close removes the ReadStream from the session and cleans up any associated state
close(&self) -> Result<()>82     pub async fn close(&self) -> Result<()> {
83         self.buffer.close().await;
84         let _ = self.tx.send(self.ssrc).await;
85         Ok(())
86     }
87 }
88