xref: /xiu/protocol/rtsp/src/rtsp_track.rs (revision b36cf5da)
1 use super::rtsp_channel::RtcpChannel;
2 use super::rtsp_channel::RtpChannel;
3 use super::rtsp_codec::RtspCodecInfo;
4 use super::rtsp_transport::RtspTransport;
5 use crate::rtp::errors::UnPackerError;
6 use crate::rtsp_channel::TRtpFunc;
7 use bytes::BytesMut;
8 use bytesio::bytes_reader::BytesReader;
9 use bytesio::bytesio::TNetIO;
10 use std::sync::Arc;
11 use tokio::sync::Mutex;
12 
13 pub trait Track {
create_packer(&mut self, writer: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>)14     fn create_packer(&mut self, writer: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>);
create_unpacker(&mut self)15     fn create_unpacker(&mut self);
16 }
17 #[derive(Debug, Clone, Default, Hash, Eq, PartialEq)]
18 pub enum TrackType {
19     #[default]
20     Audio,
21     Video,
22     Application,
23 }
24 
25 // A track can be a audio/video track, the A/V media data is transmitted
26 // over RTP, and the control data is transmitted over RTCP.
27 // The rtp/rtcp can be over TCP or UDP :
28 // 1. Over the TCP: It shares one TCP channel with the RTSP signaling data, and
29 // the entire session uses only one TCP connection(RTSP signaling data/audio
30 // RTP/audio RTCP/video RTP/video RTCP)
31 // 2. Over the UDP: It will establish 4 UDP channles for A/V RTP/RTCP data.
32 // 2.1 A RTP channel for audio media data transmitting.
33 // 2.2 A RTCP channel for audio control data transmitting
34 // 2.3 A RTP channel for video media data transmitting.
35 // 2.4 A RTCP channel for video control data transmitting
36 pub struct RtspTrack {
37     pub track_type: TrackType,
38 
39     pub transport: RtspTransport,
40     pub uri: String,
41     pub media_control: String,
42 
43     pub rtp_channel: Arc<Mutex<RtpChannel>>,
44     pub rtcp_channel: Arc<Mutex<RtcpChannel>>,
45 }
46 
47 impl RtspTrack {
new(track_type: TrackType, codec_info: RtspCodecInfo, media_control: String) -> Self48     pub fn new(track_type: TrackType, codec_info: RtspCodecInfo, media_control: String) -> Self {
49         let rtp_channel = RtpChannel::new(codec_info);
50 
51         RtspTrack {
52             track_type,
53             media_control,
54             transport: RtspTransport::default(),
55             uri: String::default(),
56             rtp_channel: Arc::new(Mutex::new(rtp_channel)),
57             rtcp_channel: Arc::new(Mutex::default()),
58         }
59     }
60 
rtp_receive_loop(&mut self, mut rtp_io: Box<dyn TNetIO + Send + Sync>)61     pub async fn rtp_receive_loop(&mut self, mut rtp_io: Box<dyn TNetIO + Send + Sync>) {
62         let rtp_channel_out = self.rtp_channel.clone();
63         tokio::spawn(async move {
64             let mut reader = BytesReader::new(BytesMut::new());
65             let mut rtp_channel_in = rtp_channel_out.lock().await;
66             loop {
67                 match rtp_io.read().await {
68                     Ok(data) => {
69                         reader.extend_from_slice(&data[..]);
70                         if let Err(err) = rtp_channel_in.on_packet(&mut reader) {
71                             log::error!("rtp_receive_loop on_packet error: {}", err);
72                         }
73                     }
74                     Err(err) => {
75                         log::error!("read error: {:?}", err);
76                         break;
77                     }
78                 }
79             }
80         });
81     }
82     //send and receive rtcp data in a UDP channel
rtcp_receive_loop(&mut self, rtcp_io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>)83     pub async fn rtcp_receive_loop(&mut self, rtcp_io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) {
84         let rtcp_channel_out = self.rtcp_channel.clone();
85 
86         tokio::spawn(async move {
87             let mut reader = BytesReader::new(BytesMut::new());
88             let mut rtcp_channel_in = rtcp_channel_out.lock().await;
89 
90             loop {
91                 let data = match rtcp_io.lock().await.read().await {
92                     Ok(data) => data,
93                     Err(err) => {
94                         log::error!("read error: {:?}", err);
95                         break;
96                     }
97                 };
98                 reader.extend_from_slice(&data[..]);
99                 rtcp_channel_in.on_rtcp(&mut reader, rtcp_io.clone()).await;
100             }
101         });
102     }
103 
set_transport(&mut self, transport: RtspTransport)104     pub async fn set_transport(&mut self, transport: RtspTransport) {
105         if let Some(interleaveds) = transport.interleaved {
106             self.rtcp_channel
107                 .lock()
108                 .await
109                 .set_channel_identifier(interleaveds[1]);
110         } else {
111             log::info!("it is a udp transport!!!");
112         }
113 
114         self.transport = transport;
115     }
116 
on_rtp(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError>117     pub async fn on_rtp(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError> {
118         self.rtp_channel.lock().await.on_packet(reader)
119     }
120 
on_rtcp( &mut self, reader: &mut BytesReader, io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, )121     pub async fn on_rtcp(
122         &mut self,
123         reader: &mut BytesReader,
124         io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
125     ) {
126         self.rtcp_channel.lock().await.on_rtcp(reader, io).await;
127     }
128 
create_packer(&mut self, io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>)129     pub async fn create_packer(&mut self, io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) {
130         self.rtp_channel.lock().await.create_packer(io);
131     }
132 }
133