1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::{
9         amf0::Amf0ValueType,
10         channels::define::ChannelEventProducer,
11         chunk::{
12             define::CHUNK_SIZE,
13             unpacketizer::{ChunkUnpacketizer, UnpackResult},
14         },
15         config,
16         handshake::handshake::{HandshakeServer, ServerHandshakeState},
17         messages::{define::RtmpMessageData, parser::MessageParser},
18         netconnection::writer::NetConnection,
19         netstream::writer::NetStreamWriter,
20         protocol_control_messages::writer::ProtocolControlMessagesWriter,
21         user_control_messages::writer::EventMessagesWriter,
22     },
23     bytes::BytesMut,
24     bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO},
25     std::{collections::HashMap, sync::Arc, time::Duration},
26     tokio::{net::TcpStream, sync::Mutex},
27     uuid::Uuid,
28 };
29 
30 enum ServerSessionState {
31     Handshake,
32     ReadChunk,
33     // OnConnect,
34     // OnCreateStream,
35     //Publish,
36     Play,
37 }
38 
39 pub struct ServerSession {
40     pub app_name: String,
41     pub stream_name: String,
42 
43     io: Arc<Mutex<BytesIO>>,
44     handshaker: HandshakeServer,
45 
46     unpacketizer: ChunkUnpacketizer,
47 
48     state: ServerSessionState,
49 
50     pub common: Common,
51 
52     bytesio_data: BytesMut,
53     has_remaing_data: bool,
54 
55     /* Used to mark the subscriber's the data producer
56     in channels and delete it from map when unsubscribe
57     is called. */
58     pub subscriber_id: Uuid,
59 
60     connect_command_object: Option<HashMap<String, Amf0ValueType>>,
61 }
62 
63 impl ServerSession {
64     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self {
65         let net_io = Arc::new(Mutex::new(BytesIO::new(stream)));
66         let subscriber_id = Uuid::new_v4();
67         Self {
68             app_name: String::from(""),
69             stream_name: String::from(""),
70 
71             io: Arc::clone(&net_io),
72             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
73 
74             unpacketizer: ChunkUnpacketizer::new(),
75 
76             state: ServerSessionState::Handshake,
77 
78             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server),
79 
80             subscriber_id,
81             bytesio_data: BytesMut::new(),
82             has_remaing_data: false,
83 
84             connect_command_object: None,
85         }
86     }
87 
88     pub async fn run(&mut self) -> Result<(), SessionError> {
89         loop {
90             match self.state {
91                 ServerSessionState::Handshake => {
92                     self.handshake().await?;
93                 }
94                 ServerSessionState::ReadChunk => {
95                     self.read_parse_chunks().await?;
96                 }
97                 ServerSessionState::Play => {
98                     self.play().await?;
99                 }
100             }
101         }
102 
103         //Ok(())
104     }
105 
106     async fn handshake(&mut self) -> Result<(), SessionError> {
107         self.bytesio_data = self.io.lock().await.read().await?;
108 
109         self.handshaker.extend_data(&self.bytesio_data[..]);
110         self.handshaker.handshake().await?;
111 
112         match self.handshaker.state() {
113             ServerHandshakeState::Finish => {
114                 self.state = ServerSessionState::ReadChunk;
115 
116                 let left_bytes = self.handshaker.get_remaining_bytes();
117                 if left_bytes.len() > 0 {
118                     self.unpacketizer.extend_data(&left_bytes[..]);
119                     self.has_remaing_data = true;
120                 }
121                 self.send_set_chunk_size().await?;
122                 return Ok(());
123             }
124             _ => {}
125         }
126 
127         Ok(())
128     }
129 
130     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
131         if !self.has_remaing_data {
132             match self
133                 .io
134                 .lock()
135                 .await
136                 .read_timeout(Duration::from_secs(2))
137                 .await
138             {
139                 Ok(data) => {
140                     self.bytesio_data = data;
141                 }
142                 Err(err) => {
143                     self.common
144                         .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
145                         .await?;
146 
147                     return Err(SessionError {
148                         value: SessionErrorValue::BytesIOError(err),
149                     });
150                 }
151             }
152 
153             self.unpacketizer.extend_data(&self.bytesio_data[..]);
154         }
155 
156         self.has_remaing_data = false;
157 
158         loop {
159             let result = self.unpacketizer.read_chunks();
160 
161             if let Ok(rv) = result {
162                 match rv {
163                     UnpackResult::Chunks(chunks) => {
164                         for chunk_info in chunks.iter() {
165                             let mut msg = MessageParser::new(chunk_info.clone()).parse()?;
166 
167                             let msg_stream_id = chunk_info.message_header.msg_streamd_id;
168                             let timestamp = chunk_info.message_header.timestamp;
169                             self.process_messages(&mut msg, &msg_stream_id, &timestamp)
170                                 .await?;
171                         }
172                     }
173                     _ => {}
174                 }
175             } else {
176                 break;
177             }
178         }
179         Ok(())
180     }
181 
182     async fn play(&mut self) -> Result<(), SessionError> {
183         match self.common.send_channel_data().await {
184             Ok(_) => {}
185 
186             Err(err) => {
187                 self.common
188                     .unsubscribe_from_channels(
189                         self.app_name.clone(),
190                         self.stream_name.clone(),
191                         self.subscriber_id,
192                     )
193                     .await?;
194                 return Err(err);
195             }
196         }
197 
198         Ok(())
199     }
200 
201     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
202         let mut controlmessage =
203             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
204         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
205 
206         Ok(())
207     }
208 
209     pub async fn process_messages(
210         &mut self,
211         rtmp_msg: &mut RtmpMessageData,
212         msg_stream_id: &u32,
213         timestamp: &u32,
214     ) -> Result<(), SessionError> {
215         match rtmp_msg {
216             RtmpMessageData::Amf0Command {
217                 command_name,
218                 transaction_id,
219                 command_object,
220                 others,
221             } => {
222                 self.on_amf0_command_message(
223                     msg_stream_id,
224                     command_name,
225                     transaction_id,
226                     command_object,
227                     others,
228                 )
229                 .await?
230             }
231             RtmpMessageData::SetChunkSize { chunk_size } => {
232                 self.on_set_chunk_size(chunk_size.clone() as usize)?;
233             }
234             RtmpMessageData::AudioData { data } => {
235                 self.common.on_audio_data(data, timestamp)?;
236             }
237             RtmpMessageData::VideoData { data } => {
238                 self.common.on_video_data(data, timestamp)?;
239             }
240             RtmpMessageData::AmfData { raw_data } => {
241                 self.common.on_meta_data(raw_data, timestamp)?;
242             }
243 
244             _ => {}
245         }
246         Ok(())
247     }
248 
249     pub async fn on_amf0_command_message(
250         &mut self,
251         stream_id: &u32,
252         command_name: &Amf0ValueType,
253         transaction_id: &Amf0ValueType,
254         command_object: &Amf0ValueType,
255         others: &mut Vec<Amf0ValueType>,
256     ) -> Result<(), SessionError> {
257         let empty_cmd_name = &String::new();
258         let cmd_name = match command_name {
259             Amf0ValueType::UTF8String(str) => str,
260             _ => empty_cmd_name,
261         };
262 
263         let transaction_id = match transaction_id {
264             Amf0ValueType::Number(number) => number,
265             _ => &0.0,
266         };
267 
268         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
269         let obj = match command_object {
270             Amf0ValueType::Object(obj) => obj,
271             _ => &empty_cmd_obj,
272         };
273 
274         match cmd_name.as_str() {
275             "connect" => {
276                 log::info!("[ S<-C ] [connect] ");
277                 self.on_connect(&transaction_id, &obj).await?;
278             }
279             "createStream" => {
280                 log::info!("[ S<-C ] [create stream] ");
281                 self.on_create_stream(transaction_id).await?;
282             }
283             "deleteStream" => {
284                 if others.len() > 0 {
285                     let stream_id = match others.pop() {
286                         Some(val) => match val {
287                             Amf0ValueType::Number(streamid) => streamid,
288                             _ => 0.0,
289                         },
290                         _ => 0.0,
291                     };
292                     log::info!(
293                         "[ S<-C ] [delete stream] app_name: {}, stream_name: {}",
294                         self.app_name,
295                         self.stream_name
296                     );
297 
298                     self.on_delete_stream(transaction_id, &stream_id).await?;
299                 }
300             }
301             "play" => {
302                 log::info!(
303                     "[ S<-C ] [play]  app_name: {}, stream_name: {}",
304                     self.app_name,
305                     self.stream_name
306                 );
307                 self.unpacketizer.session_type = config::SERVER_PULL;
308                 self.on_play(transaction_id, stream_id, others).await?;
309             }
310             "publish" => {
311                 self.unpacketizer.session_type = config::SERVER_PUSH;
312                 self.on_publish(transaction_id, stream_id, others).await?;
313             }
314             _ => {}
315         }
316 
317         Ok(())
318     }
319 
320     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
321         self.unpacketizer.update_max_chunk_size(chunk_size);
322         Ok(())
323     }
324 
325     async fn on_connect(
326         &mut self,
327         transaction_id: &f64,
328         command_obj: &HashMap<String, Amf0ValueType>,
329     ) -> Result<(), SessionError> {
330         self.connect_command_object = Some(command_obj.clone());
331         let mut control_message =
332             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
333         log::info!("[ S->C ] [set window_acknowledgement_size]");
334         control_message
335             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
336             .await?;
337         log::info!("[ S->C ] [set set_peer_bandwidth]",);
338         control_message
339             .write_set_peer_bandwidth(
340                 define::PEER_BANDWIDTH,
341                 define::peer_bandwidth_limit_type::DYNAMIC,
342             )
343             .await?;
344 
345         let obj_encoding = command_obj.get("objectEncoding");
346         let encoding = match obj_encoding {
347             Some(Amf0ValueType::Number(encoding)) => encoding,
348             _ => &define::OBJENCODING_AMF0,
349         };
350 
351         let app_name = command_obj.get("app");
352         self.app_name = match app_name {
353             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
354             _ => {
355                 return Err(SessionError {
356                     value: SessionErrorValue::NoAppName,
357                 });
358             }
359         };
360 
361         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
362         netconnection
363             .write_connect_response(
364                 &transaction_id,
365                 &define::FMSVER.to_string(),
366                 &define::CAPABILITIES,
367                 &String::from("NetConnection.Connect.Success"),
368                 &define::LEVEL.to_string(),
369                 &String::from("Connection Succeeded."),
370                 encoding,
371             )
372             .await?;
373 
374         Ok(())
375     }
376 
377     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
378         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
379         netconnection
380             .write_create_stream_response(transaction_id, &define::STREAM_ID)
381             .await?;
382 
383         log::info!(
384             "[ S->C ] [create_stream_response]  app_name: {}",
385             self.app_name,
386         );
387 
388         Ok(())
389     }
390 
391     pub async fn on_delete_stream(
392         &mut self,
393         transaction_id: &f64,
394         stream_id: &f64,
395     ) -> Result<(), SessionError> {
396         self.common
397             .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
398             .await?;
399 
400         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
401         netstream
402             .write_on_status(
403                 transaction_id,
404                 &"status".to_string(),
405                 &"NetStream.DeleteStream.Suceess".to_string(),
406                 &"".to_string(),
407             )
408             .await?;
409 
410         //self.unsubscribe_from_channels().await?;
411         log::info!(
412             "[ S->C ] [delete stream success]  app_name: {}, stream_name: {}",
413             self.app_name,
414             self.stream_name
415         );
416         log::trace!("{}", stream_id);
417 
418         Ok(())
419     }
420     pub async fn on_play(
421         &mut self,
422         transaction_id: &f64,
423         stream_id: &u32,
424         other_values: &mut Vec<Amf0ValueType>,
425     ) -> Result<(), SessionError> {
426         let length = other_values.len() as u8;
427         let mut index: u8 = 0;
428 
429         let mut stream_name: Option<String> = None;
430         let mut start: Option<f64> = None;
431         let mut duration: Option<f64> = None;
432         let mut reset: Option<bool> = None;
433 
434         loop {
435             if index >= length {
436                 break;
437             }
438             index = index + 1;
439             stream_name = match other_values.remove(0) {
440                 Amf0ValueType::UTF8String(val) => Some(val),
441                 _ => None,
442             };
443 
444             if index >= length {
445                 break;
446             }
447             index = index + 1;
448             start = match other_values.remove(0) {
449                 Amf0ValueType::Number(val) => Some(val),
450                 _ => None,
451             };
452 
453             if index >= length {
454                 break;
455             }
456             index = index + 1;
457             duration = match other_values.remove(0) {
458                 Amf0ValueType::Number(val) => Some(val),
459                 _ => None,
460             };
461 
462             if index >= length {
463                 break;
464             }
465             //index = index + 1;
466             reset = match other_values.remove(0) {
467                 Amf0ValueType::Boolean(val) => Some(val),
468                 _ => None,
469             };
470             break;
471         }
472 
473         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
474         event_messages.write_stream_begin(stream_id.clone()).await?;
475         log::info!(
476             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
477             self.app_name,
478             self.stream_name
479         );
480         log::trace!(
481             "{} {} {}",
482             start.is_some(),
483             duration.is_some(),
484             reset.is_some()
485         );
486 
487         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
488         netstream
489             .write_on_status(
490                 transaction_id,
491                 &"status".to_string(),
492                 &"NetStream.Play.Reset".to_string(),
493                 &"reset".to_string(),
494             )
495             .await?;
496 
497         netstream
498             .write_on_status(
499                 transaction_id,
500                 &"status".to_string(),
501                 &"NetStream.Play.Start".to_string(),
502                 &"play start".to_string(),
503             )
504             .await?;
505 
506         netstream
507             .write_on_status(
508                 transaction_id,
509                 &"status".to_string(),
510                 &"NetStream.Data.Start".to_string(),
511                 &"data start.".to_string(),
512             )
513             .await?;
514 
515         netstream
516             .write_on_status(
517                 transaction_id,
518                 &"status".to_string(),
519                 &"NetStream.Play.PublishNotify".to_string(),
520                 &"play publish notify.".to_string(),
521             )
522             .await?;
523 
524         event_messages
525             .write_stream_is_record(stream_id.clone())
526             .await?;
527         log::info!(
528             "[ S->C ] [stream is record]  app_name: {}, stream_name: {}",
529             self.app_name,
530             self.stream_name
531         );
532         self.stream_name = stream_name.clone().unwrap();
533         self.common
534             .subscribe_from_channels(
535                 self.app_name.clone(),
536                 stream_name.unwrap(),
537                 self.subscriber_id,
538             )
539             .await?;
540 
541         self.state = ServerSessionState::Play;
542 
543         Ok(())
544     }
545 
546     pub async fn on_publish(
547         &mut self,
548         transaction_id: &f64,
549         stream_id: &u32,
550         other_values: &mut Vec<Amf0ValueType>,
551     ) -> Result<(), SessionError> {
552         let length = other_values.len();
553 
554         if length < 2 {
555             return Err(SessionError {
556                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
557             });
558         }
559 
560         let stream_name = match other_values.remove(0) {
561             Amf0ValueType::UTF8String(val) => val,
562             _ => {
563                 return Err(SessionError {
564                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
565                 });
566             }
567         };
568 
569         self.stream_name = stream_name;
570 
571         let _ = match other_values.remove(0) {
572             Amf0ValueType::UTF8String(val) => val,
573             _ => {
574                 return Err(SessionError {
575                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
576                 });
577             }
578         };
579 
580         log::info!(
581             "[ S<-C ] [publish]  app_name: {}, stream_name: {}",
582             self.app_name,
583             self.stream_name
584         );
585 
586         log::info!(
587             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
588             self.app_name,
589             self.stream_name
590         );
591 
592         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
593         event_messages.write_stream_begin(stream_id.clone()).await?;
594 
595         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
596         netstream
597             .write_on_status(
598                 transaction_id,
599                 &"status".to_string(),
600                 &"NetStream.Publish.Start".to_string(),
601                 &"".to_string(),
602             )
603             .await?;
604         log::info!(
605             "[ S->C ] [NetStream.Publish.Start]  app_name: {}, stream_name: {}",
606             self.app_name,
607             self.stream_name
608         );
609 
610         self.common
611             .publish_to_channels(self.app_name.clone(), self.stream_name.clone())
612             .await?;
613 
614         Ok(())
615     }
616 }
617