1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::{
9         amf0::Amf0ValueType,
10         channels::define::ChannelEventProducer,
11         chunk::{
12             define::CHUNK_SIZE,
13             unpacketizer::{ChunkUnpacketizer, UnpackResult},
14         },
15         config, handshake,
16         handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer},
17         messages::{define::RtmpMessageData, parser::MessageParser},
18         netconnection::writer::{ConnectProperties, NetConnection},
19         netstream::writer::NetStreamWriter,
20         protocol_control_messages::writer::ProtocolControlMessagesWriter,
21         user_control_messages::writer::EventMessagesWriter,
22     },
23     bytes::BytesMut,
24     bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO},
25     std::{collections::HashMap, sync::Arc, time::Duration},
26     tokio::{net::TcpStream, sync::Mutex},
27     uuid::Uuid,
28 };
29 
30 enum ServerSessionState {
31     Handshake,
32     ReadChunk,
33     // OnConnect,
34     // OnCreateStream,
35     //Publish,
36     DeleteStream,
37     Play,
38 }
39 
40 pub struct ServerSession {
41     pub app_name: String,
42     pub stream_name: String,
43     io: Arc<Mutex<BytesIO>>,
44     handshaker: HandshakeServer,
45     unpacketizer: ChunkUnpacketizer,
46     state: ServerSessionState,
47     pub common: Common,
48     bytesio_data: BytesMut,
49     has_remaing_data: bool,
50     /* Used to mark the subscriber's the data producer
51     in channels and delete it from map when unsubscribe
52     is called. */
53     pub session_id: Uuid,
54     connect_properties: ConnectProperties,
55 }
56 
57 impl ServerSession {
58     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self {
59         let remote_addr = if let Ok(addr) = stream.peer_addr() {
60             log::info!("server session: {}", addr.to_string());
61             Some(addr)
62         } else {
63             None
64         };
65 
66         let net_io = Arc::new(Mutex::new(BytesIO::new(stream)));
67         let subscriber_id = Uuid::new_v4();
68         Self {
69             app_name: String::from(""),
70             stream_name: String::from(""),
71             io: Arc::clone(&net_io),
72             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
73             unpacketizer: ChunkUnpacketizer::new(),
74             state: ServerSessionState::Handshake,
75             common: Common::new(
76                 Arc::clone(&net_io),
77                 event_producer,
78                 SessionType::Server,
79                 remote_addr,
80             ),
81             session_id: subscriber_id,
82             bytesio_data: BytesMut::new(),
83             has_remaing_data: false,
84             connect_properties: ConnectProperties::default(),
85         }
86     }
87 
88     pub async fn run(&mut self) -> Result<(), SessionError> {
89         loop {
90             match self.state {
91                 ServerSessionState::Handshake => {
92                     self.handshake().await?;
93                 }
94                 ServerSessionState::ReadChunk => {
95                     self.read_parse_chunks().await?;
96                 }
97                 ServerSessionState::Play => {
98                     self.play().await?;
99                 }
100                 ServerSessionState::DeleteStream => {
101                     return Ok(());
102                 }
103             }
104         }
105 
106         //Ok(())
107     }
108 
109     async fn handshake(&mut self) -> Result<(), SessionError> {
110         let mut bytes_len = 0;
111 
112         while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE {
113             self.bytesio_data = self.io.lock().await.read().await?;
114             bytes_len += self.bytesio_data.len();
115             self.handshaker.extend_data(&self.bytesio_data[..]);
116         }
117 
118         self.handshaker.handshake().await?;
119 
120         if let ServerHandshakeState::Finish = self.handshaker.state() {
121             self.state = ServerSessionState::ReadChunk;
122             let left_bytes = self.handshaker.get_remaining_bytes();
123             if !left_bytes.is_empty() {
124                 self.unpacketizer.extend_data(&left_bytes[..]);
125                 self.has_remaing_data = true;
126             }
127             log::info!("[ S->C ] [send_set_chunk_size] ");
128             self.send_set_chunk_size().await?;
129             return Ok(());
130         }
131 
132         Ok(())
133     }
134 
135     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
136         if !self.has_remaing_data {
137             match self
138                 .io
139                 .lock()
140                 .await
141                 .read_timeout(Duration::from_secs(2))
142                 .await
143             {
144                 Ok(data) => {
145                     self.bytesio_data = data;
146                 }
147                 Err(err) => {
148                     self.common
149                         .unpublish_to_channels(
150                             self.app_name.clone(),
151                             self.stream_name.clone(),
152                             self.session_id,
153                         )
154                         .await?;
155 
156                     return Err(SessionError {
157                         value: SessionErrorValue::BytesIOError(err),
158                     });
159                 }
160             }
161 
162             self.unpacketizer.extend_data(&self.bytesio_data[..]);
163         }
164 
165         self.has_remaing_data = false;
166 
167         loop {
168             let result = self.unpacketizer.read_chunks();
169 
170             if let Ok(rv) = result {
171                 if let UnpackResult::Chunks(chunks) = rv {
172                     for chunk_info in chunks {
173                         let timestamp = chunk_info.message_header.timestamp;
174                         let msg_stream_id = chunk_info.message_header.msg_streamd_id;
175 
176                         let mut msg = MessageParser::new(chunk_info).parse()?;
177                         self.process_messages(&mut msg, &msg_stream_id, &timestamp)
178                             .await?;
179                     }
180                 }
181             } else {
182                 break;
183             }
184         }
185         Ok(())
186     }
187 
188     async fn play(&mut self) -> Result<(), SessionError> {
189         match self.common.send_channel_data().await {
190             Ok(_) => {}
191             Err(err) => {
192                 self.common
193                     .unsubscribe_from_channels(
194                         self.app_name.clone(),
195                         self.stream_name.clone(),
196                         self.session_id,
197                     )
198                     .await?;
199                 return Err(err);
200             }
201         }
202 
203         Ok(())
204     }
205 
206     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
207         let mut controlmessage =
208             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
209         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
210 
211         Ok(())
212     }
213 
214     pub async fn process_messages(
215         &mut self,
216         rtmp_msg: &mut RtmpMessageData,
217         msg_stream_id: &u32,
218         timestamp: &u32,
219     ) -> Result<(), SessionError> {
220         match rtmp_msg {
221             RtmpMessageData::Amf0Command {
222                 command_name,
223                 transaction_id,
224                 command_object,
225                 others,
226             } => {
227                 self.on_amf0_command_message(
228                     msg_stream_id,
229                     command_name,
230                     transaction_id,
231                     command_object,
232                     others,
233                 )
234                 .await?
235             }
236             RtmpMessageData::SetChunkSize { chunk_size } => {
237                 self.on_set_chunk_size(*chunk_size as usize)?;
238             }
239             RtmpMessageData::AudioData { data } => {
240                 self.common.on_audio_data(data, timestamp)?;
241             }
242             RtmpMessageData::VideoData { data } => {
243                 self.common.on_video_data(data, timestamp)?;
244             }
245             RtmpMessageData::AmfData { raw_data } => {
246                 self.common.on_meta_data(raw_data, timestamp)?;
247             }
248 
249             _ => {}
250         }
251         Ok(())
252     }
253 
254     pub async fn on_amf0_command_message(
255         &mut self,
256         stream_id: &u32,
257         command_name: &Amf0ValueType,
258         transaction_id: &Amf0ValueType,
259         command_object: &Amf0ValueType,
260         others: &mut Vec<Amf0ValueType>,
261     ) -> Result<(), SessionError> {
262         let empty_cmd_name = &String::new();
263         let cmd_name = match command_name {
264             Amf0ValueType::UTF8String(str) => str,
265             _ => empty_cmd_name,
266         };
267 
268         let transaction_id = match transaction_id {
269             Amf0ValueType::Number(number) => number,
270             _ => &0.0,
271         };
272 
273         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
274         let obj = match command_object {
275             Amf0ValueType::Object(obj) => obj,
276             _ => &empty_cmd_obj,
277         };
278 
279         match cmd_name.as_str() {
280             "connect" => {
281                 log::info!("[ S<-C ] [connect] ");
282                 self.on_connect(transaction_id, obj).await?;
283             }
284             "createStream" => {
285                 log::info!("[ S<-C ] [create stream] ");
286                 self.on_create_stream(transaction_id).await?;
287             }
288             "deleteStream" => {
289                 if !others.is_empty() {
290                     let stream_id = match others.pop() {
291                         Some(Amf0ValueType::Number(streamid)) => streamid,
292                         _ => 0.0,
293                     };
294 
295                     log::info!(
296                         "[ S<-C ] [delete stream] app_name: {}, stream_name: {}",
297                         self.app_name,
298                         self.stream_name
299                     );
300 
301                     self.on_delete_stream(transaction_id, &stream_id).await?;
302                     self.state = ServerSessionState::DeleteStream;
303                 }
304             }
305             "play" => {
306                 log::info!(
307                     "[ S<-C ] [play]  app_name: {}, stream_name: {}",
308                     self.app_name,
309                     self.stream_name
310                 );
311                 self.unpacketizer.session_type = config::SERVER_PULL;
312                 self.on_play(transaction_id, stream_id, others).await?;
313             }
314             "publish" => {
315                 self.unpacketizer.session_type = config::SERVER_PUSH;
316                 self.on_publish(transaction_id, stream_id, others).await?;
317             }
318             _ => {}
319         }
320 
321         Ok(())
322     }
323 
324     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
325         log::info!(
326             "[ S<-C ] [set chunk size]  app_name: {}, stream_name: {}, chunk size: {}",
327             self.app_name,
328             self.stream_name,
329             chunk_size
330         );
331         self.unpacketizer.update_max_chunk_size(chunk_size);
332         Ok(())
333     }
334 
335     fn parse_connect_properties(&mut self, command_obj: &HashMap<String, Amf0ValueType>) {
336         for (property, value) in command_obj {
337             match property.as_str() {
338                 "app" => {
339                     if let Amf0ValueType::UTF8String(app) = value {
340                         self.connect_properties.app = Some(app.clone());
341                     }
342                 }
343                 "flashVer" => {
344                     if let Amf0ValueType::UTF8String(flash_ver) = value {
345                         self.connect_properties.flash_ver = Some(flash_ver.clone());
346                     }
347                 }
348                 "swfUrl" => {
349                     if let Amf0ValueType::UTF8String(swf_url) = value {
350                         self.connect_properties.swf_url = Some(swf_url.clone());
351                     }
352                 }
353                 "tcUrl" => {
354                     if let Amf0ValueType::UTF8String(tc_url) = value {
355                         self.connect_properties.tc_url = Some(tc_url.clone());
356                     }
357                 }
358                 "fpad" => {
359                     if let Amf0ValueType::Boolean(fpad) = value {
360                         self.connect_properties.fpad = Some(*fpad);
361                     }
362                 }
363                 "audioCodecs" => {
364                     if let Amf0ValueType::Number(audio_codecs) = value {
365                         self.connect_properties.audio_codecs = Some(*audio_codecs);
366                     }
367                 }
368                 "videoCodecs" => {
369                     if let Amf0ValueType::Number(video_codecs) = value {
370                         self.connect_properties.video_codecs = Some(*video_codecs);
371                     }
372                 }
373                 "videoFunction" => {
374                     if let Amf0ValueType::Number(video_function) = value {
375                         self.connect_properties.video_function = Some(*video_function);
376                     }
377                 }
378                 "pageUrl" => {
379                     if let Amf0ValueType::UTF8String(page_url) = value {
380                         self.connect_properties.page_url = Some(page_url.clone());
381                     }
382                 }
383                 "objectEncoding" => {
384                     if let Amf0ValueType::Number(object_encoding) = value {
385                         self.connect_properties.object_encoding = Some(*object_encoding);
386                     }
387                 }
388                 _ => {
389                     log::warn!("unknown connect properties: {}:{:?}", property, value);
390                 }
391             }
392         }
393     }
394 
395     async fn on_connect(
396         &mut self,
397         transaction_id: &f64,
398         command_obj: &HashMap<String, Amf0ValueType>,
399     ) -> Result<(), SessionError> {
400         self.parse_connect_properties(command_obj);
401         log::info!("connect properties: {:?}", self.connect_properties);
402         let mut control_message =
403             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
404         log::info!("[ S->C ] [set window_acknowledgement_size]");
405         control_message
406             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
407             .await?;
408 
409         log::info!("[ S->C ] [set set_peer_bandwidth]",);
410         control_message
411             .write_set_peer_bandwidth(
412                 define::PEER_BANDWIDTH,
413                 define::peer_bandwidth_limit_type::DYNAMIC,
414             )
415             .await?;
416 
417         let obj_encoding = command_obj.get("objectEncoding");
418         let encoding = match obj_encoding {
419             Some(Amf0ValueType::Number(encoding)) => encoding,
420             _ => &define::OBJENCODING_AMF0,
421         };
422 
423         let app_name = command_obj.get("app");
424         self.app_name = match app_name {
425             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
426             _ => {
427                 return Err(SessionError {
428                     value: SessionErrorValue::NoAppName,
429                 });
430             }
431         };
432 
433         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
434         log::info!("[ S->C ] [set connect_response]",);
435         netconnection
436             .write_connect_response(
437                 transaction_id,
438                 define::FMSVER,
439                 &define::CAPABILITIES,
440                 &String::from("NetConnection.Connect.Success"),
441                 define::LEVEL,
442                 &String::from("Connection Succeeded."),
443                 encoding,
444             )
445             .await?;
446 
447         Ok(())
448     }
449 
450     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
451         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
452         netconnection
453             .write_create_stream_response(transaction_id, &define::STREAM_ID)
454             .await?;
455 
456         log::info!(
457             "[ S->C ] [create_stream_response]  app_name: {}",
458             self.app_name,
459         );
460 
461         Ok(())
462     }
463 
464     pub async fn on_delete_stream(
465         &mut self,
466         transaction_id: &f64,
467         stream_id: &f64,
468     ) -> Result<(), SessionError> {
469         self.common
470             .unpublish_to_channels(
471                 self.app_name.clone(),
472                 self.stream_name.clone(),
473                 self.session_id,
474             )
475             .await?;
476 
477         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
478         netstream
479             .write_on_status(
480                 transaction_id,
481                 "status",
482                 "NetStream.DeleteStream.Suceess",
483                 "",
484             )
485             .await?;
486 
487         //self.unsubscribe_from_channels().await?;
488         log::info!(
489             "[ S->C ] [delete stream success]  app_name: {}, stream_name: {}",
490             self.app_name,
491             self.stream_name
492         );
493         log::trace!("{}", stream_id);
494 
495         Ok(())
496     }
497 
498     fn get_request_url(&mut self) -> String {
499         if let Some(tc_url) = &self.connect_properties.tc_url {
500             format!("{}/{}", tc_url, self.stream_name.clone())
501         } else {
502             format!("{}/{}", self.app_name.clone(), self.stream_name.clone())
503         }
504     }
505 
506     #[allow(clippy::never_loop)]
507     pub async fn on_play(
508         &mut self,
509         transaction_id: &f64,
510         stream_id: &u32,
511         other_values: &mut Vec<Amf0ValueType>,
512     ) -> Result<(), SessionError> {
513         let length = other_values.len() as u8;
514         let mut index: u8 = 0;
515 
516         let mut stream_name: Option<String> = None;
517         let mut start: Option<f64> = None;
518         let mut duration: Option<f64> = None;
519         let mut reset: Option<bool> = None;
520 
521         loop {
522             if index >= length {
523                 break;
524             }
525             index += 1;
526             stream_name = match other_values.remove(0) {
527                 Amf0ValueType::UTF8String(val) => Some(val),
528                 _ => None,
529             };
530 
531             if index >= length {
532                 break;
533             }
534             index += 1;
535             start = match other_values.remove(0) {
536                 Amf0ValueType::Number(val) => Some(val),
537                 _ => None,
538             };
539 
540             if index >= length {
541                 break;
542             }
543             index += 1;
544             duration = match other_values.remove(0) {
545                 Amf0ValueType::Number(val) => Some(val),
546                 _ => None,
547             };
548 
549             if index >= length {
550                 break;
551             }
552             //index = index + 1;
553             reset = match other_values.remove(0) {
554                 Amf0ValueType::Boolean(val) => Some(val),
555                 _ => None,
556             };
557             break;
558         }
559 
560         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
561         event_messages.write_stream_begin(*stream_id).await?;
562         log::info!(
563             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
564             self.app_name,
565             self.stream_name
566         );
567         log::trace!(
568             "{} {} {}",
569             start.is_some(),
570             duration.is_some(),
571             reset.is_some()
572         );
573 
574         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
575         netstream
576             .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset")
577             .await?;
578 
579         netstream
580             .write_on_status(
581                 transaction_id,
582                 "status",
583                 "NetStream.Play.Start",
584                 "play start",
585             )
586             .await?;
587 
588         netstream
589             .write_on_status(
590                 transaction_id,
591                 "status",
592                 "NetStream.Data.Start",
593                 "data start.",
594             )
595             .await?;
596 
597         netstream
598             .write_on_status(
599                 transaction_id,
600                 "status",
601                 "NetStream.Play.PublishNotify",
602                 "play publish notify.",
603             )
604             .await?;
605 
606         event_messages.write_stream_is_record(*stream_id).await?;
607         log::info!(
608             "[ S->C ] [stream is record]  app_name: {}, stream_name: {}",
609             self.app_name,
610             self.stream_name
611         );
612         self.stream_name = stream_name.clone().unwrap();
613         /*Now it can update the request url*/
614         self.common.request_url = self.get_request_url();
615         self.common
616             .subscribe_from_channels(self.app_name.clone(), stream_name.unwrap(), self.session_id)
617             .await?;
618 
619         self.state = ServerSessionState::Play;
620 
621         Ok(())
622     }
623 
624     pub async fn on_publish(
625         &mut self,
626         transaction_id: &f64,
627         stream_id: &u32,
628         other_values: &mut Vec<Amf0ValueType>,
629     ) -> Result<(), SessionError> {
630         let length = other_values.len();
631 
632         if length < 2 {
633             return Err(SessionError {
634                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
635             });
636         }
637 
638         let stream_name = match other_values.remove(0) {
639             Amf0ValueType::UTF8String(val) => val,
640             _ => {
641                 return Err(SessionError {
642                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
643                 });
644             }
645         };
646 
647         self.stream_name = stream_name;
648         /*Now it can update the request url*/
649         self.common.request_url = self.get_request_url();
650 
651         let _ = match other_values.remove(0) {
652             Amf0ValueType::UTF8String(val) => val,
653             _ => {
654                 return Err(SessionError {
655                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
656                 });
657             }
658         };
659 
660         log::info!(
661             "[ S<-C ] [publish]  app_name: {}, stream_name: {}",
662             self.app_name,
663             self.stream_name
664         );
665 
666         log::info!(
667             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
668             self.app_name,
669             self.stream_name
670         );
671 
672         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
673         event_messages.write_stream_begin(*stream_id).await?;
674 
675         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
676         netstream
677             .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "")
678             .await?;
679         log::info!(
680             "[ S->C ] [NetStream.Publish.Start]  app_name: {}, stream_name: {}",
681             self.app_name,
682             self.stream_name
683         );
684 
685         self.common
686             .publish_to_channels(
687                 self.app_name.clone(),
688                 self.stream_name.clone(),
689                 self.session_id,
690             )
691             .await?;
692 
693         Ok(())
694     }
695 }
696