1 use crate::chunk::packetizer::ChunkPacketizer;
2 
3 use {
4     super::{
5         common::Common,
6         define,
7         define::SessionType,
8         errors::{SessionError, SessionErrorValue},
9     },
10     crate::{
11         amf0::Amf0ValueType,
12         chunk::{
13             define::CHUNK_SIZE,
14             unpacketizer::{ChunkUnpacketizer, UnpackResult},
15         },
16         config, handshake,
17         handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer},
18         messages::{define::RtmpMessageData, parser::MessageParser},
19         netconnection::writer::{ConnectProperties, NetConnection},
20         netstream::writer::NetStreamWriter,
21         protocol_control_messages::writer::ProtocolControlMessagesWriter,
22         user_control_messages::writer::EventMessagesWriter,
23         utils::RtmpUrlParser,
24     },
25     bytes::BytesMut,
26     bytesio::{
27         bytes_writer::AsyncBytesWriter,
28         bytesio::{TNetIO, TcpIO},
29     },
30     indexmap::IndexMap,
31     std::{sync::Arc, time::Duration},
32     streamhub::{
33         define::StreamHubEventSender,
34         utils::{RandomDigitCount, Uuid},
35     },
36     tokio::{net::TcpStream, sync::Mutex},
37 };
38 
/// State machine driven by `ServerSession::run`; the current variant decides
/// what the session does on the next loop iteration.
enum ServerSessionState {
    /// Initial state: `run` calls `handshake` until the handshaker reports `Finish`.
    Handshake,
    /// Entered once the handshake has finished: `run` reads and parses chunks.
    ReadChunk,
    // OnConnect,
    // OnCreateStream,
    //Publish,
    /// Entered after a `deleteStream` command has been handled: `run` returns `Ok(())`.
    DeleteStream,
    /// Entered at the end of `on_play`: `run` calls `play` on every iteration.
    Play,
}
48 
/// Server side of a single RTMP connection: owns the network IO handle, the
/// handshake and chunk-parsing state, and the names negotiated with the client.
pub struct ServerSession {
    /// Application name, taken from the `app` property of the `connect` command.
    pub app_name: String,
    /// Stream name parsed from the raw stream name of a `play`/`publish` command.
    pub stream_name: String,
    /// URL parameters parsed from the same raw stream name.
    pub url_parameters: String,
    /// Network IO handle, shared with the handshaker, the packetizer and the
    /// message writers created by this session.
    io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
    /// Performs the server side of the handshake.
    handshaker: HandshakeServer,
    /// Accumulates received bytes and splits them into chunks.
    unpacketizer: ChunkUnpacketizer,
    /// Current state of the loop in `run`.
    state: ServerSessionState,
    /// Most recent buffer read from `io`.
    bytesio_data: BytesMut,
    /// Set when bytes left over by the handshake were already pushed into
    /// `unpacketizer`, so the next `read_parse_chunks` call parses them
    /// before reading from `io` again.
    has_remaing_data: bool,
    /// Identifies this session in the publish/unpublish and
    /// subscribe/unsubscribe calls made through `common`.
    pub session_id: Uuid,
    /// Properties collected from the command object of the `connect` command.
    connect_properties: ConnectProperties,
    /// Logic shared between session types (channel publish/subscribe, media forwarding).
    pub common: Common,
    /// Configures how many GOPs will be cached; forwarded to `publish_to_channels`.
    gop_num: usize,
}
68 
69 impl ServerSession {
70     pub fn new(stream: TcpStream, event_producer: StreamHubEventSender, gop_num: usize) -> Self {
71         let remote_addr = if let Ok(addr) = stream.peer_addr() {
72             log::info!("server session: {}", addr.to_string());
73             Some(addr)
74         } else {
75             None
76         };
77 
78         let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream));
79         let net_io = Arc::new(Mutex::new(tcp_io));
80 
81         Self {
82             app_name: String::from(""),
83             stream_name: String::from(""),
84             url_parameters: String::from(""),
85             io: Arc::clone(&net_io),
86             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
87             unpacketizer: ChunkUnpacketizer::new(),
88             state: ServerSessionState::Handshake,
89             common: Common::new(
90                 Some(ChunkPacketizer::new(Arc::clone(&net_io))),
91                 event_producer,
92                 SessionType::Server,
93                 remote_addr,
94             ),
95             session_id: Uuid::new(RandomDigitCount::Four),
96             bytesio_data: BytesMut::new(),
97             has_remaing_data: false,
98             connect_properties: ConnectProperties::default(),
99             gop_num,
100         }
101     }
102 
103     pub async fn run(&mut self) -> Result<(), SessionError> {
104         loop {
105             match self.state {
106                 ServerSessionState::Handshake => {
107                     self.handshake().await?;
108                 }
109                 ServerSessionState::ReadChunk => {
110                     self.read_parse_chunks().await?;
111                 }
112                 ServerSessionState::Play => {
113                     self.play().await?;
114                 }
115                 ServerSessionState::DeleteStream => {
116                     return Ok(());
117                 }
118             }
119         }
120 
121         //Ok(())
122     }
123 
124     async fn handshake(&mut self) -> Result<(), SessionError> {
125         let mut bytes_len = 0;
126 
127         while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE {
128             self.bytesio_data = self.io.lock().await.read().await?;
129             bytes_len += self.bytesio_data.len();
130             self.handshaker.extend_data(&self.bytesio_data[..]);
131         }
132 
133         self.handshaker.handshake().await?;
134 
135         if let ServerHandshakeState::Finish = self.handshaker.state() {
136             self.state = ServerSessionState::ReadChunk;
137             let left_bytes = self.handshaker.get_remaining_bytes();
138             if !left_bytes.is_empty() {
139                 self.unpacketizer.extend_data(&left_bytes[..]);
140                 self.has_remaing_data = true;
141             }
142             log::info!("[ S->C ] [send_set_chunk_size] ");
143             self.send_set_chunk_size().await?;
144             return Ok(());
145         }
146 
147         Ok(())
148     }
149 
150     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
151         if !self.has_remaing_data {
152             match self
153                 .io
154                 .lock()
155                 .await
156                 .read_timeout(Duration::from_secs(2))
157                 .await
158             {
159                 Ok(data) => {
160                     self.bytesio_data = data;
161                 }
162                 Err(err) => {
163                     self.common
164                         .unpublish_to_channels(
165                             self.app_name.clone(),
166                             self.stream_name.clone(),
167                             self.session_id,
168                         )
169                         .await?;
170 
171                     return Err(SessionError {
172                         value: SessionErrorValue::BytesIOError(err),
173                     });
174                 }
175             }
176 
177             self.unpacketizer.extend_data(&self.bytesio_data[..]);
178         }
179 
180         self.has_remaing_data = false;
181 
182         loop {
183             let result = self.unpacketizer.read_chunks();
184 
185             if let Ok(rv) = result {
186                 if let UnpackResult::Chunks(chunks) = rv {
187                     for chunk_info in chunks {
188                         let timestamp = chunk_info.message_header.timestamp;
189                         let msg_stream_id = chunk_info.message_header.msg_streamd_id;
190 
191                         if let Some(mut msg) = MessageParser::new(chunk_info).parse()? {
192                             self.process_messages(&mut msg, &msg_stream_id, &timestamp)
193                                 .await?;
194                         }
195                     }
196                 }
197             } else {
198                 break;
199             }
200         }
201         Ok(())
202     }
203 
204     async fn play(&mut self) -> Result<(), SessionError> {
205         match self.common.send_channel_data().await {
206             Ok(_) => {}
207             Err(err) => {
208                 self.common
209                     .unsubscribe_from_channels(
210                         self.app_name.clone(),
211                         self.stream_name.clone(),
212                         self.session_id,
213                     )
214                     .await?;
215                 return Err(err);
216             }
217         }
218 
219         Ok(())
220     }
221 
222     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
223         let mut controlmessage =
224             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
225         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
226 
227         Ok(())
228     }
229 
230     pub async fn process_messages(
231         &mut self,
232         rtmp_msg: &mut RtmpMessageData,
233         msg_stream_id: &u32,
234         timestamp: &u32,
235     ) -> Result<(), SessionError> {
236         match rtmp_msg {
237             RtmpMessageData::Amf0Command {
238                 command_name,
239                 transaction_id,
240                 command_object,
241                 others,
242             } => {
243                 self.on_amf0_command_message(
244                     msg_stream_id,
245                     command_name,
246                     transaction_id,
247                     command_object,
248                     others,
249                 )
250                 .await?
251             }
252             RtmpMessageData::SetChunkSize { chunk_size } => {
253                 self.on_set_chunk_size(*chunk_size as usize)?;
254             }
255             RtmpMessageData::AudioData { data } => {
256                 self.common.on_audio_data(data, timestamp).await?;
257             }
258             RtmpMessageData::VideoData { data } => {
259                 self.common.on_video_data(data, timestamp).await?;
260             }
261             RtmpMessageData::AmfData { raw_data } => {
262                 self.common.on_meta_data(raw_data, timestamp).await?;
263             }
264 
265             _ => {}
266         }
267         Ok(())
268     }
269 
270     pub async fn on_amf0_command_message(
271         &mut self,
272         stream_id: &u32,
273         command_name: &Amf0ValueType,
274         transaction_id: &Amf0ValueType,
275         command_object: &Amf0ValueType,
276         others: &mut Vec<Amf0ValueType>,
277     ) -> Result<(), SessionError> {
278         let empty_cmd_name = &String::new();
279         let cmd_name = match command_name {
280             Amf0ValueType::UTF8String(str) => str,
281             _ => empty_cmd_name,
282         };
283 
284         let transaction_id = match transaction_id {
285             Amf0ValueType::Number(number) => number,
286             _ => &0.0,
287         };
288 
289         let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new();
290         let obj = match command_object {
291             Amf0ValueType::Object(obj) => obj,
292             _ => &empty_cmd_obj,
293         };
294 
295         match cmd_name.as_str() {
296             "connect" => {
297                 log::info!("[ S<-C ] [connect] ");
298                 self.on_connect(transaction_id, obj).await?;
299             }
300             "createStream" => {
301                 log::info!("[ S<-C ] [create stream] ");
302                 self.on_create_stream(transaction_id).await?;
303             }
304             "deleteStream" => {
305                 if !others.is_empty() {
306                     let stream_id = match others.pop() {
307                         Some(Amf0ValueType::Number(streamid)) => streamid,
308                         _ => 0.0,
309                     };
310 
311                     log::info!(
312                         "[ S<-C ] [delete stream] app_name: {}, stream_name: {}",
313                         self.app_name,
314                         self.stream_name
315                     );
316 
317                     self.on_delete_stream(transaction_id, &stream_id).await?;
318                     self.state = ServerSessionState::DeleteStream;
319                 }
320             }
321             "play" => {
322                 log::info!(
323                     "[ S<-C ] [play]  app_name: {}, stream_name: {}",
324                     self.app_name,
325                     self.stream_name
326                 );
327                 self.unpacketizer.session_type = config::SERVER_PULL;
328                 self.on_play(transaction_id, stream_id, others).await?;
329             }
330             "publish" => {
331                 self.unpacketizer.session_type = config::SERVER_PUSH;
332                 self.on_publish(transaction_id, stream_id, others).await?;
333             }
334             _ => {}
335         }
336 
337         Ok(())
338     }
339 
340     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
341         log::info!(
342             "[ S<-C ] [set chunk size]  app_name: {}, stream_name: {}, chunk size: {}",
343             self.app_name,
344             self.stream_name,
345             chunk_size
346         );
347         self.unpacketizer.update_max_chunk_size(chunk_size);
348         Ok(())
349     }
350 
351     fn parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>) {
352         for (property, value) in command_obj {
353             match property.as_str() {
354                 "app" => {
355                     if let Amf0ValueType::UTF8String(app) = value {
356                         self.connect_properties.app = Some(app.clone());
357                     }
358                 }
359                 "flashVer" => {
360                     if let Amf0ValueType::UTF8String(flash_ver) = value {
361                         self.connect_properties.flash_ver = Some(flash_ver.clone());
362                     }
363                 }
364                 "swfUrl" => {
365                     if let Amf0ValueType::UTF8String(swf_url) = value {
366                         self.connect_properties.swf_url = Some(swf_url.clone());
367                     }
368                 }
369                 "tcUrl" => {
370                     if let Amf0ValueType::UTF8String(tc_url) = value {
371                         self.connect_properties.tc_url = Some(tc_url.clone());
372                     }
373                 }
374                 "fpad" => {
375                     if let Amf0ValueType::Boolean(fpad) = value {
376                         self.connect_properties.fpad = Some(*fpad);
377                     }
378                 }
379                 "audioCodecs" => {
380                     if let Amf0ValueType::Number(audio_codecs) = value {
381                         self.connect_properties.audio_codecs = Some(*audio_codecs);
382                     }
383                 }
384                 "videoCodecs" => {
385                     if let Amf0ValueType::Number(video_codecs) = value {
386                         self.connect_properties.video_codecs = Some(*video_codecs);
387                     }
388                 }
389                 "videoFunction" => {
390                     if let Amf0ValueType::Number(video_function) = value {
391                         self.connect_properties.video_function = Some(*video_function);
392                     }
393                 }
394                 "pageUrl" => {
395                     if let Amf0ValueType::UTF8String(page_url) = value {
396                         self.connect_properties.page_url = Some(page_url.clone());
397                     }
398                 }
399                 "objectEncoding" => {
400                     if let Amf0ValueType::Number(object_encoding) = value {
401                         self.connect_properties.object_encoding = Some(*object_encoding);
402                     }
403                 }
404                 _ => {
405                     log::warn!("unknown connect properties: {}:{:?}", property, value);
406                 }
407             }
408         }
409     }
410 
411     async fn on_connect(
412         &mut self,
413         transaction_id: &f64,
414         command_obj: &IndexMap<String, Amf0ValueType>,
415     ) -> Result<(), SessionError> {
416         self.parse_connect_properties(command_obj);
417         log::info!("connect properties: {:?}", self.connect_properties);
418         let mut control_message =
419             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
420         log::info!("[ S->C ] [set window_acknowledgement_size]");
421         control_message
422             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
423             .await?;
424 
425         log::info!("[ S->C ] [set set_peer_bandwidth]",);
426         control_message
427             .write_set_peer_bandwidth(
428                 define::PEER_BANDWIDTH,
429                 define::peer_bandwidth_limit_type::DYNAMIC,
430             )
431             .await?;
432 
433         let obj_encoding = command_obj.get("objectEncoding");
434         let encoding = match obj_encoding {
435             Some(Amf0ValueType::Number(encoding)) => encoding,
436             _ => &define::OBJENCODING_AMF0,
437         };
438 
439         let app_name = command_obj.get("app");
440         self.app_name = match app_name {
441             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
442             _ => {
443                 return Err(SessionError {
444                     value: SessionErrorValue::NoAppName,
445                 });
446             }
447         };
448 
449         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
450         log::info!("[ S->C ] [set connect_response]",);
451         netconnection
452             .write_connect_response(
453                 transaction_id,
454                 define::FMSVER,
455                 &define::CAPABILITIES,
456                 &String::from("NetConnection.Connect.Success"),
457                 define::LEVEL,
458                 &String::from("Connection Succeeded."),
459                 encoding,
460             )
461             .await?;
462 
463         Ok(())
464     }
465 
466     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
467         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
468         netconnection
469             .write_create_stream_response(transaction_id, &define::STREAM_ID)
470             .await?;
471 
472         log::info!(
473             "[ S->C ] [create_stream_response]  app_name: {}",
474             self.app_name,
475         );
476 
477         Ok(())
478     }
479 
480     pub async fn on_delete_stream(
481         &mut self,
482         transaction_id: &f64,
483         stream_id: &f64,
484     ) -> Result<(), SessionError> {
485         self.common
486             .unpublish_to_channels(
487                 self.app_name.clone(),
488                 self.stream_name.clone(),
489                 self.session_id,
490             )
491             .await?;
492 
493         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
494         netstream
495             .write_on_status(
496                 transaction_id,
497                 "status",
498                 "NetStream.DeleteStream.Suceess",
499                 "",
500             )
501             .await?;
502 
503         //self.unsubscribe_from_channels().await?;
504         log::info!(
505             "[ S->C ] [delete stream success]  app_name: {}, stream_name: {}",
506             self.app_name,
507             self.stream_name
508         );
509         log::trace!("{}", stream_id);
510 
511         Ok(())
512     }
513 
514     fn get_request_url(&mut self, raw_stream_name: String) -> String {
515         if let Some(tc_url) = &self.connect_properties.tc_url {
516             format!("{tc_url}/{raw_stream_name}")
517         } else {
518             format!("{}/{}", self.app_name.clone(), raw_stream_name)
519         }
520     }
521 
522     #[allow(clippy::never_loop)]
523     pub async fn on_play(
524         &mut self,
525         transaction_id: &f64,
526         stream_id: &u32,
527         other_values: &mut Vec<Amf0ValueType>,
528     ) -> Result<(), SessionError> {
529         let length = other_values.len() as u8;
530         let mut index: u8 = 0;
531 
532         let mut stream_name: Option<String> = None;
533         let mut start: Option<f64> = None;
534         let mut duration: Option<f64> = None;
535         let mut reset: Option<bool> = None;
536 
537         loop {
538             if index >= length {
539                 break;
540             }
541             index += 1;
542             stream_name = match other_values.remove(0) {
543                 Amf0ValueType::UTF8String(val) => Some(val),
544                 _ => None,
545             };
546 
547             if index >= length {
548                 break;
549             }
550             index += 1;
551             start = match other_values.remove(0) {
552                 Amf0ValueType::Number(val) => Some(val),
553                 _ => None,
554             };
555 
556             if index >= length {
557                 break;
558             }
559             index += 1;
560             duration = match other_values.remove(0) {
561                 Amf0ValueType::Number(val) => Some(val),
562                 _ => None,
563             };
564 
565             if index >= length {
566                 break;
567             }
568             //index = index + 1;
569             reset = match other_values.remove(0) {
570                 Amf0ValueType::Boolean(val) => Some(val),
571                 _ => None,
572             };
573             break;
574         }
575 
576         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
577         event_messages.write_stream_begin(*stream_id).await?;
578         log::info!(
579             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
580             self.app_name,
581             self.stream_name
582         );
583         log::trace!(
584             "{} {} {}",
585             start.is_some(),
586             duration.is_some(),
587             reset.is_some()
588         );
589 
590         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
591         netstream
592             .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset")
593             .await?;
594 
595         netstream
596             .write_on_status(
597                 transaction_id,
598                 "status",
599                 "NetStream.Play.Start",
600                 "play start",
601             )
602             .await?;
603 
604         netstream
605             .write_on_status(
606                 transaction_id,
607                 "status",
608                 "NetStream.Data.Start",
609                 "data start.",
610             )
611             .await?;
612 
613         netstream
614             .write_on_status(
615                 transaction_id,
616                 "status",
617                 "NetStream.Play.PublishNotify",
618                 "play publish notify.",
619             )
620             .await?;
621 
622         event_messages.write_stream_is_record(*stream_id).await?;
623 
624         let raw_stream_name = stream_name.unwrap();
625 
626         (self.stream_name, self.url_parameters) = RtmpUrlParser::default()
627             .set_raw_stream_name(raw_stream_name.clone())
628             .parse_raw_stream_name();
629 
630         log::info!(
631             "[ S->C ] [stream is record]  app_name: {}, stream_name: {}, url parameters: {}",
632             self.app_name,
633             self.stream_name,
634             self.url_parameters
635         );
636 
637         /*Now it can update the request url*/
638         self.common.request_url = self.get_request_url(raw_stream_name);
639         self.common
640             .subscribe_from_channels(
641                 self.app_name.clone(),
642                 self.stream_name.clone(),
643                 self.session_id,
644             )
645             .await?;
646 
647         self.state = ServerSessionState::Play;
648 
649         Ok(())
650     }
651 
652     pub async fn on_publish(
653         &mut self,
654         transaction_id: &f64,
655         stream_id: &u32,
656         other_values: &mut Vec<Amf0ValueType>,
657     ) -> Result<(), SessionError> {
658         let length = other_values.len();
659 
660         if length < 2 {
661             return Err(SessionError {
662                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
663             });
664         }
665 
666         let raw_stream_name = match other_values.remove(0) {
667             Amf0ValueType::UTF8String(val) => val,
668             _ => {
669                 return Err(SessionError {
670                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
671                 });
672             }
673         };
674 
675         (self.stream_name, self.url_parameters) = RtmpUrlParser::default()
676             .set_raw_stream_name(raw_stream_name.clone())
677             .parse_raw_stream_name();
678 
679         /*Now it can update the request url*/
680         self.common.request_url = self.get_request_url(raw_stream_name);
681 
682         let _ = match other_values.remove(0) {
683             Amf0ValueType::UTF8String(val) => val,
684             _ => {
685                 return Err(SessionError {
686                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
687                 });
688             }
689         };
690 
691         log::info!(
692             "[ S<-C ] [publish]  app_name: {}, stream_name: {}, url parameters: {}",
693             self.app_name,
694             self.stream_name,
695             self.url_parameters
696         );
697 
698         log::info!(
699             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}, url parameters: {}",
700             self.app_name,
701             self.stream_name,
702             self.url_parameters
703         );
704 
705         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
706         event_messages.write_stream_begin(*stream_id).await?;
707 
708         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
709         netstream
710             .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "")
711             .await?;
712         log::info!(
713             "[ S->C ] [NetStream.Publish.Start]  app_name: {}, stream_name: {}",
714             self.app_name,
715             self.stream_name
716         );
717 
718         self.common
719             .publish_to_channels(
720                 self.app_name.clone(),
721                 self.stream_name.clone(),
722                 self.session_id,
723                 self.gop_num,
724             )
725             .await?;
726 
727         Ok(())
728     }
729 }
730