1 use super::define;
2 use super::errors::SessionError;
3 use super::errors::SessionErrorValue;
4 use crate::handshake::handshake::SimpleHandshakeServer;
5 use crate::{amf0::Amf0ValueType, chunk::unpacketizer::UnpackResult};
6 use crate::{
7     application,
8     chunk::{unpacketizer::ChunkUnpacketizer, ChunkInfo},
9 };
10 use crate::{channels, chunk::packetizer::ChunkPacketizer};
11 use crate::{
12     chunk::define::CHUNK_SIZE,
13     chunk::define::{chunk_type, csid_type},
14 };
15 
16 use crate::messages::define::msg_type_id;
17 use crate::messages::define::RtmpMessageData;
18 
19 use crate::messages::parser::MessageParser;
20 use bytes::BytesMut;
21 
22 use netio::bytes_writer::AsyncBytesWriter;
23 use netio::bytes_writer::BytesWriter;
24 use netio::netio::NetworkIO;
25 use std::{borrow::BorrowMut, time::Duration};
26 
27 use crate::channels::define::ChannelEvent;
28 use crate::channels::define::MultiConsumerForData;
29 use crate::channels::define::MultiProducerForEvent;
30 use crate::channels::define::SingleProducerForData;
31 use crate::netconnection::commands::NetConnection;
32 use crate::netstream::commands::NetStream;
33 use crate::protocol_control_messages::control_messages::ControlMessages;
34 
35 use crate::user_control_messages::event_messages::EventMessages;
36 
37 use std::collections::HashMap;
38 
39 use tokio::{prelude::*, sync::broadcast};
40 
41 use std::sync::Arc;
42 use tokio::sync::oneshot;
43 use tokio::sync::Mutex;
44 
45 use crate::channels::define::ChannelData;
46 use crate::channels::errors::ChannelError;
47 use crate::channels::errors::ChannelErrorValue;
48 
49 enum ServerSessionState {
50     Handshake,
51     ReadChunk,
52     // OnConnect,
53     // OnCreateStream,
54     //Publish,
55     Play,
56 }
57 
58 pub struct ServerSession<S>
59 where
60     S: AsyncRead + AsyncWrite + Unpin + Send + Sync,
61 {
62     app_name: String,
63 
64     io: Arc<Mutex<NetworkIO<S>>>,
65     handshaker: SimpleHandshakeServer<S>,
66 
67     packetizer: ChunkPacketizer<S>,
68     unpacketizer: ChunkUnpacketizer,
69 
70     state: ServerSessionState,
71 
72     event_producer: MultiProducerForEvent,
73 
74     //send video, audio or metadata from publish server session to player server sessions
75     data_producer: SingleProducerForData,
76     //receive video, audio or metadata from publish server session and send out to player
77     data_consumer: MultiConsumerForData,
78 }
79 
80 impl<S> ServerSession<S>
81 where
82     S: AsyncRead + AsyncWrite + Unpin + Send + Sync,
83 {
84     pub fn new(stream: S, event_producer: MultiProducerForEvent, timeout: Duration) -> Self {
85         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout)));
86         let bytes_writer = AsyncBytesWriter::new(Arc::clone(&net_io));
87 
88         //only used for init,since I don't found a better way to deal with this.
89         let (init_producer, init_consumer) = broadcast::channel(1);
90 
91         Self {
92             app_name: String::from(""),
93 
94             io: Arc::clone(&net_io),
95             handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)),
96 
97             packetizer: ChunkPacketizer::new(bytes_writer),
98             unpacketizer: ChunkUnpacketizer::new(),
99 
100             state: ServerSessionState::Handshake,
101 
102             event_producer,
103             data_producer: init_producer,
104             data_consumer: init_consumer,
105         }
106     }
107 
108     pub async fn run(&mut self) -> Result<(), SessionError> {
109         let duration = Duration::new(10, 10);
110 
111         loop {
112             let data = self.io.lock().await.read().await?;
113 
114             match self.state {
115                 ServerSessionState::Handshake => {
116                     self.handshaker.extend_data(&data[..]);
117                     let result = self.handshaker.handshake().await;
118 
119                     match result {
120                         Ok(v) => {
121                             self.state = ServerSessionState::ReadChunk;
122                         }
123                         Err(e) => {}
124                     }
125                 }
126                 ServerSessionState::ReadChunk => {
127                     self.unpacketizer.extend_data(&data[..]);
128                     let result = self.unpacketizer.read_chunk()?;
129 
130                     match result {
131                         UnpackResult::ChunkInfo(chunk_info) => {
132                             let msg_stream_id = chunk_info.message_header.msg_streamd_id;
133                             let timestamp = chunk_info.message_header.timestamp;
134 
135                             let mut message_parser = MessageParser::new(chunk_info);
136                             let mut msg = message_parser.parse()?;
137 
138                             self.process_messages(&mut msg, &msg_stream_id, &timestamp)
139                                 .await?;
140                         }
141                         _ => {}
142                     }
143                 }
144 
145                 //when in play state, only transfer publisher's video/audio/metadta to player.
146                 ServerSessionState::Play => loop {
147                     let data = self.data_consumer.recv().await;
148 
149                     match data {
150                         Ok(val) => match val {
151                             ChannelData::Audio { timestamp, data } => {
152                                 self.send_audio(data, timestamp).await?;
153                             }
154                             ChannelData::Video { timestamp, data } => {
155                                 self.send_video(data, timestamp).await?;
156                             }
157                             ChannelData::MetaData {} => {}
158                         },
159                         Err(err) => {}
160                     }
161                 },
162             }
163         }
164 
165         //Ok(())
166     }
167 
168     pub fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
169         let mut controlmessage = ControlMessages::new(AsyncBytesWriter::new(self.io.clone()));
170         controlmessage.write_set_chunk_size(CHUNK_SIZE)?;
171 
172         Ok(())
173     }
174     pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
175         let mut chunk_info = ChunkInfo::new(
176             csid_type::AUDIO,
177             chunk_type::TYPE_0,
178             timestamp,
179             data.len() as u32,
180             msg_type_id::AUDIO,
181             0,
182             data,
183         );
184 
185         self.packetizer.write_chunk(&mut chunk_info).await?;
186 
187         Ok(())
188     }
189 
190     pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
191         let mut chunk_info = ChunkInfo::new(
192             csid_type::VIDEO,
193             chunk_type::TYPE_0,
194             timestamp,
195             data.len() as u32,
196             msg_type_id::VIDEO,
197             0,
198             data,
199         );
200 
201         self.packetizer.write_chunk(&mut chunk_info).await?;
202 
203         Ok(())
204     }
205     pub async fn process_messages(
206         &mut self,
207         rtmp_msg: &mut RtmpMessageData,
208         msg_stream_id: &u32,
209         timestamp: &u32,
210     ) -> Result<(), SessionError> {
211         match rtmp_msg {
212             RtmpMessageData::Amf0Command {
213                 command_name,
214                 transaction_id,
215                 command_object,
216                 others,
217             } => {
218                 self.process_amf0_command_message(
219                     msg_stream_id,
220                     command_name,
221                     transaction_id,
222                     command_object,
223                     others,
224                 )
225                 .await?
226             }
227             RtmpMessageData::AudioData { data } => {
228                 self.on_audio_data(data, timestamp)?;
229             }
230             RtmpMessageData::VideoData { data } => {
231                 self.on_video_data(data, timestamp)?;
232             }
233 
234             _ => {}
235         }
236         Ok(())
237     }
238 
239     pub async fn process_amf0_command_message(
240         &mut self,
241         stream_id: &u32,
242         command_name: &Amf0ValueType,
243         transaction_id: &Amf0ValueType,
244         command_object: &Amf0ValueType,
245         others: &mut Vec<Amf0ValueType>,
246     ) -> Result<(), SessionError> {
247         let empty_cmd_name = &String::new();
248         let cmd_name = match command_name {
249             Amf0ValueType::UTF8String(str) => str,
250             _ => empty_cmd_name,
251         };
252 
253         let transaction_id = match transaction_id {
254             Amf0ValueType::Number(number) => number,
255             _ => &0.0,
256         };
257 
258         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
259         let obj = match command_object {
260             Amf0ValueType::Object(obj) => obj,
261             _ => &empty_cmd_obj,
262         };
263 
264         match cmd_name.as_str() {
265             "connect" => {
266                 self.on_connect(&transaction_id, &obj).await?;
267             }
268             "createStream" => {
269                 self.on_create_stream(transaction_id)?;
270             }
271             "deleteStream" => {
272                 if others.len() > 1 {
273                     let stream_id = match others.pop() {
274                         Some(val) => match val {
275                             Amf0ValueType::Number(streamid) => streamid,
276                             _ => 0.0,
277                         },
278                         _ => 0.0,
279                     };
280 
281                     self.on_delete_stream(transaction_id, &stream_id).await?;
282                 }
283             }
284             "play" => {
285                 self.on_play(transaction_id, stream_id, others).await?;
286             }
287             "publish" => {
288                 self.on_publish(transaction_id, stream_id, others).await?;
289             }
290             _ => {}
291         }
292 
293         Ok(())
294     }
295 
296     async fn on_connect(
297         &mut self,
298         transaction_id: &f64,
299         command_obj: &HashMap<String, Amf0ValueType>,
300     ) -> Result<(), SessionError> {
301         let mut control_message = ControlMessages::new(AsyncBytesWriter::new(self.io.clone()));
302         control_message.write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)?;
303         control_message.write_set_peer_bandwidth(
304             define::PEER_BANDWIDTH,
305             define::PeerBandWidthLimitType::DYNAMIC,
306         )?;
307         control_message.write_set_chunk_size(CHUNK_SIZE)?;
308 
309         let obj_encoding = command_obj.get("objectEncoding");
310         let encoding = match obj_encoding {
311             Some(Amf0ValueType::Number(encoding)) => encoding,
312             _ => &define::OBJENCODING_AMF0,
313         };
314 
315         let app_name = command_obj.get("app");
316         self.app_name = match app_name {
317             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
318             _ => {
319                 return Err(SessionError {
320                     value: SessionErrorValue::NoAppName,
321                 });
322             }
323         };
324 
325         let mut netconnection = NetConnection::new(BytesWriter::new());
326         let data = netconnection.connect_response(
327             &transaction_id,
328             &define::FMSVER.to_string(),
329             &define::CAPABILITIES,
330             &String::from("NetConnection.Connect.Success"),
331             &define::LEVEL.to_string(),
332             &String::from("Connection Succeeded."),
333             encoding,
334         )?;
335 
336         let mut chunk_info = ChunkInfo::new(
337             csid_type::COMMAND_AMF0_AMF3,
338             chunk_type::TYPE_0,
339             0,
340             data.len() as u32,
341             msg_type_id::COMMAND_AMF0,
342             0,
343             data,
344         );
345 
346         self.packetizer.write_chunk(&mut chunk_info).await?;
347 
348         Ok(())
349     }
350 
351     pub fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
352         let mut netconnection = NetConnection::new(BytesWriter::new());
353         netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?;
354 
355         Ok(())
356     }
357 
358     pub async fn on_delete_stream(
359         &mut self,
360         transaction_id: &f64,
361         stream_id: &f64,
362     ) -> Result<(), SessionError> {
363         let mut netstream = NetStream::new(BytesWriter::new());
364         let data = netstream.on_status(
365             transaction_id,
366             &"status".to_string(),
367             &"NetStream.DeleteStream.Suceess".to_string(),
368             &"".to_string(),
369         )?;
370 
371         let mut chunk_info = ChunkInfo::new(
372             csid_type::COMMAND_AMF0_AMF3,
373             chunk_type::TYPE_0,
374             0,
375             data.len() as u32,
376             msg_type_id::COMMAND_AMF0,
377             0,
378             data,
379         );
380 
381         self.packetizer.write_chunk(&mut chunk_info).await?;
382         Ok(())
383     }
384     pub async fn on_play(
385         &mut self,
386         transaction_id: &f64,
387         stream_id: &u32,
388         other_values: &mut Vec<Amf0ValueType>,
389     ) -> Result<(), SessionError> {
390         let length = other_values.len() as u8;
391         let mut index: u8 = 0;
392 
393         let mut stream_name: Option<String> = None;
394         let mut start: Option<f64> = None;
395         let mut duration: Option<f64> = None;
396         let mut reset: Option<bool> = None;
397 
398         loop {
399             if index >= length {
400                 break;
401             }
402             index = index + 1;
403             stream_name = match other_values.remove(0) {
404                 Amf0ValueType::UTF8String(val) => Some(val),
405                 _ => None,
406             };
407 
408             if index >= length {
409                 break;
410             }
411             index = index + 1;
412             start = match other_values.remove(0) {
413                 Amf0ValueType::Number(val) => Some(val),
414                 _ => None,
415             };
416 
417             if index >= length {
418                 break;
419             }
420             index = index + 1;
421             duration = match other_values.remove(0) {
422                 Amf0ValueType::Number(val) => Some(val),
423                 _ => None,
424             };
425 
426             if index >= length {
427                 break;
428             }
429             index = index + 1;
430             reset = match other_values.remove(0) {
431                 Amf0ValueType::Boolean(val) => Some(val),
432                 _ => None,
433             };
434             break;
435         }
436 
437         let mut event_messages = EventMessages::new(AsyncBytesWriter::new(self.io.clone()));
438         event_messages.stream_begin(stream_id.clone()).await?;
439 
440         let mut netstream = NetStream::new(BytesWriter::new());
441         match reset {
442             Some(val) => {
443                 if val {
444                     netstream.on_status(
445                         transaction_id,
446                         &"status".to_string(),
447                         &"NetStream.Play.Reset".to_string(),
448                         &"".to_string(),
449                     )?;
450                 }
451             }
452             _ => {}
453         }
454 
455         netstream.on_status(
456             transaction_id,
457             &"status".to_string(),
458             &"NetStream.Play.Start".to_string(),
459             &"".to_string(),
460         )?;
461 
462         event_messages.stream_is_record(stream_id.clone()).await?;
463 
464         self.subscribe_from_channels(stream_name.unwrap()).await?;
465         self.state = ServerSessionState::Play;
466 
467         Ok(())
468     }
469 
470     async fn subscribe_from_channels(&mut self, stream_name: String) -> Result<(), SessionError> {
471         let (sender, receiver) = oneshot::channel();
472         let subscribe_event = ChannelEvent::Subscribe {
473             app_name: self.app_name.clone(),
474             stream_name,
475             responder: sender,
476         };
477 
478         let rv = self.event_producer.send(subscribe_event);
479         match rv {
480             Err(_) => {
481                 return Err(SessionError {
482                     value: SessionErrorValue::ChannelEventSendErr,
483                 })
484             }
485             _ => {}
486         }
487 
488         match receiver.await {
489             Ok(consumer) => {
490                 self.data_consumer = consumer;
491             }
492             Err(_) => {}
493         }
494         Ok(())
495     }
496 
497     pub async fn on_publish(
498         &mut self,
499         transaction_id: &f64,
500         stream_id: &u32,
501         other_values: &mut Vec<Amf0ValueType>,
502     ) -> Result<(), SessionError> {
503         let length = other_values.len();
504 
505         if length < 2 {
506             return Err(SessionError {
507                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
508             });
509         }
510 
511         let stream_name = match other_values.remove(0) {
512             Amf0ValueType::UTF8String(val) => val,
513             _ => {
514                 return Err(SessionError {
515                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
516                 });
517             }
518         };
519 
520         let stream_type = match other_values.remove(0) {
521             Amf0ValueType::UTF8String(val) => val,
522             _ => {
523                 return Err(SessionError {
524                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
525                 });
526             }
527         };
528 
529         let mut event_messages = EventMessages::new(AsyncBytesWriter::new(self.io.clone()));
530         event_messages.stream_begin(stream_id.clone()).await?;
531 
532         let mut netstream = NetStream::new(BytesWriter::new());
533         let data = netstream.on_status(
534             transaction_id,
535             &"status".to_string(),
536             &"NetStream.Publish.Start".to_string(),
537             &"".to_string(),
538         )?;
539 
540         let mut chunk_info = ChunkInfo::new(
541             csid_type::COMMAND_AMF0_AMF3,
542             chunk_type::TYPE_0,
543             0,
544             data.len() as u32,
545             msg_type_id::COMMAND_AMF0,
546             0,
547             data,
548         );
549 
550         self.packetizer.write_chunk(&mut chunk_info).await?;
551 
552         self.publish_to_channels(stream_name).await?;
553 
554         Ok(())
555     }
556 
557     async fn publish_to_channels(&mut self, stream_name: String) -> Result<(), SessionError> {
558         let (sender, receiver) = oneshot::channel();
559         let publish_event = ChannelEvent::Publish {
560             app_name: self.app_name.clone(),
561             stream_name,
562             responder: sender,
563         };
564 
565         let rv = self.event_producer.send(publish_event);
566         match rv {
567             Err(_) => {
568                 return Err(SessionError {
569                     value: SessionErrorValue::ChannelEventSendErr,
570                 })
571             }
572             _ => {}
573         }
574 
575         match receiver.await {
576             Ok(producer) => {
577                 self.data_producer = producer;
578             }
579             Err(_) => {}
580         }
581         Ok(())
582     }
583 
584     pub fn on_video_data(
585         &mut self,
586         data: &mut BytesMut,
587         timestamp: &u32,
588     ) -> Result<(), SessionError> {
589         let data = ChannelData::Video {
590             timestamp: timestamp.clone(),
591             data: data.clone(),
592         };
593 
594         match self.data_producer.send(data) {
595             Ok(size) => {}
596             Err(_) => {
597                 return Err(SessionError {
598                     value: SessionErrorValue::SendChannelDataErr,
599                 })
600             }
601         }
602 
603         Ok(())
604     }
605 
606     pub fn on_audio_data(
607         &mut self,
608         data: &mut BytesMut,
609         timestamp: &u32,
610     ) -> Result<(), SessionError> {
611         let data = ChannelData::Audio {
612             timestamp: timestamp.clone(),
613             data: data.clone(),
614         };
615 
616         match self.data_producer.send(data) {
617             Ok(size) => {}
618             Err(_) => {
619                 return Err(SessionError {
620                     value: SessionErrorValue::SendChannelDataErr,
621                 })
622             }
623         }
624 
625         Ok(())
626     }
627 }
628