1 use transport::Buffer; 2 use util::Error; 3 4 use tokio::sync::mpsc; 5 6 use std::io::Cursor; 7 8 // Limit the buffer size to 1MB 9 pub const SRTP_BUFFER_SIZE: usize = 1000 * 1000; 10 11 // Limit the buffer size to 100KB 12 pub const SRTCP_BUFFER_SIZE: usize = 100 * 1000; 13 14 // Stream handles decryption for a single RTP/RTCP SSRC 15 pub struct Stream { 16 ssrc: u32, 17 tx: mpsc::Sender<u32>, 18 buffer: Buffer, 19 is_rtp: bool, 20 } 21 22 impl Stream { 23 pub fn new(ssrc: u32, tx: mpsc::Sender<u32>, is_rtp: bool) -> Self { 24 Stream { 25 ssrc, 26 tx, 27 // Create a buffer with a 1MB limit 28 buffer: Buffer::new( 29 0, 30 if is_rtp { 31 SRTP_BUFFER_SIZE 32 } else { 33 SRTCP_BUFFER_SIZE 34 }, 35 ), 36 is_rtp, 37 } 38 } 39 40 // Get Cloned Buffer 41 pub(crate) fn get_cloned_buffer(&self) -> Buffer { 42 self.buffer.clone() 43 } 44 45 // GetSSRC returns the SSRC we are demuxing for 46 pub fn get_ssrc(&self) -> u32 { 47 self.ssrc 48 } 49 50 pub fn is_rtp_stream(&self) -> bool { 51 self.is_rtp 52 } 53 54 // Read reads and decrypts full RTP packet from the nextConn 55 pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> { 56 self.buffer.read(buf, None).await 57 } 58 59 // ReadRTP reads and decrypts full RTP packet and its header from the nextConn 60 pub async fn read_rtp( 61 &mut self, 62 buf: &mut [u8], 63 ) -> Result<(usize, rtp::header::Header), Error> { 64 if !self.is_rtp { 65 return Err(Error::new("this stream is not RTPStream".to_string())); 66 } 67 68 let n = self.buffer.read(buf, None).await?; 69 let mut reader = Cursor::new(buf); 70 let header = rtp::header::Header::unmarshal(&mut reader)?; 71 72 Ok((n, header)) 73 } 74 75 // ReadRTCP reads and decrypts full RTP packet and its header from the nextConn 76 pub async fn read_rtcp( 77 &mut self, 78 buf: &mut [u8], 79 ) -> Result<(usize, rtcp::header::Header), Error> { 80 if self.is_rtp { 81 return Err(Error::new("this stream is not RTCPStream".to_string())); 82 } 83 84 let n = self.buffer.read(buf, None).await?; 85 let mut reader = Cursor::new(buf); 86 let header = rtcp::header::Header::unmarshal(&mut reader)?; 87 88 Ok((n, header)) 89 } 90 91 // Close removes the ReadStream from the session and cleans up any associated state 92 pub async fn close(&mut self) -> Result<(), Error> { 93 self.buffer.close().await; 94 self.tx.send(self.ssrc).await?; 95 96 Ok(()) 97 } 98 } 99