1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::{
9         amf0::Amf0ValueType,
10         channels::define::ChannelEventProducer,
11         chunk::{
12             define::CHUNK_SIZE,
13             unpacketizer::{ChunkUnpacketizer, UnpackResult},
14         },
15         config, handshake,
16         handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer},
17         messages::{define::RtmpMessageData, parser::MessageParser},
18         netconnection::writer::NetConnection,
19         netstream::writer::NetStreamWriter,
20         protocol_control_messages::writer::ProtocolControlMessagesWriter,
21         user_control_messages::writer::EventMessagesWriter,
22     },
23     bytes::BytesMut,
24     bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO},
25     std::{collections::HashMap, sync::Arc, time::Duration},
26     tokio::{net::TcpStream, sync::Mutex},
27     uuid::Uuid,
28 };
29 
30 enum ServerSessionState {
31     Handshake,
32     ReadChunk,
33     // OnConnect,
34     // OnCreateStream,
35     //Publish,
36     Play,
37 }
38 
39 pub struct ServerSession {
40     pub app_name: String,
41     pub stream_name: String,
42 
43     io: Arc<Mutex<BytesIO>>,
44     handshaker: HandshakeServer,
45 
46     unpacketizer: ChunkUnpacketizer,
47 
48     state: ServerSessionState,
49 
50     pub common: Common,
51 
52     bytesio_data: BytesMut,
53     has_remaing_data: bool,
54 
55     /* Used to mark the subscriber's the data producer
56     in channels and delete it from map when unsubscribe
57     is called. */
58     pub subscriber_id: Uuid,
59 
60     connect_command_object: Option<HashMap<String, Amf0ValueType>>,
61 }
62 
63 impl ServerSession {
64     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self {
65         let net_io = Arc::new(Mutex::new(BytesIO::new(stream)));
66         let subscriber_id = Uuid::new_v4();
67         Self {
68             app_name: String::from(""),
69             stream_name: String::from(""),
70 
71             io: Arc::clone(&net_io),
72             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
73 
74             unpacketizer: ChunkUnpacketizer::new(),
75 
76             state: ServerSessionState::Handshake,
77 
78             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server),
79 
80             subscriber_id,
81             bytesio_data: BytesMut::new(),
82             has_remaing_data: false,
83 
84             connect_command_object: None,
85         }
86     }
87 
88     pub async fn run(&mut self) -> Result<(), SessionError> {
89         loop {
90             match self.state {
91                 ServerSessionState::Handshake => {
92                     self.handshake().await?;
93                 }
94                 ServerSessionState::ReadChunk => {
95                     self.read_parse_chunks().await?;
96                 }
97                 ServerSessionState::Play => {
98                     self.play().await?;
99                 }
100             }
101         }
102 
103         //Ok(())
104     }
105 
106     async fn handshake(&mut self) -> Result<(), SessionError> {
107         let mut bytes_len = 0;
108 
109         while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE {
110             self.bytesio_data = self.io.lock().await.read().await?;
111             bytes_len += self.bytesio_data.len();
112             self.handshaker.extend_data(&self.bytesio_data[..]);
113         }
114 
115         self.handshaker.handshake().await?;
116 
117         match self.handshaker.state() {
118             ServerHandshakeState::Finish => {
119                 self.state = ServerSessionState::ReadChunk;
120 
121                 let left_bytes = self.handshaker.get_remaining_bytes();
122                 if left_bytes.len() > 0 {
123                     self.unpacketizer.extend_data(&left_bytes[..]);
124                     self.has_remaing_data = true;
125                 }
126                 log::info!("[ S->C ] [send_set_chunk_size] ");
127                 self.send_set_chunk_size().await?;
128                 return Ok(());
129             }
130             _ => {}
131         }
132 
133         Ok(())
134     }
135 
136     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
137         if !self.has_remaing_data {
138             match self
139                 .io
140                 .lock()
141                 .await
142                 .read_timeout(Duration::from_secs(2))
143                 .await
144             {
145                 Ok(data) => {
146                     self.bytesio_data = data;
147                 }
148                 Err(err) => {
149                     self.common
150                         .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
151                         .await?;
152 
153                     return Err(SessionError {
154                         value: SessionErrorValue::BytesIOError(err),
155                     });
156                 }
157             }
158 
159             self.unpacketizer.extend_data(&self.bytesio_data[..]);
160         }
161 
162         self.has_remaing_data = false;
163 
164         loop {
165             let result = self.unpacketizer.read_chunks();
166 
167             if let Ok(rv) = result {
168                 match rv {
169                     UnpackResult::Chunks(chunks) => {
170                         for chunk_info in chunks {
171                             let timestamp = chunk_info.message_header.timestamp;
172                             let msg_stream_id = chunk_info.message_header.msg_streamd_id;
173 
174                             let mut msg = MessageParser::new(chunk_info).parse()?;
175                             self.process_messages(&mut msg, &msg_stream_id, &timestamp)
176                                 .await?;
177                         }
178                     }
179                     _ => {}
180                 }
181             } else {
182                 break;
183             }
184         }
185         Ok(())
186     }
187 
188     async fn play(&mut self) -> Result<(), SessionError> {
189         match self.common.send_channel_data().await {
190             Ok(_) => {}
191 
192             Err(err) => {
193                 self.common
194                     .unsubscribe_from_channels(
195                         self.app_name.clone(),
196                         self.stream_name.clone(),
197                         self.subscriber_id,
198                     )
199                     .await?;
200                 return Err(err);
201             }
202         }
203 
204         Ok(())
205     }
206 
207     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
208         let mut controlmessage =
209             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
210         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
211 
212         Ok(())
213     }
214 
215     pub async fn process_messages(
216         &mut self,
217         rtmp_msg: &mut RtmpMessageData,
218         msg_stream_id: &u32,
219         timestamp: &u32,
220     ) -> Result<(), SessionError> {
221         match rtmp_msg {
222             RtmpMessageData::Amf0Command {
223                 command_name,
224                 transaction_id,
225                 command_object,
226                 others,
227             } => {
228                 self.on_amf0_command_message(
229                     msg_stream_id,
230                     command_name,
231                     transaction_id,
232                     command_object,
233                     others,
234                 )
235                 .await?
236             }
237             RtmpMessageData::SetChunkSize { chunk_size } => {
238                 self.on_set_chunk_size(chunk_size.clone() as usize)?;
239             }
240             RtmpMessageData::AudioData { data } => {
241                 self.common.on_audio_data(data, timestamp)?;
242             }
243             RtmpMessageData::VideoData { data } => {
244                 self.common.on_video_data(data, timestamp)?;
245             }
246             RtmpMessageData::AmfData { raw_data } => {
247                 self.common.on_meta_data(raw_data, timestamp)?;
248             }
249 
250             _ => {}
251         }
252         Ok(())
253     }
254 
255     pub async fn on_amf0_command_message(
256         &mut self,
257         stream_id: &u32,
258         command_name: &Amf0ValueType,
259         transaction_id: &Amf0ValueType,
260         command_object: &Amf0ValueType,
261         others: &mut Vec<Amf0ValueType>,
262     ) -> Result<(), SessionError> {
263         let empty_cmd_name = &String::new();
264         let cmd_name = match command_name {
265             Amf0ValueType::UTF8String(str) => str,
266             _ => empty_cmd_name,
267         };
268 
269         let transaction_id = match transaction_id {
270             Amf0ValueType::Number(number) => number,
271             _ => &0.0,
272         };
273 
274         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
275         let obj = match command_object {
276             Amf0ValueType::Object(obj) => obj,
277             _ => &empty_cmd_obj,
278         };
279 
280         match cmd_name.as_str() {
281             "connect" => {
282                 log::info!("[ S<-C ] [connect] ");
283                 self.on_connect(&transaction_id, &obj).await?;
284             }
285             "createStream" => {
286                 log::info!("[ S<-C ] [create stream] ");
287                 self.on_create_stream(transaction_id).await?;
288             }
289             "deleteStream" => {
290                 if others.len() > 0 {
291                     let stream_id = match others.pop() {
292                         Some(val) => match val {
293                             Amf0ValueType::Number(streamid) => streamid,
294                             _ => 0.0,
295                         },
296                         _ => 0.0,
297                     };
298                     log::info!(
299                         "[ S<-C ] [delete stream] app_name: {}, stream_name: {}",
300                         self.app_name,
301                         self.stream_name
302                     );
303 
304                     self.on_delete_stream(transaction_id, &stream_id).await?;
305                 }
306             }
307             "play" => {
308                 log::info!(
309                     "[ S<-C ] [play]  app_name: {}, stream_name: {}",
310                     self.app_name,
311                     self.stream_name
312                 );
313                 self.unpacketizer.session_type = config::SERVER_PULL;
314                 self.on_play(transaction_id, stream_id, others).await?;
315             }
316             "publish" => {
317                 self.unpacketizer.session_type = config::SERVER_PUSH;
318                 self.on_publish(transaction_id, stream_id, others).await?;
319             }
320             _ => {}
321         }
322 
323         Ok(())
324     }
325 
326     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
327         log::info!(
328             "[ S<-C ] [set chunk size]  app_name: {}, stream_name: {}, chunk size: {}",
329             self.app_name,
330             self.stream_name,
331             chunk_size
332         );
333         self.unpacketizer.update_max_chunk_size(chunk_size);
334         Ok(())
335     }
336 
337     async fn on_connect(
338         &mut self,
339         transaction_id: &f64,
340         command_obj: &HashMap<String, Amf0ValueType>,
341     ) -> Result<(), SessionError> {
342         self.connect_command_object = Some(command_obj.clone());
343         let mut control_message =
344             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
345         log::info!("[ S->C ] [set window_acknowledgement_size]");
346         control_message
347             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
348             .await?;
349 
350         log::info!("[ S->C ] [set set_peer_bandwidth]",);
351         control_message
352             .write_set_peer_bandwidth(
353                 define::PEER_BANDWIDTH,
354                 define::peer_bandwidth_limit_type::DYNAMIC,
355             )
356             .await?;
357 
358         let obj_encoding = command_obj.get("objectEncoding");
359         let encoding = match obj_encoding {
360             Some(Amf0ValueType::Number(encoding)) => encoding,
361             _ => &define::OBJENCODING_AMF0,
362         };
363 
364         let app_name = command_obj.get("app");
365         self.app_name = match app_name {
366             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
367             _ => {
368                 return Err(SessionError {
369                     value: SessionErrorValue::NoAppName,
370                 });
371             }
372         };
373 
374         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
375         log::info!("[ S->C ] [set connect_response]",);
376         netconnection
377             .write_connect_response(
378                 &transaction_id,
379                 &define::FMSVER.to_string(),
380                 &define::CAPABILITIES,
381                 &String::from("NetConnection.Connect.Success"),
382                 &define::LEVEL.to_string(),
383                 &String::from("Connection Succeeded."),
384                 encoding,
385             )
386             .await?;
387 
388         Ok(())
389     }
390 
391     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
392         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
393         netconnection
394             .write_create_stream_response(transaction_id, &define::STREAM_ID)
395             .await?;
396 
397         log::info!(
398             "[ S->C ] [create_stream_response]  app_name: {}",
399             self.app_name,
400         );
401 
402         Ok(())
403     }
404 
405     pub async fn on_delete_stream(
406         &mut self,
407         transaction_id: &f64,
408         stream_id: &f64,
409     ) -> Result<(), SessionError> {
410         self.common
411             .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
412             .await?;
413 
414         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
415         netstream
416             .write_on_status(
417                 transaction_id,
418                 &"status".to_string(),
419                 &"NetStream.DeleteStream.Suceess".to_string(),
420                 &"".to_string(),
421             )
422             .await?;
423 
424         //self.unsubscribe_from_channels().await?;
425         log::info!(
426             "[ S->C ] [delete stream success]  app_name: {}, stream_name: {}",
427             self.app_name,
428             self.stream_name
429         );
430         log::trace!("{}", stream_id);
431 
432         Ok(())
433     }
434     pub async fn on_play(
435         &mut self,
436         transaction_id: &f64,
437         stream_id: &u32,
438         other_values: &mut Vec<Amf0ValueType>,
439     ) -> Result<(), SessionError> {
440         let length = other_values.len() as u8;
441         let mut index: u8 = 0;
442 
443         let mut stream_name: Option<String> = None;
444         let mut start: Option<f64> = None;
445         let mut duration: Option<f64> = None;
446         let mut reset: Option<bool> = None;
447 
448         loop {
449             if index >= length {
450                 break;
451             }
452             index = index + 1;
453             stream_name = match other_values.remove(0) {
454                 Amf0ValueType::UTF8String(val) => Some(val),
455                 _ => None,
456             };
457 
458             if index >= length {
459                 break;
460             }
461             index = index + 1;
462             start = match other_values.remove(0) {
463                 Amf0ValueType::Number(val) => Some(val),
464                 _ => None,
465             };
466 
467             if index >= length {
468                 break;
469             }
470             index = index + 1;
471             duration = match other_values.remove(0) {
472                 Amf0ValueType::Number(val) => Some(val),
473                 _ => None,
474             };
475 
476             if index >= length {
477                 break;
478             }
479             //index = index + 1;
480             reset = match other_values.remove(0) {
481                 Amf0ValueType::Boolean(val) => Some(val),
482                 _ => None,
483             };
484             break;
485         }
486 
487         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
488         event_messages.write_stream_begin(stream_id.clone()).await?;
489         log::info!(
490             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
491             self.app_name,
492             self.stream_name
493         );
494         log::trace!(
495             "{} {} {}",
496             start.is_some(),
497             duration.is_some(),
498             reset.is_some()
499         );
500 
501         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
502         netstream
503             .write_on_status(
504                 transaction_id,
505                 &"status".to_string(),
506                 &"NetStream.Play.Reset".to_string(),
507                 &"reset".to_string(),
508             )
509             .await?;
510 
511         netstream
512             .write_on_status(
513                 transaction_id,
514                 &"status".to_string(),
515                 &"NetStream.Play.Start".to_string(),
516                 &"play start".to_string(),
517             )
518             .await?;
519 
520         netstream
521             .write_on_status(
522                 transaction_id,
523                 &"status".to_string(),
524                 &"NetStream.Data.Start".to_string(),
525                 &"data start.".to_string(),
526             )
527             .await?;
528 
529         netstream
530             .write_on_status(
531                 transaction_id,
532                 &"status".to_string(),
533                 &"NetStream.Play.PublishNotify".to_string(),
534                 &"play publish notify.".to_string(),
535             )
536             .await?;
537 
538         event_messages
539             .write_stream_is_record(stream_id.clone())
540             .await?;
541         log::info!(
542             "[ S->C ] [stream is record]  app_name: {}, stream_name: {}",
543             self.app_name,
544             self.stream_name
545         );
546         self.stream_name = stream_name.clone().unwrap();
547         self.common
548             .subscribe_from_channels(
549                 self.app_name.clone(),
550                 stream_name.unwrap(),
551                 self.subscriber_id,
552             )
553             .await?;
554 
555         self.state = ServerSessionState::Play;
556 
557         Ok(())
558     }
559 
560     pub async fn on_publish(
561         &mut self,
562         transaction_id: &f64,
563         stream_id: &u32,
564         other_values: &mut Vec<Amf0ValueType>,
565     ) -> Result<(), SessionError> {
566         let length = other_values.len();
567 
568         if length < 2 {
569             return Err(SessionError {
570                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
571             });
572         }
573 
574         let stream_name = match other_values.remove(0) {
575             Amf0ValueType::UTF8String(val) => val,
576             _ => {
577                 return Err(SessionError {
578                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
579                 });
580             }
581         };
582 
583         self.stream_name = stream_name;
584 
585         let _ = match other_values.remove(0) {
586             Amf0ValueType::UTF8String(val) => val,
587             _ => {
588                 return Err(SessionError {
589                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
590                 });
591             }
592         };
593 
594         log::info!(
595             "[ S<-C ] [publish]  app_name: {}, stream_name: {}",
596             self.app_name,
597             self.stream_name
598         );
599 
600         log::info!(
601             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
602             self.app_name,
603             self.stream_name
604         );
605 
606         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
607         event_messages.write_stream_begin(stream_id.clone()).await?;
608 
609         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
610         netstream
611             .write_on_status(
612                 transaction_id,
613                 &"status".to_string(),
614                 &"NetStream.Publish.Start".to_string(),
615                 &"".to_string(),
616             )
617             .await?;
618         log::info!(
619             "[ S->C ] [NetStream.Publish.Start]  app_name: {}, stream_name: {}",
620             self.app_name,
621             self.stream_name
622         );
623 
624         self.common
625             .publish_to_channels(self.app_name.clone(), self.stream_name.clone())
626             .await?;
627 
628         Ok(())
629     }
630 }
631