1 use super::rtsp_channel::RtcpChannel; 2 use super::rtsp_channel::RtpChannel; 3 use super::rtsp_codec::RtspCodecInfo; 4 use super::rtsp_transport::RtspTransport; 5 use crate::rtp::errors::UnPackerError; 6 use crate::rtsp_channel::TRtpFunc; 7 use bytes::BytesMut; 8 use bytesio::bytes_reader::BytesReader; 9 use bytesio::bytesio::TNetIO; 10 use std::sync::Arc; 11 use tokio::sync::Mutex; 12 13 pub trait Track { create_packer(&mut self, writer: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>)14 fn create_packer(&mut self, writer: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>); create_unpacker(&mut self)15 fn create_unpacker(&mut self); 16 } 17 #[derive(Debug, Clone, Default, Hash, Eq, PartialEq)] 18 pub enum TrackType { 19 #[default] 20 Audio, 21 Video, 22 Application, 23 } 24 25 // A track can be a audio/video track, the A/V media data is transmitted 26 // over RTP, and the control data is transmitted over RTCP. 27 // The rtp/rtcp can be over TCP or UDP : 28 // 1. Over the TCP: It shares one TCP channel with the RTSP signaling data, and 29 // the entire session uses only one TCP connection(RTSP signaling data/audio 30 // RTP/audio RTCP/video RTP/video RTCP) 31 // 2. Over the UDP: It will establish 4 UDP channles for A/V RTP/RTCP data. 32 // 2.1 A RTP channel for audio media data transmitting. 33 // 2.2 A RTCP channel for audio control data transmitting 34 // 2.3 A RTP channel for video media data transmitting. 35 // 2.4 A RTCP channel for video control data transmitting 36 pub struct RtspTrack { 37 pub track_type: TrackType, 38 39 pub transport: RtspTransport, 40 pub uri: String, 41 pub media_control: String, 42 43 pub rtp_channel: Arc<Mutex<RtpChannel>>, 44 pub rtcp_channel: Arc<Mutex<RtcpChannel>>, 45 } 46 47 impl RtspTrack { new(track_type: TrackType, codec_info: RtspCodecInfo, media_control: String) -> Self48 pub fn new(track_type: TrackType, codec_info: RtspCodecInfo, media_control: String) -> Self { 49 let rtp_channel = RtpChannel::new(codec_info); 50 51 RtspTrack { 52 track_type, 53 media_control, 54 transport: RtspTransport::default(), 55 uri: String::default(), 56 rtp_channel: Arc::new(Mutex::new(rtp_channel)), 57 rtcp_channel: Arc::new(Mutex::default()), 58 } 59 } 60 rtp_receive_loop(&mut self, mut rtp_io: Box<dyn TNetIO + Send + Sync>)61 pub async fn rtp_receive_loop(&mut self, mut rtp_io: Box<dyn TNetIO + Send + Sync>) { 62 let rtp_channel_out = self.rtp_channel.clone(); 63 tokio::spawn(async move { 64 let mut reader = BytesReader::new(BytesMut::new()); 65 let mut rtp_channel_in = rtp_channel_out.lock().await; 66 loop { 67 match rtp_io.read().await { 68 Ok(data) => { 69 reader.extend_from_slice(&data[..]); 70 if let Err(err) = rtp_channel_in.on_packet(&mut reader) { 71 log::error!("rtp_receive_loop on_packet error: {}", err); 72 } 73 } 74 Err(err) => { 75 log::error!("read error: {:?}", err); 76 break; 77 } 78 } 79 } 80 }); 81 } 82 //send and receive rtcp data in a UDP channel rtcp_receive_loop(&mut self, rtcp_io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>)83 pub async fn rtcp_receive_loop(&mut self, rtcp_io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) { 84 let rtcp_channel_out = self.rtcp_channel.clone(); 85 86 tokio::spawn(async move { 87 let mut reader = BytesReader::new(BytesMut::new()); 88 let mut rtcp_channel_in = rtcp_channel_out.lock().await; 89 90 loop { 91 let data = match rtcp_io.lock().await.read().await { 92 Ok(data) => data, 93 Err(err) => { 94 log::error!("read error: {:?}", err); 95 break; 96 } 97 }; 98 reader.extend_from_slice(&data[..]); 99 rtcp_channel_in.on_rtcp(&mut reader, rtcp_io.clone()).await; 100 } 101 }); 102 } 103 set_transport(&mut self, transport: RtspTransport)104 pub async fn set_transport(&mut self, transport: RtspTransport) { 105 if let Some(interleaveds) = transport.interleaved { 106 self.rtcp_channel 107 .lock() 108 .await 109 .set_channel_identifier(interleaveds[1]); 110 } else { 111 log::info!("it is a udp transport!!!"); 112 } 113 114 self.transport = transport; 115 } 116 on_rtp(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError>117 pub async fn on_rtp(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError> { 118 self.rtp_channel.lock().await.on_packet(reader) 119 } 120 on_rtcp( &mut self, reader: &mut BytesReader, io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, )121 pub async fn on_rtcp( 122 &mut self, 123 reader: &mut BytesReader, 124 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 125 ) { 126 self.rtcp_channel.lock().await.on_rtcp(reader, io).await; 127 } 128 create_packer(&mut self, io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>)129 pub async fn create_packer(&mut self, io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) { 130 self.rtp_channel.lock().await.create_packer(io); 131 } 132 } 133