xref: /xiu/protocol/rtmp/src/session/common.rs (revision 87f493cd)
1 use {
2     super::{
3         define::{SessionSubType, SessionType},
4         errors::{SessionError, SessionErrorValue},
5     },
6     crate::{
7         channels::define::{
8             ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent,
9             ChannelEventProducer,
10         },
11         chunk::{
12             define::{chunk_type, csid_type},
13             packetizer::ChunkPacketizer,
14             ChunkInfo,
15         },
16         messages::define::msg_type_id,
17     },
18     bytes::BytesMut,
19     networkio::networkio::NetworkIO,
20     std::{sync::Arc, time::Duration},
21     tokio::{
22         sync::{mpsc, oneshot, Mutex},
23         time::sleep,
24     },
25 };
26 
27 pub struct SessionInfo {
28     pub session_id: u64,
29     pub session_sub_type: SessionSubType,
30 }
31 pub struct Common {
32     packetizer: ChunkPacketizer,
33 
34     data_consumer: ChannelDataConsumer,
35     data_producer: ChannelDataProducer,
36 
37     event_producer: ChannelEventProducer,
38     session_type: SessionType,
39 }
40 
41 impl Common {
42     pub fn new(
43         net_io: Arc<Mutex<NetworkIO>>,
44         event_producer: ChannelEventProducer,
45         session_type: SessionType,
46     ) -> Self {
47         //only used for init,since I don't found a better way to deal with this.
48         let (init_producer, init_consumer) = mpsc::unbounded_channel();
49 
50         Self {
51             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
52 
53             data_producer: init_producer,
54             data_consumer: init_consumer,
55 
56             event_producer,
57             session_type,
58         }
59     }
60     pub async fn send_channel_data(&mut self) -> Result<(), SessionError> {
61         loop {
62             if let Some(data) = self.data_consumer.recv().await {
63                 match data {
64                     ChannelData::Audio { timestamp, data } => {
65                         self.send_audio(data, timestamp).await?;
66                     }
67                     ChannelData::Video { timestamp, data } => {
68                         self.send_video(data, timestamp).await?;
69                     }
70                     ChannelData::MetaData { timestamp, data } => {
71                         self.send_metadata(data, timestamp).await?;
72                     }
73                 }
74             }
75         }
76     }
77 
78     pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
79         let mut chunk_info = ChunkInfo::new(
80             csid_type::AUDIO,
81             chunk_type::TYPE_0,
82             timestamp,
83             data.len() as u32,
84             msg_type_id::AUDIO,
85             0,
86             data,
87         );
88 
89         self.packetizer.write_chunk(&mut chunk_info).await?;
90 
91         Ok(())
92     }
93 
94     pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
95         let mut chunk_info = ChunkInfo::new(
96             csid_type::VIDEO,
97             chunk_type::TYPE_0,
98             timestamp,
99             data.len() as u32,
100             msg_type_id::VIDEO,
101             0,
102             data,
103         );
104 
105         self.packetizer.write_chunk(&mut chunk_info).await?;
106 
107         Ok(())
108     }
109 
110     pub async fn send_metadata(
111         &mut self,
112         data: BytesMut,
113         timestamp: u32,
114     ) -> Result<(), SessionError> {
115         let mut chunk_info = ChunkInfo::new(
116             csid_type::DATA_AMF0_AMF3,
117             chunk_type::TYPE_0,
118             timestamp,
119             data.len() as u32,
120             msg_type_id::DATA_AMF0,
121             0,
122             data,
123         );
124 
125         self.packetizer.write_chunk(&mut chunk_info).await?;
126         Ok(())
127     }
128 
129     pub fn on_video_data(
130         &mut self,
131         data: &mut BytesMut,
132         timestamp: &u32,
133     ) -> Result<(), SessionError> {
134         let data = ChannelData::Video {
135             timestamp: timestamp.clone(),
136             data: data.clone(),
137         };
138 
139         //print!("receive video data\n");
140         match self.data_producer.send(data) {
141             Ok(_) => {}
142             Err(err) => {
143                 print!("send video err {}\n", err);
144                 return Err(SessionError {
145                     value: SessionErrorValue::SendChannelDataErr,
146                 });
147             }
148         }
149 
150         Ok(())
151     }
152 
153     pub fn on_audio_data(
154         &mut self,
155         data: &mut BytesMut,
156         timestamp: &u32,
157     ) -> Result<(), SessionError> {
158         let data = ChannelData::Audio {
159             timestamp: timestamp.clone(),
160             data: data.clone(),
161         };
162 
163         match self.data_producer.send(data) {
164             Ok(_) => {}
165             Err(err) => {
166                 print!("receive audio err {}\n", err);
167                 return Err(SessionError {
168                     value: SessionErrorValue::SendChannelDataErr,
169                 });
170             }
171         }
172 
173         Ok(())
174     }
175 
176     pub fn on_meta_data(
177         &mut self,
178         body: &mut BytesMut,
179         timestamp: &u32,
180     ) -> Result<(), SessionError> {
181         let data = ChannelData::MetaData {
182             timestamp: timestamp.clone(),
183             data: body.clone(),
184         };
185 
186         match self.data_producer.send(data) {
187             Ok(_) => {}
188             Err(_) => {
189                 return Err(SessionError {
190                     value: SessionErrorValue::SendChannelDataErr,
191                 })
192             }
193         }
194 
195         Ok(())
196     }
197 
198     fn get_session_info(&mut self, session_id: u64) -> SessionInfo {
199         match self.session_type {
200             SessionType::Client => SessionInfo {
201                 session_id: session_id,
202                 session_sub_type: SessionSubType::Publisher,
203             },
204             SessionType::Server => SessionInfo {
205                 session_id: session_id,
206                 session_sub_type: SessionSubType::Player,
207             },
208         }
209     }
210 
211     pub async fn subscribe_from_channels(
212         &mut self,
213         app_name: String,
214         stream_name: String,
215         session_id: u64,
216     ) -> Result<(), SessionError> {
217         print!(
218             "subscribe info............{} {} {}\n",
219             app_name,
220             stream_name.clone(),
221             session_id
222         );
223 
224         let mut retry_count: u8 = 0;
225 
226         loop {
227             let (sender, receiver) = oneshot::channel();
228 
229             let subscribe_event = ChannelEvent::Subscribe {
230                 app_name: app_name.clone(),
231                 stream_name: stream_name.clone(),
232                 session_info: self.get_session_info(session_id),
233                 responder: sender,
234             };
235             let rv = self.event_producer.send(subscribe_event);
236             match rv {
237                 Err(_) => {
238                     return Err(SessionError {
239                         value: SessionErrorValue::ChannelEventSendErr,
240                     })
241                 }
242                 _ => {}
243             }
244 
245             match receiver.await {
246                 Ok(consumer) => {
247                     self.data_consumer = consumer;
248                     break;
249                 }
250                 Err(_) => {
251                     if retry_count > 10 {
252                         return Err(SessionError {
253                             value: SessionErrorValue::SubscribeCountLimitReach,
254                         });
255                     }
256                 }
257             }
258 
259             sleep(Duration::from_millis(800)).await;
260             retry_count = retry_count + 1;
261         }
262 
263         Ok(())
264     }
265 
266     pub async fn unsubscribe_from_channels(
267         &mut self,
268         app_name: String,
269         stream_name: String,
270         session_id: u64,
271     ) -> Result<(), SessionError> {
272         let subscribe_event = ChannelEvent::UnSubscribe {
273             app_name,
274             stream_name,
275             session_info: self.get_session_info(session_id),
276         };
277         if let Err(err) = self.event_producer.send(subscribe_event) {
278             print!("unsubscribe_from_channels err {}\n", err)
279         }
280 
281         Ok(())
282     }
283 
284     pub async fn publish_to_channels(
285         &mut self,
286         app_name: String,
287         stream_name: String,
288     ) -> Result<(), SessionError> {
289         let (sender, receiver) = oneshot::channel();
290         let publish_event = ChannelEvent::Publish {
291             app_name,
292             stream_name,
293             responder: sender,
294         };
295 
296         let rv = self.event_producer.send(publish_event);
297         match rv {
298             Err(_) => {
299                 return Err(SessionError {
300                     value: SessionErrorValue::ChannelEventSendErr,
301                 })
302             }
303             _ => {}
304         }
305 
306         match receiver.await {
307             Ok(producer) => {
308                 //print!("set producer before\n");
309                 self.data_producer = producer;
310                 //print!("set producer after\n");
311             }
312             Err(err) => {
313                 print!("publish_to_channels err{}\n", err)
314             }
315         }
316         Ok(())
317     }
318 
319     pub async fn unpublish_to_channels(
320         &mut self,
321         app_name: String,
322         stream_name: String,
323     ) -> Result<(), SessionError> {
324         let unpublish_event = ChannelEvent::UnPublish {
325             app_name,
326             stream_name,
327         };
328 
329         let rv = self.event_producer.send(unpublish_event);
330         match rv {
331             Err(_) => {
332                 println!("unpublish_to_channels error.");
333                 return Err(SessionError {
334                     value: SessionErrorValue::ChannelEventSendErr,
335                 });
336             }
337             _ => {
338                 println!("unpublish_to_channels successfully.")
339             }
340         }
341         Ok(())
342     }
343 }
344