1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::{
9         amf0::Amf0ValueType,
10         channels::define::ChannelEventProducer,
11         chunk::{
12             define::CHUNK_SIZE,
13             unpacketizer::{ChunkUnpacketizer, UnpackResult},
14         },
15         config,
16         handshake::handshake::{HandshakeServer, ServerHandshakeState},
17         messages::{define::RtmpMessageData, parser::MessageParser},
18         netconnection::writer::NetConnection,
19         netstream::writer::NetStreamWriter,
20         protocol_control_messages::writer::ProtocolControlMessagesWriter,
21         user_control_messages::writer::EventMessagesWriter,
22     },
23     bytes::BytesMut,
24     bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO},
25     std::{collections::HashMap, sync::Arc},
26     tokio::{net::TcpStream, sync::Mutex},
27     uuid::Uuid,
28 };
29 
/// Phase of a server session; `ServerSession::run` dispatches on the current value
/// on every loop iteration.
enum ServerSessionState {
    /// Initial state set by `ServerSession::new`: the handshake is still in progress.
    Handshake,
    /// Set once the handshake reports `Finish`: incoming bytes are unpacked into
    /// chunks and the resulting messages are processed.
    ReadChunk,
    // Variants below are currently disabled.
    // OnConnect,
    // OnCreateStream,
    //Publish,
    /// Set at the end of `on_play`: channel data is sent to the client.
    Play,
}
38 
/// Server-side state for one client connection.
pub struct ServerSession {
    /// Application name, taken from the `app` property of the `connect` command object.
    pub app_name: String,
    /// Stream name, taken from the first argument of the `play` / `publish` command.
    pub stream_name: String,

    /// Shared I/O handle wrapping the client's `TcpStream`; clones of this `Arc`
    /// are handed to the handshaker, to `common` and to the message writers.
    io: Arc<Mutex<BytesIO>>,
    /// Performs the server side of the handshake.
    handshaker: HandshakeServer,

    /// Turns the raw bytes received after the handshake into chunks.
    unpacketizer: ChunkUnpacketizer,

    /// Current phase of the session, see `ServerSessionState`.
    state: ServerSessionState,

    /// Session logic shared through `Common`: channel publish/subscribe and
    /// audio/video/metadata handling are delegated to it.
    pub common: Common,

    /// Bytes returned by the most recent read from `io`.
    bytesio_data: BytesMut,
    /// `true` when the unpacketizer already holds unprocessed bytes (left over
    /// from the handshake), so the next chunk pass must not read from `io` first.
    need_process: bool,

    /* Used to mark this subscriber's data producer in channels
    and to delete it from the map when unsubscribe is called. */
    pub subscriber_id: Uuid,

    /// Copy of the command object received with the `connect` command, if any.
    connect_command_object: Option<HashMap<String, Amf0ValueType>>,
}
62 
63 impl ServerSession {
64     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self {
65         let net_io = Arc::new(Mutex::new(BytesIO::new(stream)));
66         let subscriber_id = Uuid::new_v4();
67         Self {
68             app_name: String::from(""),
69             stream_name: String::from(""),
70 
71             io: Arc::clone(&net_io),
72             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
73 
74             unpacketizer: ChunkUnpacketizer::new(),
75 
76             state: ServerSessionState::Handshake,
77 
78             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server),
79 
80             subscriber_id,
81             bytesio_data: BytesMut::new(),
82             need_process: false,
83 
84             connect_command_object: None,
85         }
86     }
87 
88     pub async fn run(&mut self) -> Result<(), SessionError> {
89         loop {
90             match self.state {
91                 ServerSessionState::Handshake => {
92                     self.handshake().await?;
93                 }
94                 ServerSessionState::ReadChunk => {
95                     self.read_parse_chunks().await?;
96                 }
97                 ServerSessionState::Play => {
98                     self.play().await?;
99                 }
100             }
101         }
102 
103         //Ok(())
104     }
105 
106     async fn handshake(&mut self) -> Result<(), SessionError> {
107         self.bytesio_data = self.io.lock().await.read().await?;
108         self.handshaker.extend_data(&self.bytesio_data[..]);
109         self.handshaker.handshake().await?;
110 
111         match self.handshaker.state() {
112             ServerHandshakeState::Finish => {
113                 self.state = ServerSessionState::ReadChunk;
114 
115                 let left_bytes = self.handshaker.get_remaining_bytes();
116                 if left_bytes.len() > 0 {
117                     self.unpacketizer.extend_data(&left_bytes[..]);
118                     self.need_process = true;
119                 }
120                 self.send_set_chunk_size().await?;
121                 return Ok(());
122             }
123             _ => {}
124         }
125 
126         Ok(())
127     }
128 
129     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
130         if !self.need_process {
131             self.bytesio_data = self.io.lock().await.read().await?;
132             self.unpacketizer.extend_data(&self.bytesio_data[..]);
133         }
134 
135         self.need_process = false;
136 
137         loop {
138             let result = self.unpacketizer.read_chunks();
139 
140             if let Ok(rv) = result {
141                 match rv {
142                     UnpackResult::Chunks(chunks) => {
143                         for chunk_info in chunks.iter() {
144                             let mut msg = MessageParser::new(chunk_info.clone()).parse()?;
145 
146                             let msg_stream_id = chunk_info.message_header.msg_streamd_id;
147                             let timestamp = chunk_info.message_header.timestamp;
148                             self.process_messages(&mut msg, &msg_stream_id, &timestamp)
149                                 .await?;
150                         }
151                     }
152                     _ => {}
153                 }
154             } else {
155                 break;
156             }
157         }
158         Ok(())
159     }
160 
161     async fn play(&mut self) -> Result<(), SessionError> {
162         match self.common.send_channel_data().await {
163             Ok(_) => {}
164 
165             Err(err) => {
166                 self.common
167                     .unsubscribe_from_channels(
168                         self.app_name.clone(),
169                         self.stream_name.clone(),
170                         self.subscriber_id,
171                     )
172                     .await?;
173                 return Err(err);
174             }
175         }
176 
177         Ok(())
178     }
179 
180     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
181         let mut controlmessage =
182             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
183         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
184 
185         Ok(())
186     }
187 
188     pub async fn process_messages(
189         &mut self,
190         rtmp_msg: &mut RtmpMessageData,
191         msg_stream_id: &u32,
192         timestamp: &u32,
193     ) -> Result<(), SessionError> {
194         match rtmp_msg {
195             RtmpMessageData::Amf0Command {
196                 command_name,
197                 transaction_id,
198                 command_object,
199                 others,
200             } => {
201                 self.on_amf0_command_message(
202                     msg_stream_id,
203                     command_name,
204                     transaction_id,
205                     command_object,
206                     others,
207                 )
208                 .await?
209             }
210             RtmpMessageData::SetChunkSize { chunk_size } => {
211                 self.on_set_chunk_size(chunk_size.clone() as usize)?;
212             }
213             RtmpMessageData::AudioData { data } => {
214                 self.common.on_audio_data(data, timestamp)?;
215             }
216             RtmpMessageData::VideoData { data } => {
217                 self.common.on_video_data(data, timestamp)?;
218             }
219             RtmpMessageData::AmfData { raw_data } => {
220                 self.common.on_meta_data(raw_data, timestamp)?;
221             }
222 
223             _ => {}
224         }
225         Ok(())
226     }
227 
228     pub async fn on_amf0_command_message(
229         &mut self,
230         stream_id: &u32,
231         command_name: &Amf0ValueType,
232         transaction_id: &Amf0ValueType,
233         command_object: &Amf0ValueType,
234         others: &mut Vec<Amf0ValueType>,
235     ) -> Result<(), SessionError> {
236         let empty_cmd_name = &String::new();
237         let cmd_name = match command_name {
238             Amf0ValueType::UTF8String(str) => str,
239             _ => empty_cmd_name,
240         };
241 
242         let transaction_id = match transaction_id {
243             Amf0ValueType::Number(number) => number,
244             _ => &0.0,
245         };
246 
247         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
248         let obj = match command_object {
249             Amf0ValueType::Object(obj) => obj,
250             _ => &empty_cmd_obj,
251         };
252 
253         match cmd_name.as_str() {
254             "connect" => {
255                 log::info!("[ S<-C ] [connect] ");
256                 self.on_connect(&transaction_id, &obj).await?;
257             }
258             "createStream" => {
259                 log::info!("[ S<-C ] [create stream] ");
260                 self.on_create_stream(transaction_id).await?;
261             }
262             "deleteStream" => {
263                 if others.len() > 0 {
264                     let stream_id = match others.pop() {
265                         Some(val) => match val {
266                             Amf0ValueType::Number(streamid) => streamid,
267                             _ => 0.0,
268                         },
269                         _ => 0.0,
270                     };
271                     log::info!(
272                         "[ S<-C ] [delete stream] app_name: {}, stream_name: {}",
273                         self.app_name,
274                         self.stream_name
275                     );
276 
277                     self.on_delete_stream(transaction_id, &stream_id).await?;
278                 }
279             }
280             "play" => {
281                 log::info!(
282                     "[ S<-C ] [play]  app_name: {}, stream_name: {}",
283                     self.app_name,
284                     self.stream_name
285                 );
286                 self.unpacketizer.session_type = config::SERVER_PULL;
287                 self.on_play(transaction_id, stream_id, others).await?;
288             }
289             "publish" => {
290                 self.unpacketizer.session_type = config::SERVER_PUSH;
291                 self.on_publish(transaction_id, stream_id, others).await?;
292             }
293             _ => {}
294         }
295 
296         Ok(())
297     }
298 
    /// Applies a chunk size announced by the peer by forwarding it to the
    /// unpacketizer. Always returns `Ok(())`.
    fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
        self.unpacketizer.update_max_chunk_size(chunk_size);
        Ok(())
    }
303 
304     async fn on_connect(
305         &mut self,
306         transaction_id: &f64,
307         command_obj: &HashMap<String, Amf0ValueType>,
308     ) -> Result<(), SessionError> {
309         self.connect_command_object = Some(command_obj.clone());
310         let mut control_message =
311             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
312         log::info!("[ S->C ] [set window_acknowledgement_size]");
313         control_message
314             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
315             .await?;
316         log::info!("[ S->C ] [set set_peer_bandwidth]",);
317         control_message
318             .write_set_peer_bandwidth(
319                 define::PEER_BANDWIDTH,
320                 define::peer_bandwidth_limit_type::DYNAMIC,
321             )
322             .await?;
323 
324         let obj_encoding = command_obj.get("objectEncoding");
325         let encoding = match obj_encoding {
326             Some(Amf0ValueType::Number(encoding)) => encoding,
327             _ => &define::OBJENCODING_AMF0,
328         };
329 
330         let app_name = command_obj.get("app");
331         self.app_name = match app_name {
332             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
333             _ => {
334                 return Err(SessionError {
335                     value: SessionErrorValue::NoAppName,
336                 });
337             }
338         };
339 
340         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
341         netconnection
342             .write_connect_response(
343                 &transaction_id,
344                 &define::FMSVER.to_string(),
345                 &define::CAPABILITIES,
346                 &String::from("NetConnection.Connect.Success"),
347                 &define::LEVEL.to_string(),
348                 &String::from("Connection Succeeded."),
349                 encoding,
350             )
351             .await?;
352 
353         Ok(())
354     }
355 
356     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
357         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
358         netconnection
359             .write_create_stream_response(transaction_id, &define::STREAM_ID)
360             .await?;
361 
362         log::info!(
363             "[ S->C ] [create_stream_response]  app_name: {}",
364             self.app_name,
365         );
366 
367         Ok(())
368     }
369 
370     pub async fn on_delete_stream(
371         &mut self,
372         transaction_id: &f64,
373         stream_id: &f64,
374     ) -> Result<(), SessionError> {
375         self.common
376             .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
377             .await?;
378 
379         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
380         netstream
381             .write_on_status(
382                 transaction_id,
383                 &"status".to_string(),
384                 &"NetStream.DeleteStream.Suceess".to_string(),
385                 &"".to_string(),
386             )
387             .await?;
388 
389         //self.unsubscribe_from_channels().await?;
390         log::info!(
391             "[ S->C ] [delete stream success]  app_name: {}, stream_name: {}",
392             self.app_name,
393             self.stream_name
394         );
395         log::trace!("{}", stream_id);
396 
397         Ok(())
398     }
399     pub async fn on_play(
400         &mut self,
401         transaction_id: &f64,
402         stream_id: &u32,
403         other_values: &mut Vec<Amf0ValueType>,
404     ) -> Result<(), SessionError> {
405         let length = other_values.len() as u8;
406         let mut index: u8 = 0;
407 
408         let mut stream_name: Option<String> = None;
409         let mut start: Option<f64> = None;
410         let mut duration: Option<f64> = None;
411         let mut reset: Option<bool> = None;
412 
413         loop {
414             if index >= length {
415                 break;
416             }
417             index = index + 1;
418             stream_name = match other_values.remove(0) {
419                 Amf0ValueType::UTF8String(val) => Some(val),
420                 _ => None,
421             };
422 
423             if index >= length {
424                 break;
425             }
426             index = index + 1;
427             start = match other_values.remove(0) {
428                 Amf0ValueType::Number(val) => Some(val),
429                 _ => None,
430             };
431 
432             if index >= length {
433                 break;
434             }
435             index = index + 1;
436             duration = match other_values.remove(0) {
437                 Amf0ValueType::Number(val) => Some(val),
438                 _ => None,
439             };
440 
441             if index >= length {
442                 break;
443             }
444             //index = index + 1;
445             reset = match other_values.remove(0) {
446                 Amf0ValueType::Boolean(val) => Some(val),
447                 _ => None,
448             };
449             break;
450         }
451 
452         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
453         event_messages.write_stream_begin(stream_id.clone()).await?;
454         log::info!(
455             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
456             self.app_name,
457             self.stream_name
458         );
459         log::trace!(
460             "{} {} {}",
461             start.is_some(),
462             duration.is_some(),
463             reset.is_some()
464         );
465 
466         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
467         netstream
468             .write_on_status(
469                 transaction_id,
470                 &"status".to_string(),
471                 &"NetStream.Play.Reset".to_string(),
472                 &"reset".to_string(),
473             )
474             .await?;
475 
476         netstream
477             .write_on_status(
478                 transaction_id,
479                 &"status".to_string(),
480                 &"NetStream.Play.Start".to_string(),
481                 &"play start".to_string(),
482             )
483             .await?;
484 
485         netstream
486             .write_on_status(
487                 transaction_id,
488                 &"status".to_string(),
489                 &"NetStream.Data.Start".to_string(),
490                 &"data start.".to_string(),
491             )
492             .await?;
493 
494         netstream
495             .write_on_status(
496                 transaction_id,
497                 &"status".to_string(),
498                 &"NetStream.Play.PublishNotify".to_string(),
499                 &"play publish notify.".to_string(),
500             )
501             .await?;
502 
503         event_messages
504             .write_stream_is_record(stream_id.clone())
505             .await?;
506         log::info!(
507             "[ S->C ] [stream is record]  app_name: {}, stream_name: {}",
508             self.app_name,
509             self.stream_name
510         );
511         self.stream_name = stream_name.clone().unwrap();
512         self.common
513             .subscribe_from_channels(
514                 self.app_name.clone(),
515                 stream_name.unwrap(),
516                 self.subscriber_id,
517             )
518             .await?;
519 
520         self.state = ServerSessionState::Play;
521 
522         Ok(())
523     }
524 
525     pub async fn on_publish(
526         &mut self,
527         transaction_id: &f64,
528         stream_id: &u32,
529         other_values: &mut Vec<Amf0ValueType>,
530     ) -> Result<(), SessionError> {
531         let length = other_values.len();
532 
533         if length < 2 {
534             return Err(SessionError {
535                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
536             });
537         }
538 
539         let stream_name = match other_values.remove(0) {
540             Amf0ValueType::UTF8String(val) => val,
541             _ => {
542                 return Err(SessionError {
543                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
544                 });
545             }
546         };
547 
548         self.stream_name = stream_name;
549 
550         let _ = match other_values.remove(0) {
551             Amf0ValueType::UTF8String(val) => val,
552             _ => {
553                 return Err(SessionError {
554                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
555                 });
556             }
557         };
558 
559         log::info!(
560             "[ S<-C ] [publish]  app_name: {}, stream_name: {}",
561             self.app_name,
562             self.stream_name
563         );
564 
565         log::info!(
566             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
567             self.app_name,
568             self.stream_name
569         );
570 
571         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
572         event_messages.write_stream_begin(stream_id.clone()).await?;
573 
574         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
575         netstream
576             .write_on_status(
577                 transaction_id,
578                 &"status".to_string(),
579                 &"NetStream.Publish.Start".to_string(),
580                 &"".to_string(),
581             )
582             .await?;
583         log::info!(
584             "[ S->C ] [NetStream.Publish.Start]  app_name: {}, stream_name: {}",
585             self.app_name,
586             self.stream_name
587         );
588 
589         self.common
590             .publish_to_channels(self.app_name.clone(), self.stream_name.clone())
591             .await?;
592 
593         Ok(())
594     }
595 }
596