1 use crate::chunk::packetizer::ChunkPacketizer;
2 
3 use {
4     super::{
5         common::Common,
6         define,
7         define::SessionType,
8         errors::{SessionError, SessionErrorValue},
9     },
10     crate::{
11         amf0::Amf0ValueType,
12         chunk::{
13             define::CHUNK_SIZE,
14             unpacketizer::{ChunkUnpacketizer, UnpackResult},
15         },
16         config, handshake,
17         handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer},
18         messages::{define::RtmpMessageData, parser::MessageParser},
19         netconnection::writer::{ConnectProperties, NetConnection},
20         netstream::writer::NetStreamWriter,
21         protocol_control_messages::writer::ProtocolControlMessagesWriter,
22         user_control_messages::writer::EventMessagesWriter,
23         utils::RtmpUrlParser,
24     },
25     bytes::BytesMut,
26     bytesio::{
27         bytes_writer::AsyncBytesWriter,
28         bytesio::{TNetIO, TcpIO},
29     },
30     indexmap::IndexMap,
31     std::{sync::Arc, time::Duration},
32     streamhub::{
33         define::StreamHubEventSender,
34         utils::{RandomDigitCount, Uuid},
35     },
36     tokio::{net::TcpStream, sync::Mutex},
37 };
38 
39 enum ServerSessionState {
40     Handshake,
41     ReadChunk,
42     // OnConnect,
43     // OnCreateStream,
44     //Publish,
45     DeleteStream,
46     Play,
47 }
48 
49 pub struct ServerSession {
50     pub app_name: String,
51     pub stream_name: String,
52     pub url_parameters: String,
53     io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
54     handshaker: HandshakeServer,
55     unpacketizer: ChunkUnpacketizer,
56     state: ServerSessionState,
57     bytesio_data: BytesMut,
58     has_remaing_data: bool,
59     /* Used to mark the subscriber's the data producer
60     in channels and delete it from map when unsubscribe
61     is called. */
62     pub session_id: Uuid,
63     connect_properties: ConnectProperties,
64     pub common: Common,
65     /*configure how many gops will be cached.*/
66     gop_num: usize,
67 }
68 
69 impl ServerSession {
70     pub fn new(stream: TcpStream, event_producer: StreamHubEventSender, gop_num: usize) -> Self {
71         let remote_addr = if let Ok(addr) = stream.peer_addr() {
72             log::info!("server session: {}", addr.to_string());
73             Some(addr)
74         } else {
75             None
76         };
77 
78         let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream));
79         let net_io = Arc::new(Mutex::new(tcp_io));
80 
81         Self {
82             app_name: String::from(""),
83             stream_name: String::from(""),
84             url_parameters: String::from(""),
85             io: Arc::clone(&net_io),
86             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
87             unpacketizer: ChunkUnpacketizer::new(),
88             state: ServerSessionState::Handshake,
89             common: Common::new(
90                 Some(ChunkPacketizer::new(Arc::clone(&net_io))),
91                 event_producer,
92                 SessionType::Server,
93                 remote_addr,
94             ),
95             session_id: Uuid::new(RandomDigitCount::Four),
96             bytesio_data: BytesMut::new(),
97             has_remaing_data: false,
98             connect_properties: ConnectProperties::default(),
99             gop_num,
100         }
101     }
102 
103     pub async fn run(&mut self) -> Result<(), SessionError> {
104         loop {
105             match self.state {
106                 ServerSessionState::Handshake => {
107                     self.handshake().await?;
108                 }
109                 ServerSessionState::ReadChunk => {
110                     self.read_parse_chunks().await?;
111                 }
112                 ServerSessionState::Play => {
113                     self.play().await?;
114                 }
115                 ServerSessionState::DeleteStream => {
116                     return Ok(());
117                 }
118             }
119         }
120 
121         //Ok(())
122     }
123 
124     async fn handshake(&mut self) -> Result<(), SessionError> {
125         let mut bytes_len = 0;
126 
127         while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE {
128             self.bytesio_data = self.io.lock().await.read().await?;
129             bytes_len += self.bytesio_data.len();
130             self.handshaker.extend_data(&self.bytesio_data[..]);
131         }
132 
133         self.handshaker.handshake().await?;
134 
135         if let ServerHandshakeState::Finish = self.handshaker.state() {
136             self.state = ServerSessionState::ReadChunk;
137             let left_bytes = self.handshaker.get_remaining_bytes();
138             if !left_bytes.is_empty() {
139                 self.unpacketizer.extend_data(&left_bytes[..]);
140                 self.has_remaing_data = true;
141             }
142             log::info!("[ S->C ] [send_set_chunk_size] ");
143             self.send_set_chunk_size().await?;
144             return Ok(());
145         }
146 
147         Ok(())
148     }
149 
150     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
151         if !self.has_remaing_data {
152             match self
153                 .io
154                 .lock()
155                 .await
156                 .read_timeout(Duration::from_secs(2))
157                 .await
158             {
159                 Ok(data) => {
160                     self.bytesio_data = data;
161                 }
162                 Err(err) => {
163                     self.common
164                         .unpublish_to_channels(
165                             self.app_name.clone(),
166                             self.stream_name.clone(),
167                             self.session_id,
168                         )
169                         .await?;
170 
171                     return Err(SessionError {
172                         value: SessionErrorValue::BytesIOError(err),
173                     });
174                 }
175             }
176 
177             self.unpacketizer.extend_data(&self.bytesio_data[..]);
178         }
179 
180         self.has_remaing_data = false;
181 
182         loop {
183             let result = self.unpacketizer.read_chunks();
184 
185             if let Ok(rv) = result {
186                 if let UnpackResult::Chunks(chunks) = rv {
187                     for chunk_info in chunks {
188                         let timestamp = chunk_info.message_header.timestamp;
189                         let msg_stream_id = chunk_info.message_header.msg_streamd_id;
190 
191                         let mut msg = MessageParser::new(chunk_info).parse()?;
192                         self.process_messages(&mut msg, &msg_stream_id, &timestamp)
193                             .await?;
194                     }
195                 }
196             } else {
197                 break;
198             }
199         }
200         Ok(())
201     }
202 
203     async fn play(&mut self) -> Result<(), SessionError> {
204         match self.common.send_channel_data().await {
205             Ok(_) => {}
206             Err(err) => {
207                 self.common
208                     .unsubscribe_from_channels(
209                         self.app_name.clone(),
210                         self.stream_name.clone(),
211                         self.session_id,
212                     )
213                     .await?;
214                 return Err(err);
215             }
216         }
217 
218         Ok(())
219     }
220 
221     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
222         let mut controlmessage =
223             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
224         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
225 
226         Ok(())
227     }
228 
229     pub async fn process_messages(
230         &mut self,
231         rtmp_msg: &mut RtmpMessageData,
232         msg_stream_id: &u32,
233         timestamp: &u32,
234     ) -> Result<(), SessionError> {
235         match rtmp_msg {
236             RtmpMessageData::Amf0Command {
237                 command_name,
238                 transaction_id,
239                 command_object,
240                 others,
241             } => {
242                 self.on_amf0_command_message(
243                     msg_stream_id,
244                     command_name,
245                     transaction_id,
246                     command_object,
247                     others,
248                 )
249                 .await?
250             }
251             RtmpMessageData::SetChunkSize { chunk_size } => {
252                 self.on_set_chunk_size(*chunk_size as usize)?;
253             }
254             RtmpMessageData::AudioData { data } => {
255                 self.common.on_audio_data(data, timestamp).await?;
256             }
257             RtmpMessageData::VideoData { data } => {
258                 self.common.on_video_data(data, timestamp).await?;
259             }
260             RtmpMessageData::AmfData { raw_data } => {
261                 self.common.on_meta_data(raw_data, timestamp).await?;
262             }
263 
264             _ => {}
265         }
266         Ok(())
267     }
268 
269     pub async fn on_amf0_command_message(
270         &mut self,
271         stream_id: &u32,
272         command_name: &Amf0ValueType,
273         transaction_id: &Amf0ValueType,
274         command_object: &Amf0ValueType,
275         others: &mut Vec<Amf0ValueType>,
276     ) -> Result<(), SessionError> {
277         let empty_cmd_name = &String::new();
278         let cmd_name = match command_name {
279             Amf0ValueType::UTF8String(str) => str,
280             _ => empty_cmd_name,
281         };
282 
283         let transaction_id = match transaction_id {
284             Amf0ValueType::Number(number) => number,
285             _ => &0.0,
286         };
287 
288         let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new();
289         let obj = match command_object {
290             Amf0ValueType::Object(obj) => obj,
291             _ => &empty_cmd_obj,
292         };
293 
294         match cmd_name.as_str() {
295             "connect" => {
296                 log::info!("[ S<-C ] [connect] ");
297                 self.on_connect(transaction_id, obj).await?;
298             }
299             "createStream" => {
300                 log::info!("[ S<-C ] [create stream] ");
301                 self.on_create_stream(transaction_id).await?;
302             }
303             "deleteStream" => {
304                 if !others.is_empty() {
305                     let stream_id = match others.pop() {
306                         Some(Amf0ValueType::Number(streamid)) => streamid,
307                         _ => 0.0,
308                     };
309 
310                     log::info!(
311                         "[ S<-C ] [delete stream] app_name: {}, stream_name: {}",
312                         self.app_name,
313                         self.stream_name
314                     );
315 
316                     self.on_delete_stream(transaction_id, &stream_id).await?;
317                     self.state = ServerSessionState::DeleteStream;
318                 }
319             }
320             "play" => {
321                 log::info!(
322                     "[ S<-C ] [play]  app_name: {}, stream_name: {}",
323                     self.app_name,
324                     self.stream_name
325                 );
326                 self.unpacketizer.session_type = config::SERVER_PULL;
327                 self.on_play(transaction_id, stream_id, others).await?;
328             }
329             "publish" => {
330                 self.unpacketizer.session_type = config::SERVER_PUSH;
331                 self.on_publish(transaction_id, stream_id, others).await?;
332             }
333             _ => {}
334         }
335 
336         Ok(())
337     }
338 
339     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
340         log::info!(
341             "[ S<-C ] [set chunk size]  app_name: {}, stream_name: {}, chunk size: {}",
342             self.app_name,
343             self.stream_name,
344             chunk_size
345         );
346         self.unpacketizer.update_max_chunk_size(chunk_size);
347         Ok(())
348     }
349 
350     fn parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>) {
351         for (property, value) in command_obj {
352             match property.as_str() {
353                 "app" => {
354                     if let Amf0ValueType::UTF8String(app) = value {
355                         self.connect_properties.app = Some(app.clone());
356                     }
357                 }
358                 "flashVer" => {
359                     if let Amf0ValueType::UTF8String(flash_ver) = value {
360                         self.connect_properties.flash_ver = Some(flash_ver.clone());
361                     }
362                 }
363                 "swfUrl" => {
364                     if let Amf0ValueType::UTF8String(swf_url) = value {
365                         self.connect_properties.swf_url = Some(swf_url.clone());
366                     }
367                 }
368                 "tcUrl" => {
369                     if let Amf0ValueType::UTF8String(tc_url) = value {
370                         self.connect_properties.tc_url = Some(tc_url.clone());
371                     }
372                 }
373                 "fpad" => {
374                     if let Amf0ValueType::Boolean(fpad) = value {
375                         self.connect_properties.fpad = Some(*fpad);
376                     }
377                 }
378                 "audioCodecs" => {
379                     if let Amf0ValueType::Number(audio_codecs) = value {
380                         self.connect_properties.audio_codecs = Some(*audio_codecs);
381                     }
382                 }
383                 "videoCodecs" => {
384                     if let Amf0ValueType::Number(video_codecs) = value {
385                         self.connect_properties.video_codecs = Some(*video_codecs);
386                     }
387                 }
388                 "videoFunction" => {
389                     if let Amf0ValueType::Number(video_function) = value {
390                         self.connect_properties.video_function = Some(*video_function);
391                     }
392                 }
393                 "pageUrl" => {
394                     if let Amf0ValueType::UTF8String(page_url) = value {
395                         self.connect_properties.page_url = Some(page_url.clone());
396                     }
397                 }
398                 "objectEncoding" => {
399                     if let Amf0ValueType::Number(object_encoding) = value {
400                         self.connect_properties.object_encoding = Some(*object_encoding);
401                     }
402                 }
403                 _ => {
404                     log::warn!("unknown connect properties: {}:{:?}", property, value);
405                 }
406             }
407         }
408     }
409 
410     async fn on_connect(
411         &mut self,
412         transaction_id: &f64,
413         command_obj: &IndexMap<String, Amf0ValueType>,
414     ) -> Result<(), SessionError> {
415         self.parse_connect_properties(command_obj);
416         log::info!("connect properties: {:?}", self.connect_properties);
417         let mut control_message =
418             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
419         log::info!("[ S->C ] [set window_acknowledgement_size]");
420         control_message
421             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
422             .await?;
423 
424         log::info!("[ S->C ] [set set_peer_bandwidth]",);
425         control_message
426             .write_set_peer_bandwidth(
427                 define::PEER_BANDWIDTH,
428                 define::peer_bandwidth_limit_type::DYNAMIC,
429             )
430             .await?;
431 
432         let obj_encoding = command_obj.get("objectEncoding");
433         let encoding = match obj_encoding {
434             Some(Amf0ValueType::Number(encoding)) => encoding,
435             _ => &define::OBJENCODING_AMF0,
436         };
437 
438         let app_name = command_obj.get("app");
439         self.app_name = match app_name {
440             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
441             _ => {
442                 return Err(SessionError {
443                     value: SessionErrorValue::NoAppName,
444                 });
445             }
446         };
447 
448         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
449         log::info!("[ S->C ] [set connect_response]",);
450         netconnection
451             .write_connect_response(
452                 transaction_id,
453                 define::FMSVER,
454                 &define::CAPABILITIES,
455                 &String::from("NetConnection.Connect.Success"),
456                 define::LEVEL,
457                 &String::from("Connection Succeeded."),
458                 encoding,
459             )
460             .await?;
461 
462         Ok(())
463     }
464 
465     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
466         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
467         netconnection
468             .write_create_stream_response(transaction_id, &define::STREAM_ID)
469             .await?;
470 
471         log::info!(
472             "[ S->C ] [create_stream_response]  app_name: {}",
473             self.app_name,
474         );
475 
476         Ok(())
477     }
478 
479     pub async fn on_delete_stream(
480         &mut self,
481         transaction_id: &f64,
482         stream_id: &f64,
483     ) -> Result<(), SessionError> {
484         self.common
485             .unpublish_to_channels(
486                 self.app_name.clone(),
487                 self.stream_name.clone(),
488                 self.session_id,
489             )
490             .await?;
491 
492         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
493         netstream
494             .write_on_status(
495                 transaction_id,
496                 "status",
497                 "NetStream.DeleteStream.Suceess",
498                 "",
499             )
500             .await?;
501 
502         //self.unsubscribe_from_channels().await?;
503         log::info!(
504             "[ S->C ] [delete stream success]  app_name: {}, stream_name: {}",
505             self.app_name,
506             self.stream_name
507         );
508         log::trace!("{}", stream_id);
509 
510         Ok(())
511     }
512 
513     fn get_request_url(&mut self, raw_stream_name: String) -> String {
514         if let Some(tc_url) = &self.connect_properties.tc_url {
515             format!("{tc_url}/{raw_stream_name}")
516         } else {
517             format!("{}/{}", self.app_name.clone(), raw_stream_name)
518         }
519     }
520 
521     #[allow(clippy::never_loop)]
522     pub async fn on_play(
523         &mut self,
524         transaction_id: &f64,
525         stream_id: &u32,
526         other_values: &mut Vec<Amf0ValueType>,
527     ) -> Result<(), SessionError> {
528         let length = other_values.len() as u8;
529         let mut index: u8 = 0;
530 
531         let mut stream_name: Option<String> = None;
532         let mut start: Option<f64> = None;
533         let mut duration: Option<f64> = None;
534         let mut reset: Option<bool> = None;
535 
536         loop {
537             if index >= length {
538                 break;
539             }
540             index += 1;
541             stream_name = match other_values.remove(0) {
542                 Amf0ValueType::UTF8String(val) => Some(val),
543                 _ => None,
544             };
545 
546             if index >= length {
547                 break;
548             }
549             index += 1;
550             start = match other_values.remove(0) {
551                 Amf0ValueType::Number(val) => Some(val),
552                 _ => None,
553             };
554 
555             if index >= length {
556                 break;
557             }
558             index += 1;
559             duration = match other_values.remove(0) {
560                 Amf0ValueType::Number(val) => Some(val),
561                 _ => None,
562             };
563 
564             if index >= length {
565                 break;
566             }
567             //index = index + 1;
568             reset = match other_values.remove(0) {
569                 Amf0ValueType::Boolean(val) => Some(val),
570                 _ => None,
571             };
572             break;
573         }
574 
575         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
576         event_messages.write_stream_begin(*stream_id).await?;
577         log::info!(
578             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
579             self.app_name,
580             self.stream_name
581         );
582         log::trace!(
583             "{} {} {}",
584             start.is_some(),
585             duration.is_some(),
586             reset.is_some()
587         );
588 
589         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
590         netstream
591             .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset")
592             .await?;
593 
594         netstream
595             .write_on_status(
596                 transaction_id,
597                 "status",
598                 "NetStream.Play.Start",
599                 "play start",
600             )
601             .await?;
602 
603         netstream
604             .write_on_status(
605                 transaction_id,
606                 "status",
607                 "NetStream.Data.Start",
608                 "data start.",
609             )
610             .await?;
611 
612         netstream
613             .write_on_status(
614                 transaction_id,
615                 "status",
616                 "NetStream.Play.PublishNotify",
617                 "play publish notify.",
618             )
619             .await?;
620 
621         event_messages.write_stream_is_record(*stream_id).await?;
622 
623         let raw_stream_name = stream_name.unwrap();
624 
625         (self.stream_name, self.url_parameters) = RtmpUrlParser::default()
626             .set_raw_stream_name(raw_stream_name.clone())
627             .parse_raw_stream_name();
628 
629         log::info!(
630             "[ S->C ] [stream is record]  app_name: {}, stream_name: {}, url parameters: {}",
631             self.app_name,
632             self.stream_name,
633             self.url_parameters
634         );
635 
636         /*Now it can update the request url*/
637         self.common.request_url = self.get_request_url(raw_stream_name);
638         self.common
639             .subscribe_from_channels(
640                 self.app_name.clone(),
641                 self.stream_name.clone(),
642                 self.session_id,
643             )
644             .await?;
645 
646         self.state = ServerSessionState::Play;
647 
648         Ok(())
649     }
650 
651     pub async fn on_publish(
652         &mut self,
653         transaction_id: &f64,
654         stream_id: &u32,
655         other_values: &mut Vec<Amf0ValueType>,
656     ) -> Result<(), SessionError> {
657         let length = other_values.len();
658 
659         if length < 2 {
660             return Err(SessionError {
661                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
662             });
663         }
664 
665         let raw_stream_name = match other_values.remove(0) {
666             Amf0ValueType::UTF8String(val) => val,
667             _ => {
668                 return Err(SessionError {
669                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
670                 });
671             }
672         };
673 
674         (self.stream_name, self.url_parameters) = RtmpUrlParser::default()
675             .set_raw_stream_name(raw_stream_name.clone())
676             .parse_raw_stream_name();
677 
678         /*Now it can update the request url*/
679         self.common.request_url = self.get_request_url(raw_stream_name);
680 
681         let _ = match other_values.remove(0) {
682             Amf0ValueType::UTF8String(val) => val,
683             _ => {
684                 return Err(SessionError {
685                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
686                 });
687             }
688         };
689 
690         log::info!(
691             "[ S<-C ] [publish]  app_name: {}, stream_name: {}, url parameters: {}",
692             self.app_name,
693             self.stream_name,
694             self.url_parameters
695         );
696 
697         log::info!(
698             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}, url parameters: {}",
699             self.app_name,
700             self.stream_name,
701             self.url_parameters
702         );
703 
704         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
705         event_messages.write_stream_begin(*stream_id).await?;
706 
707         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
708         netstream
709             .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "")
710             .await?;
711         log::info!(
712             "[ S->C ] [NetStream.Publish.Start]  app_name: {}, stream_name: {}",
713             self.app_name,
714             self.stream_name
715         );
716 
717         self.common
718             .publish_to_channels(
719                 self.app_name.clone(),
720                 self.stream_name.clone(),
721                 self.session_id,
722                 self.gop_num,
723             )
724             .await?;
725 
726         Ok(())
727     }
728 }
729