xref: /xiu/protocol/rtmp/src/session/common.rs (revision ec2af5fa)
1 use {
2     super::{
3         define,
4         errors::{SessionError, SessionErrorValue},
5     },
6     crate::{
7         amf0::Amf0ValueType,
8         channels::define::{
9             ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent,
10             ChannelEventProducer,
11         },
12         chunk::{
13             define::{chunk_type, csid_type, CHUNK_SIZE},
14             packetizer::ChunkPacketizer,
15             unpacketizer::{ChunkUnpacketizer, UnpackResult},
16             ChunkInfo,
17         },
18         config,
19         handshake::handshake::{ServerHandshakeState, SimpleHandshakeServer},
20         messages::{
21             define::{msg_type_id, RtmpMessageData},
22             parser::MessageParser,
23         },
24         netconnection::commands::NetConnection,
25         netstream::writer::NetStreamWriter,
26         protocol_control_messages::writer::ProtocolControlMessagesWriter,
27         user_control_messages::writer::EventMessagesWriter,
28     },
29     bytes::BytesMut,
30     networkio::{
31         bytes_writer::{AsyncBytesWriter, BytesWriter},
32         networkio::NetworkIO,
33     },
34     std::{collections::HashMap, sync::Arc},
35     tokio::{
36         net::TcpStream,
37         sync::{mpsc, oneshot, Mutex},
38     },
39 };
40 
41 pub struct Common {
42     packetizer: ChunkPacketizer,
43 
44     data_consumer: ChannelDataConsumer,
45     data_producer: ChannelDataProducer,
46 
47     event_producer: ChannelEventProducer,
48 }
49 
50 impl Common {
51     pub fn new(net_io: Arc<Mutex<NetworkIO>>, event_producer: ChannelEventProducer) -> Self {
52         //only used for init,since I don't found a better way to deal with this.
53         let (init_producer, init_consumer) = mpsc::unbounded_channel();
54 
55         Self {
56             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
57 
58             data_producer: init_producer,
59             data_consumer: init_consumer,
60 
61             event_producer,
62         }
63     }
64     pub async fn send_channel_data(&mut self) -> Result<(), SessionError> {
65         loop {
66             if let Some(data) = self.data_consumer.recv().await {
67                 match data {
68                     ChannelData::Audio { timestamp, data } => {
69                         //print!("send audio data\n");
70                         self.send_audio(data, timestamp).await?;
71                     }
72                     ChannelData::Video { timestamp, data } => {
73                         //print!("send video data\n");
74                         self.send_video(data, timestamp).await?;
75                     }
76                     ChannelData::MetaData { body } => {
77                         print!("send meta data\n");
78                         self.send_metadata(body).await?;
79                     }
80                 }
81             }
82         }
83     }
84 
85     pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
86         let mut chunk_info = ChunkInfo::new(
87             csid_type::AUDIO,
88             chunk_type::TYPE_0,
89             timestamp,
90             data.len() as u32,
91             msg_type_id::AUDIO,
92             0,
93             data,
94         );
95 
96         self.packetizer.write_chunk(&mut chunk_info).await?;
97 
98         Ok(())
99     }
100 
101     pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
102         let mut chunk_info = ChunkInfo::new(
103             csid_type::VIDEO,
104             chunk_type::TYPE_0,
105             timestamp,
106             data.len() as u32,
107             msg_type_id::VIDEO,
108             0,
109             data,
110         );
111 
112         self.packetizer.write_chunk(&mut chunk_info).await?;
113 
114         Ok(())
115     }
116 
117     pub async fn send_metadata(&mut self, data: BytesMut) -> Result<(), SessionError> {
118         let mut chunk_info = ChunkInfo::new(
119             csid_type::DATA_AMF0_AMF3,
120             chunk_type::TYPE_0,
121             0,
122             data.len() as u32,
123             msg_type_id::DATA_AMF0,
124             0,
125             data,
126         );
127 
128         self.packetizer.write_chunk(&mut chunk_info).await?;
129         Ok(())
130     }
131 
132     pub fn on_video_data(
133         &mut self,
134         data: &mut BytesMut,
135         timestamp: &u32,
136     ) -> Result<(), SessionError> {
137         let data = ChannelData::Video {
138             timestamp: timestamp.clone(),
139             data: data.clone(),
140         };
141 
142         //print!("receive video data\n");
143         match self.data_producer.send(data) {
144             Ok(_) => {}
145             Err(err) => {
146                 print!("send video err {}\n", err);
147                 return Err(SessionError {
148                     value: SessionErrorValue::SendChannelDataErr,
149                 });
150             }
151         }
152 
153         Ok(())
154     }
155 
156     pub fn on_audio_data(
157         &mut self,
158         data: &mut BytesMut,
159         timestamp: &u32,
160     ) -> Result<(), SessionError> {
161         let data = ChannelData::Audio {
162             timestamp: timestamp.clone(),
163             data: data.clone(),
164         };
165 
166         match self.data_producer.send(data) {
167             Ok(_) => {}
168             Err(err) => {
169                 print!("receive audio err {}\n", err);
170                 return Err(SessionError {
171                     value: SessionErrorValue::SendChannelDataErr,
172                 });
173             }
174         }
175 
176         Ok(())
177     }
178 
179     pub fn on_amf_data(&mut self, body: &mut BytesMut) -> Result<(), SessionError> {
180         let data = ChannelData::MetaData { body: body.clone() };
181 
182         match self.data_producer.send(data) {
183             Ok(_) => {}
184             Err(_) => {
185                 return Err(SessionError {
186                     value: SessionErrorValue::SendChannelDataErr,
187                 })
188             }
189         }
190 
191         Ok(())
192     }
193 
194     pub async fn subscribe_from_channels(
195         &mut self,
196         app_name: String,
197         stream_name: String,
198         session_id: u64,
199     ) -> Result<(), SessionError> {
200         print!(
201             "subscribe info............{} {} {}\n",
202             app_name,
203             stream_name.clone(),
204             session_id
205         );
206 
207         let (sender, receiver) = oneshot::channel();
208         let subscribe_event = ChannelEvent::Subscribe {
209             app_name: app_name,
210             stream_name,
211             session_id: session_id,
212             responder: sender,
213         };
214 
215         let rv = self.event_producer.send(subscribe_event);
216         match rv {
217             Err(_) => {
218                 return Err(SessionError {
219                     value: SessionErrorValue::ChannelEventSendErr,
220                 })
221             }
222             _ => {}
223         }
224 
225         match receiver.await {
226             Ok(consumer) => {
227                 self.data_consumer = consumer;
228             }
229             Err(_) => {}
230         }
231         Ok(())
232     }
233 
234     pub async fn unsubscribe_from_channels(
235         &mut self,
236         app_name: String,
237         stream_name: String,
238         session_id: u64,
239     ) -> Result<(), SessionError> {
240         let subscribe_event = ChannelEvent::UnSubscribe {
241             app_name,
242             stream_name,
243             session_id,
244         };
245 
246         println!("unsubscribe_from_channels");
247         let _ = self.event_producer.send(subscribe_event);
248 
249         Ok(())
250     }
251 
252     pub async fn publish_to_channels(
253         &mut self,
254         app_name: String,
255         stream_name: String,
256     ) -> Result<(), SessionError> {
257         let (sender, receiver) = oneshot::channel();
258         let publish_event = ChannelEvent::Publish {
259             app_name,
260             stream_name,
261             responder: sender,
262         };
263 
264         let rv = self.event_producer.send(publish_event);
265         match rv {
266             Err(_) => {
267                 return Err(SessionError {
268                     value: SessionErrorValue::ChannelEventSendErr,
269                 })
270             }
271             _ => {}
272         }
273 
274         match receiver.await {
275             Ok(producer) => {
276                 print!("set producer before\n");
277                 self.data_producer = producer;
278                 print!("set producer after\n");
279             }
280             Err(err) => {
281                 print!("publish_to_channels err{}\n", err)
282             }
283         }
284         Ok(())
285     }
286 
287     pub async fn unpublish_to_channels(
288         &mut self,
289         app_name: String,
290         stream_name: String,
291     ) -> Result<(), SessionError> {
292         let unpublish_event = ChannelEvent::UnPublish {
293             app_name,
294             stream_name,
295         };
296 
297         let rv = self.event_producer.send(unpublish_event);
298         match rv {
299             Err(_) => {
300                 println!("unpublish_to_channels error.");
301                 return Err(SessionError {
302                     value: SessionErrorValue::ChannelEventSendErr,
303                 });
304             }
305             _ => {
306                 println!("unpublish_to_channels successfully.")
307             }
308         }
309         Ok(())
310     }
311 }
312