xref: /xiu/protocol/rtmp/src/session/common.rs (revision fcc2ec9e)
1 use {
2     super::{
3         define::{SessionSubType, SessionType},
4         errors::{SessionError, SessionErrorValue},
5     },
6     crate::{
7         amf0::Amf0ValueType,
8         channels::define::{
9             ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent,
10             ChannelEventProducer,
11         },
12         chunk::{
13             define::{chunk_type, csid_type, CHUNK_SIZE},
14             packetizer::ChunkPacketizer,
15             unpacketizer::{ChunkUnpacketizer, UnpackResult},
16             ChunkInfo,
17         },
18         config,
19         handshake::handshake::{ServerHandshakeState, SimpleHandshakeServer},
20         messages::{
21             define::{msg_type_id, RtmpMessageData},
22             parser::MessageParser,
23         },
24         netconnection::commands::NetConnection,
25         netstream::writer::NetStreamWriter,
26         protocol_control_messages::writer::ProtocolControlMessagesWriter,
27         user_control_messages::writer::EventMessagesWriter,
28         utils::print::print,
29     },
30     bytes::BytesMut,
31     networkio::{
32         bytes_writer::{AsyncBytesWriter, BytesWriter},
33         networkio::NetworkIO,
34     },
35     std::{collections::HashMap, sync::Arc},
36     tokio::{
37         net::TcpStream,
38         sync::{mpsc, oneshot, Mutex},
39     },
40 };
41 
/// Identity of a session as reported to the channel layer.
///
/// Built by `Common::get_session_info` and carried in the
/// `ChannelEvent::Subscribe` and `ChannelEvent::UnSubscribe` events.
pub struct SessionInfo {
    /// Numeric id of the session, supplied by the caller.
    pub session_id: u64,
    /// Role of the session; `Common::get_session_info` derives it from the
    /// session type (`Client` -> `Publisher`, `Server` -> `Player`).
    pub session_sub_type: SessionSubType,
}
/// State shared by RTMP sessions for exchanging media data and events with
/// the channel layer.
pub struct Common {
    /// Chunk packetizer built from the `net_io` handle given to `new`; used
    /// to write audio, video and metadata chunks.
    packetizer: ChunkPacketizer,

    /// Receiving end drained by `send_channel_data`. Starts as a placeholder
    /// and is replaced by `subscribe_from_channels` on success.
    data_consumer: ChannelDataConsumer,
    /// Sending end used by `on_video_data`, `on_audio_data` and
    /// `on_amf_data`. Starts as a placeholder and is replaced by
    /// `publish_to_channels` on success.
    data_producer: ChannelDataProducer,

    /// Sender for `ChannelEvent`s (subscribe, unsubscribe, publish, unpublish).
    event_producer: ChannelEventProducer,
    /// Whether this is a client or a server session; selects the sub-type
    /// reported in `SessionInfo`.
    session_type: SessionType,
}
55 
56 impl Common {
57     pub fn new(
58         net_io: Arc<Mutex<NetworkIO>>,
59         event_producer: ChannelEventProducer,
60         session_type: SessionType,
61     ) -> Self {
62         //only used for init,since I don't found a better way to deal with this.
63         let (init_producer, init_consumer) = mpsc::unbounded_channel();
64 
65         Self {
66             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
67 
68             data_producer: init_producer,
69             data_consumer: init_consumer,
70 
71             event_producer,
72             session_type,
73         }
74     }
75     pub async fn send_channel_data(&mut self) -> Result<(), SessionError> {
76         loop {
77             if let Some(data) = self.data_consumer.recv().await {
78                 match data {
79                     ChannelData::Audio { timestamp, data } => {
80                         self.send_audio(data, timestamp).await?;
81                     }
82                     ChannelData::Video { timestamp, data } => {
83                         self.send_video(data, timestamp).await?;
84                     }
85                     ChannelData::MetaData { body } => {
86                         self.send_metadata(body).await?;
87                     }
88                 }
89             }
90         }
91     }
92 
93     pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
94         let mut chunk_info = ChunkInfo::new(
95             csid_type::AUDIO,
96             chunk_type::TYPE_0,
97             timestamp,
98             data.len() as u32,
99             msg_type_id::AUDIO,
100             0,
101             data,
102         );
103 
104         self.packetizer.write_chunk(&mut chunk_info).await?;
105 
106         Ok(())
107     }
108 
109     pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
110         let mut chunk_info = ChunkInfo::new(
111             csid_type::VIDEO,
112             chunk_type::TYPE_0,
113             timestamp,
114             data.len() as u32,
115             msg_type_id::VIDEO,
116             0,
117             data,
118         );
119 
120         self.packetizer.write_chunk(&mut chunk_info).await?;
121 
122         Ok(())
123     }
124 
125     pub async fn send_metadata(&mut self, data: BytesMut) -> Result<(), SessionError> {
126         let mut chunk_info = ChunkInfo::new(
127             csid_type::DATA_AMF0_AMF3,
128             chunk_type::TYPE_0,
129             0,
130             data.len() as u32,
131             msg_type_id::DATA_AMF0,
132             0,
133             data,
134         );
135 
136         self.packetizer.write_chunk(&mut chunk_info).await?;
137         Ok(())
138     }
139 
140     pub fn on_video_data(
141         &mut self,
142         data: &mut BytesMut,
143         timestamp: &u32,
144     ) -> Result<(), SessionError> {
145         let data = ChannelData::Video {
146             timestamp: timestamp.clone(),
147             data: data.clone(),
148         };
149 
150         //print!("receive video data\n");
151         match self.data_producer.send(data) {
152             Ok(_) => {}
153             Err(err) => {
154                 print!("send video err {}\n", err);
155                 return Err(SessionError {
156                     value: SessionErrorValue::SendChannelDataErr,
157                 });
158             }
159         }
160 
161         Ok(())
162     }
163 
164     pub fn on_audio_data(
165         &mut self,
166         data: &mut BytesMut,
167         timestamp: &u32,
168     ) -> Result<(), SessionError> {
169         let data = ChannelData::Audio {
170             timestamp: timestamp.clone(),
171             data: data.clone(),
172         };
173 
174         match self.data_producer.send(data) {
175             Ok(_) => {}
176             Err(err) => {
177                 print!("receive audio err {}\n", err);
178                 return Err(SessionError {
179                     value: SessionErrorValue::SendChannelDataErr,
180                 });
181             }
182         }
183 
184         Ok(())
185     }
186 
187     pub fn on_amf_data(&mut self, body: &mut BytesMut) -> Result<(), SessionError> {
188         let data = ChannelData::MetaData { body: body.clone() };
189 
190         match self.data_producer.send(data) {
191             Ok(_) => {}
192             Err(_) => {
193                 return Err(SessionError {
194                     value: SessionErrorValue::SendChannelDataErr,
195                 })
196             }
197         }
198 
199         Ok(())
200     }
201 
202     fn get_session_info(&mut self, session_id: u64) -> SessionInfo {
203         match self.session_type {
204             SessionType::Client => SessionInfo {
205                 session_id: session_id,
206                 session_sub_type: SessionSubType::Publisher,
207             },
208             SessionType::Server => SessionInfo {
209                 session_id: session_id,
210                 session_sub_type: SessionSubType::Player,
211             },
212         }
213     }
214 
215     pub async fn subscribe_from_channels(
216         &mut self,
217         app_name: String,
218         stream_name: String,
219         session_id: u64,
220     ) -> Result<(), SessionError> {
221         print!(
222             "subscribe info............{} {} {}\n",
223             app_name,
224             stream_name.clone(),
225             session_id
226         );
227 
228         let (sender, receiver) = oneshot::channel();
229 
230         let subscribe_event = ChannelEvent::Subscribe {
231             app_name: app_name,
232             stream_name,
233             session_info: self.get_session_info(session_id),
234             responder: sender,
235         };
236 
237         let rv = self.event_producer.send(subscribe_event);
238         match rv {
239             Err(_) => {
240                 return Err(SessionError {
241                     value: SessionErrorValue::ChannelEventSendErr,
242                 })
243             }
244             _ => {}
245         }
246 
247         match receiver.await {
248             Ok(consumer) => {
249                 self.data_consumer = consumer;
250             }
251             Err(_) => {}
252         }
253         Ok(())
254     }
255 
256     pub async fn unsubscribe_from_channels(
257         &mut self,
258         app_name: String,
259         stream_name: String,
260         session_id: u64,
261     ) -> Result<(), SessionError> {
262         let subscribe_event = ChannelEvent::UnSubscribe {
263             app_name,
264             stream_name,
265             session_info: self.get_session_info(session_id),
266         };
267         if let Err(err) = self.event_producer.send(subscribe_event) {
268             print!("unsubscribe_from_channels err {}\n", err)
269         }
270 
271         Ok(())
272     }
273 
274     pub async fn publish_to_channels(
275         &mut self,
276         app_name: String,
277         stream_name: String,
278         connect_command_object: HashMap<String, Amf0ValueType>,
279     ) -> Result<(), SessionError> {
280         let (sender, receiver) = oneshot::channel();
281         let publish_event = ChannelEvent::Publish {
282             app_name,
283             stream_name,
284             responder: sender,
285             connect_command_object,
286         };
287 
288         let rv = self.event_producer.send(publish_event);
289         match rv {
290             Err(_) => {
291                 return Err(SessionError {
292                     value: SessionErrorValue::ChannelEventSendErr,
293                 })
294             }
295             _ => {}
296         }
297 
298         match receiver.await {
299             Ok(producer) => {
300                 //print!("set producer before\n");
301                 self.data_producer = producer;
302                 //print!("set producer after\n");
303             }
304             Err(err) => {
305                 print!("publish_to_channels err{}\n", err)
306             }
307         }
308         Ok(())
309     }
310 
311     pub async fn unpublish_to_channels(
312         &mut self,
313         app_name: String,
314         stream_name: String,
315     ) -> Result<(), SessionError> {
316         let unpublish_event = ChannelEvent::UnPublish {
317             app_name,
318             stream_name,
319         };
320 
321         let rv = self.event_producer.send(unpublish_event);
322         match rv {
323             Err(_) => {
324                 println!("unpublish_to_channels error.");
325                 return Err(SessionError {
326                     value: SessionErrorValue::ChannelEventSendErr,
327                 });
328             }
329             _ => {
330                 println!("unpublish_to_channels successfully.")
331             }
332         }
333         Ok(())
334     }
335 }
336