xref: /xiu/protocol/rtmp/src/session/common.rs (revision 88d91efd)
1 use {
2     super::{
3         define::{SessionSubType, SessionType},
4         errors::{SessionError, SessionErrorValue},
5     },
6     crate::{
7         channels::define::{
8             ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent,
9             ChannelEventProducer,
10         },
11         chunk::{
12             define::{chunk_type, csid_type},
13             packetizer::ChunkPacketizer,
14             ChunkInfo,
15         },
16         messages::define::msg_type_id,
17     },
18     bytes::BytesMut,
19     bytesio::bytesio::BytesIO,
20     std::{sync::Arc, time::Duration},
21     tokio::{
22         sync::{mpsc, oneshot, Mutex},
23         time::sleep,
24     },
25     uuid::Uuid,
26 };
27 #[derive(Debug)]
28 pub struct SessionInfo {
29     pub subscriber_id: Uuid,
30     pub session_sub_type: SessionSubType,
31 }
32 pub struct Common {
33     packetizer: ChunkPacketizer,
34 
35     data_consumer: ChannelDataConsumer,
36     data_producer: ChannelDataProducer,
37 
38     event_producer: ChannelEventProducer,
39     pub session_type: SessionType,
40 }
41 
42 impl Common {
43     pub fn new(
44         net_io: Arc<Mutex<BytesIO>>,
45         event_producer: ChannelEventProducer,
46         session_type: SessionType,
47     ) -> Self {
48         //only used for init,since I don't found a better way to deal with this.
49         let (init_producer, init_consumer) = mpsc::unbounded_channel();
50 
51         Self {
52             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
53 
54             data_producer: init_producer,
55             data_consumer: init_consumer,
56 
57             event_producer,
58             session_type,
59         }
60     }
61     pub async fn send_channel_data(&mut self) -> Result<(), SessionError> {
62         loop {
63             if let Some(data) = self.data_consumer.recv().await {
64                 match data {
65                     ChannelData::Audio { timestamp, data } => {
66                         self.send_audio(data, timestamp).await?;
67                     }
68                     ChannelData::Video { timestamp, data } => {
69                         self.send_video(data, timestamp).await?;
70                     }
71                     ChannelData::MetaData { timestamp, data } => {
72                         self.send_metadata(data, timestamp).await?;
73                     }
74                 }
75             }
76         }
77     }
78 
79     pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
80         let mut chunk_info = ChunkInfo::new(
81             csid_type::AUDIO,
82             chunk_type::TYPE_0,
83             timestamp,
84             data.len() as u32,
85             msg_type_id::AUDIO,
86             0,
87             data,
88         );
89 
90         self.packetizer.write_chunk(&mut chunk_info).await?;
91 
92         Ok(())
93     }
94 
95     pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
96         let mut chunk_info = ChunkInfo::new(
97             csid_type::VIDEO,
98             chunk_type::TYPE_0,
99             timestamp,
100             data.len() as u32,
101             msg_type_id::VIDEO,
102             0,
103             data,
104         );
105 
106         self.packetizer.write_chunk(&mut chunk_info).await?;
107 
108         Ok(())
109     }
110 
111     pub async fn send_metadata(
112         &mut self,
113         data: BytesMut,
114         timestamp: u32,
115     ) -> Result<(), SessionError> {
116         let mut chunk_info = ChunkInfo::new(
117             csid_type::DATA_AMF0_AMF3,
118             chunk_type::TYPE_0,
119             timestamp,
120             data.len() as u32,
121             msg_type_id::DATA_AMF0,
122             0,
123             data,
124         );
125 
126         self.packetizer.write_chunk(&mut chunk_info).await?;
127         Ok(())
128     }
129 
130     pub fn on_video_data(
131         &mut self,
132         data: &mut BytesMut,
133         timestamp: &u32,
134     ) -> Result<(), SessionError> {
135         let data = ChannelData::Video {
136             timestamp: timestamp.clone(),
137             data: data.clone(),
138         };
139 
140         match self.data_producer.send(data) {
141             Ok(_) => {}
142             Err(err) => {
143                 log::error!("send video err: {}", err);
144                 return Err(SessionError {
145                     value: SessionErrorValue::SendChannelDataErr,
146                 });
147             }
148         }
149 
150         Ok(())
151     }
152 
153     pub fn on_audio_data(
154         &mut self,
155         data: &mut BytesMut,
156         timestamp: &u32,
157     ) -> Result<(), SessionError> {
158         let data = ChannelData::Audio {
159             timestamp: timestamp.clone(),
160             data: data.clone(),
161         };
162 
163         match self.data_producer.send(data) {
164             Ok(_) => {}
165             Err(err) => {
166                 log::error!("receive audio err {}\n", err);
167                 return Err(SessionError {
168                     value: SessionErrorValue::SendChannelDataErr,
169                 });
170             }
171         }
172 
173         Ok(())
174     }
175 
176     pub fn on_meta_data(
177         &mut self,
178         body: &mut BytesMut,
179         timestamp: &u32,
180     ) -> Result<(), SessionError> {
181         let data = ChannelData::MetaData {
182             timestamp: timestamp.clone(),
183             data: body.clone(),
184         };
185 
186         match self.data_producer.send(data) {
187             Ok(_) => {}
188             Err(_) => {
189                 return Err(SessionError {
190                     value: SessionErrorValue::SendChannelDataErr,
191                 })
192             }
193         }
194 
195         Ok(())
196     }
197 
198     fn get_session_info(&mut self, sub_id: Uuid) -> SessionInfo {
199         match self.session_type {
200             SessionType::Client => SessionInfo {
201                 subscriber_id: sub_id,
202                 session_sub_type: SessionSubType::Publisher,
203             },
204             SessionType::Server => SessionInfo {
205                 subscriber_id: sub_id,
206                 session_sub_type: SessionSubType::Player,
207             },
208         }
209     }
210 
211     /*Begin to send data to common player or relay pull client*/
212     pub async fn subscribe_from_channels(
213         &mut self,
214         app_name: String,
215         stream_name: String,
216         sub_id: Uuid,
217     ) -> Result<(), SessionError> {
218         log::info!(
219             "subscribe_from_channels, app_name: {} stream_name: {} subscribe_id: {}",
220             app_name,
221             stream_name.clone(),
222             sub_id
223         );
224 
225         let mut retry_count: u8 = 0;
226 
227         loop {
228             let (sender, receiver) = oneshot::channel();
229 
230             let subscribe_event = ChannelEvent::Subscribe {
231                 app_name: app_name.clone(),
232                 stream_name: stream_name.clone(),
233                 session_info: self.get_session_info(sub_id),
234                 responder: sender,
235             };
236             let rv = self.event_producer.send(subscribe_event);
237             match rv {
238                 Err(_) => {
239                     return Err(SessionError {
240                         value: SessionErrorValue::ChannelEventSendErr,
241                     })
242                 }
243                 _ => {}
244             }
245 
246             match receiver.await {
247                 Ok(consumer) => {
248                     self.data_consumer = consumer;
249                     break;
250                 }
251                 Err(_) => {
252                     if retry_count > 10 {
253                         return Err(SessionError {
254                             value: SessionErrorValue::SubscribeCountLimitReach,
255                         });
256                     }
257                 }
258             }
259 
260             sleep(Duration::from_millis(800)).await;
261             retry_count = retry_count + 1;
262         }
263 
264         Ok(())
265     }
266 
267     pub async fn unsubscribe_from_channels(
268         &mut self,
269         app_name: String,
270         stream_name: String,
271         sub_id: Uuid,
272     ) -> Result<(), SessionError> {
273         let subscribe_event = ChannelEvent::UnSubscribe {
274             app_name,
275             stream_name,
276             session_info: self.get_session_info(sub_id),
277         };
278         if let Err(err) = self.event_producer.send(subscribe_event) {
279             log::error!("unsubscribe_from_channels err {}\n", err);
280         }
281 
282         Ok(())
283     }
284 
285     /*Begin to receive stream data from RTMP push client or RTMP relay push client*/
286     pub async fn publish_to_channels(
287         &mut self,
288         app_name: String,
289         stream_name: String,
290     ) -> Result<(), SessionError> {
291         let (sender, receiver) = oneshot::channel();
292         let publish_event = ChannelEvent::Publish {
293             app_name,
294             stream_name,
295             responder: sender,
296         };
297 
298         let rv = self.event_producer.send(publish_event);
299         match rv {
300             Err(_) => {
301                 return Err(SessionError {
302                     value: SessionErrorValue::ChannelEventSendErr,
303                 })
304             }
305             _ => {}
306         }
307 
308         match receiver.await {
309             Ok(producer) => {
310                 self.data_producer = producer;
311             }
312             Err(err) => {
313                 log::error!("publish_to_channels err{}\n", err);
314             }
315         }
316         Ok(())
317     }
318 
319     pub async fn unpublish_to_channels(
320         &mut self,
321         app_name: String,
322         stream_name: String,
323     ) -> Result<(), SessionError> {
324         let unpublish_event = ChannelEvent::UnPublish {
325             app_name: app_name.clone(),
326             stream_name: stream_name.clone(),
327         };
328 
329         let rv = self.event_producer.send(unpublish_event);
330         match rv {
331             Err(_) => {
332                 log::error!(
333                     "unpublish_to_channels error.app_name: {}, stream_name: {}",
334                     app_name,
335                     stream_name
336                 );
337                 return Err(SessionError {
338                     value: SessionErrorValue::ChannelEventSendErr,
339                 });
340             }
341             _ => {
342                 log::info!(
343                     "unpublish_to_channels successfully.app_name: {}, stream_name: {}",
344                     app_name,
345                     stream_name
346                 );
347             }
348         }
349         Ok(())
350     }
351 }
352