1 use crate::error::{Error, Result}; 2 use util::{marshal::*, Buffer}; 3 4 use tokio::sync::mpsc; 5 6 /// Limit the buffer size to 1MB 7 pub const SRTP_BUFFER_SIZE: usize = 1000 * 1000; 8 9 /// Limit the buffer size to 100KB 10 pub const SRTCP_BUFFER_SIZE: usize = 100 * 1000; 11 12 /// Stream handles decryption for a single RTP/RTCP SSRC 13 #[derive(Debug)] 14 pub struct Stream { 15 ssrc: u32, 16 tx: mpsc::Sender<u32>, 17 pub(crate) buffer: Buffer, 18 is_rtp: bool, 19 } 20 21 impl Stream { 22 /// Create a new stream new(ssrc: u32, tx: mpsc::Sender<u32>, is_rtp: bool) -> Self23 pub fn new(ssrc: u32, tx: mpsc::Sender<u32>, is_rtp: bool) -> Self { 24 Stream { 25 ssrc, 26 tx, 27 // Create a buffer with a 1MB limit 28 buffer: Buffer::new( 29 0, 30 if is_rtp { 31 SRTP_BUFFER_SIZE 32 } else { 33 SRTCP_BUFFER_SIZE 34 }, 35 ), 36 is_rtp, 37 } 38 } 39 40 /// GetSSRC returns the SSRC we are demuxing for get_ssrc(&self) -> u3241 pub fn get_ssrc(&self) -> u32 { 42 self.ssrc 43 } 44 45 /// Check if RTP is a stream. is_rtp_stream(&self) -> bool46 pub fn is_rtp_stream(&self) -> bool { 47 self.is_rtp 48 } 49 50 /// Read reads and decrypts full RTP packet from the nextConn read(&self, buf: &mut [u8]) -> Result<usize>51 pub async fn read(&self, buf: &mut [u8]) -> Result<usize> { 52 Ok(self.buffer.read(buf, None).await?) 53 } 54 55 /// ReadRTP reads and decrypts full RTP packet and its header from the nextConn read_rtp(&self, buf: &mut [u8]) -> Result<(usize, rtp::header::Header)>56 pub async fn read_rtp(&self, buf: &mut [u8]) -> Result<(usize, rtp::header::Header)> { 57 if !self.is_rtp { 58 return Err(Error::InvalidRtpStream); 59 } 60 61 let n = self.buffer.read(buf, None).await?; 62 let mut b = &buf[..n]; 63 let header = rtp::header::Header::unmarshal(&mut b)?; 64 65 Ok((n, header)) 66 } 67 68 /// read_rtcp reads and decrypts full RTP packet and its header from the nextConn read_rtcp(&self, buf: &mut [u8]) -> Result<(usize, rtcp::header::Header)>69 pub async fn read_rtcp(&self, buf: &mut [u8]) -> Result<(usize, rtcp::header::Header)> { 70 if self.is_rtp { 71 return Err(Error::InvalidRtcpStream); 72 } 73 74 let n = self.buffer.read(buf, None).await?; 75 let mut b = &buf[..n]; 76 let header = rtcp::header::Header::unmarshal(&mut b)?; 77 78 Ok((n, header)) 79 } 80 81 /// Close removes the ReadStream from the session and cleans up any associated state close(&self) -> Result<()>82 pub async fn close(&self) -> Result<()> { 83 self.buffer.close().await; 84 let _ = self.tx.send(self.ssrc).await; 85 Ok(()) 86 } 87 } 88