1 use crate::chunk::{errors::UnpackErrorValue, packetizer::ChunkPacketizer};
2 
3 use {
4     super::{
5         common::Common,
6         define,
7         define::SessionType,
8         errors::{SessionError, SessionErrorValue},
9     },
10     crate::{
11         amf0::Amf0ValueType,
12         chunk::{
13             define::CHUNK_SIZE,
14             unpacketizer::{ChunkUnpacketizer, UnpackResult},
15         },
16         config, handshake,
17         handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer},
18         messages::{define::RtmpMessageData, parser::MessageParser},
19         netconnection::writer::{ConnectProperties, NetConnection},
20         netstream::writer::NetStreamWriter,
21         protocol_control_messages::writer::ProtocolControlMessagesWriter,
22         user_control_messages::writer::EventMessagesWriter,
23         utils::RtmpUrlParser,
24     },
25     bytes::BytesMut,
26     bytesio::{
27         bytes_writer::AsyncBytesWriter,
28         bytesio::{TNetIO, TcpIO},
29     },
30     indexmap::IndexMap,
31     std::{sync::Arc, time::Duration},
32     streamhub::{
33         define::StreamHubEventSender,
34         utils::{RandomDigitCount, Uuid},
35     },
36     tokio::{net::TcpStream, sync::Mutex},
37 };
38 
/// Phases of a server session; `ServerSession::run` dispatches on the current one.
enum ServerSessionState {
    /// Initial state: `run` calls `handshake()` while the session is in it.
    Handshake,
    /// `run` calls `read_parse_chunks()` while the session is in this state.
    ReadChunk,
    // The variants below are currently disabled.
    // OnConnect,
    // OnCreateStream,
    //Publish,
    /// Terminal state, entered after a `deleteStream` command was handled;
    /// `run` returns `Ok(())` once it is reached.
    DeleteStream,
    /// Entered at the end of `on_play`; `run` calls `play()` while in this state.
    Play,
}
48 
/// State of one server-side session running over a single network connection.
pub struct ServerSession {
    /// Application name; `on_connect` copies it from the `app` connect property.
    pub app_name: String,
    /// Stream name produced by `RtmpUrlParser` from the raw name given to `play`/`publish`.
    pub stream_name: String,
    /// URL parameters produced together with `stream_name` by `RtmpUrlParser`.
    pub url_parameters: String,
    /// Shared network IO handle; clones of this `Arc` are handed to the handshaker,
    /// the chunk packetizer and the per-message writers.
    io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
    /// Performs the handshake on the bytes fed to it by `handshake()`.
    handshaker: HandshakeServer,
    /// Accumulates received bytes and turns them into chunks.
    unpacketizer: ChunkUnpacketizer,
    /// Current phase, see `ServerSessionState`.
    state: ServerSessionState,
    /// The data most recently read from `io`.
    bytesio_data: BytesMut,
    /// Set when bytes left over after the handshake were already handed to
    /// `unpacketizer`; `read_parse_chunks` then skips its network read once.
    has_remaing_data: bool,
    /* Identifies this session's data producer/subscriber in the channels,
    so that it can be deleted from the map when unpublish/unsubscribe
    is called. */
    pub session_id: Uuid,
    /// Properties parsed from the command object of the `connect` command.
    connect_properties: ConnectProperties,
    /// Shared session logic (channel publish/subscribe and media forwarding).
    pub common: Common,
    /* Configures how many GOPs will be cached; passed on when publishing. */
    gop_num: usize,
}
68 
69 impl ServerSession {
new(stream: TcpStream, event_producer: StreamHubEventSender, gop_num: usize) -> Self70     pub fn new(stream: TcpStream, event_producer: StreamHubEventSender, gop_num: usize) -> Self {
71         let remote_addr = if let Ok(addr) = stream.peer_addr() {
72             log::info!("server session: {}", addr.to_string());
73             Some(addr)
74         } else {
75             None
76         };
77 
78         let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream));
79         let net_io = Arc::new(Mutex::new(tcp_io));
80 
81         Self {
82             app_name: String::from(""),
83             stream_name: String::from(""),
84             url_parameters: String::from(""),
85             io: Arc::clone(&net_io),
86             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
87             unpacketizer: ChunkUnpacketizer::new(),
88             state: ServerSessionState::Handshake,
89             common: Common::new(
90                 Some(ChunkPacketizer::new(Arc::clone(&net_io))),
91                 event_producer,
92                 SessionType::Server,
93                 remote_addr,
94             ),
95             session_id: Uuid::new(RandomDigitCount::Four),
96             bytesio_data: BytesMut::new(),
97             has_remaing_data: false,
98             connect_properties: ConnectProperties::default(),
99             gop_num,
100         }
101     }
102 
run(&mut self) -> Result<(), SessionError>103     pub async fn run(&mut self) -> Result<(), SessionError> {
104         loop {
105             match self.state {
106                 ServerSessionState::Handshake => {
107                     self.handshake().await?;
108                 }
109                 ServerSessionState::ReadChunk => {
110                     self.read_parse_chunks().await?;
111                 }
112                 ServerSessionState::Play => {
113                     self.play().await?;
114                 }
115                 ServerSessionState::DeleteStream => {
116                     return Ok(());
117                 }
118             }
119         }
120 
121         //Ok(())
122     }
123 
handshake(&mut self) -> Result<(), SessionError>124     async fn handshake(&mut self) -> Result<(), SessionError> {
125         let mut bytes_len = 0;
126 
127         while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE {
128             self.bytesio_data = self.io.lock().await.read().await?;
129             bytes_len += self.bytesio_data.len();
130             self.handshaker.extend_data(&self.bytesio_data[..]);
131         }
132 
133         self.handshaker.handshake().await?;
134 
135         if let ServerHandshakeState::Finish = self.handshaker.state() {
136             self.state = ServerSessionState::ReadChunk;
137             let left_bytes = self.handshaker.get_remaining_bytes();
138             if !left_bytes.is_empty() {
139                 self.unpacketizer.extend_data(&left_bytes[..]);
140                 self.has_remaing_data = true;
141             }
142             log::info!("[ S->C ] [send_set_chunk_size] ");
143             self.send_set_chunk_size().await?;
144             return Ok(());
145         }
146 
147         Ok(())
148     }
149 
read_parse_chunks(&mut self) -> Result<(), SessionError>150     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
151         if !self.has_remaing_data {
152             match self
153                 .io
154                 .lock()
155                 .await
156                 .read_timeout(Duration::from_secs(2))
157                 .await
158             {
159                 Ok(data) => {
160                     self.bytesio_data = data;
161                 }
162                 Err(err) => {
163                     self.common
164                         .unpublish_to_channels(
165                             self.app_name.clone(),
166                             self.stream_name.clone(),
167                             self.session_id,
168                         )
169                         .await?;
170 
171                     return Err(SessionError {
172                         value: SessionErrorValue::BytesIOError(err),
173                     });
174                 }
175             }
176 
177             self.unpacketizer.extend_data(&self.bytesio_data[..]);
178         }
179 
180         self.has_remaing_data = false;
181 
182         loop {
183             match self.unpacketizer.read_chunks() {
184                 Ok(rv) => {
185                     if let UnpackResult::Chunks(chunks) = rv {
186                         for chunk_info in chunks {
187                             let timestamp = chunk_info.message_header.timestamp;
188                             let msg_stream_id = chunk_info.message_header.msg_streamd_id;
189 
190                             if let Some(mut msg) = MessageParser::new(chunk_info).parse()? {
191                                 self.process_messages(&mut msg, &msg_stream_id, &timestamp)
192                                     .await?;
193                             }
194                         }
195                     }
196                 }
197                 Err(err) => {
198                     if let UnpackErrorValue::CannotParse = err.value {
199                         self.common
200                             .unpublish_to_channels(
201                                 self.app_name.clone(),
202                                 self.stream_name.clone(),
203                                 self.session_id,
204                             )
205                             .await?;
206                         return Err(err)?;
207                     }
208                     break;
209                 }
210             }
211         }
212         Ok(())
213     }
214 
play(&mut self) -> Result<(), SessionError>215     async fn play(&mut self) -> Result<(), SessionError> {
216         match self.common.send_channel_data().await {
217             Ok(_) => {}
218             Err(err) => {
219                 self.common
220                     .unsubscribe_from_channels(
221                         self.app_name.clone(),
222                         self.stream_name.clone(),
223                         self.session_id,
224                     )
225                     .await?;
226                 return Err(err);
227             }
228         }
229 
230         Ok(())
231     }
232 
send_set_chunk_size(&mut self) -> Result<(), SessionError>233     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
234         let mut controlmessage =
235             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
236         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
237 
238         Ok(())
239     }
240 
process_messages( &mut self, rtmp_msg: &mut RtmpMessageData, msg_stream_id: &u32, timestamp: &u32, ) -> Result<(), SessionError>241     pub async fn process_messages(
242         &mut self,
243         rtmp_msg: &mut RtmpMessageData,
244         msg_stream_id: &u32,
245         timestamp: &u32,
246     ) -> Result<(), SessionError> {
247         match rtmp_msg {
248             RtmpMessageData::Amf0Command {
249                 command_name,
250                 transaction_id,
251                 command_object,
252                 others,
253             } => {
254                 self.on_amf0_command_message(
255                     msg_stream_id,
256                     command_name,
257                     transaction_id,
258                     command_object,
259                     others,
260                 )
261                 .await?
262             }
263             RtmpMessageData::SetChunkSize { chunk_size } => {
264                 self.on_set_chunk_size(*chunk_size as usize)?;
265             }
266             RtmpMessageData::AudioData { data } => {
267                 self.common.on_audio_data(data, timestamp).await?;
268             }
269             RtmpMessageData::VideoData { data } => {
270                 self.common.on_video_data(data, timestamp).await?;
271             }
272             RtmpMessageData::AmfData { raw_data } => {
273                 self.common.on_meta_data(raw_data, timestamp).await?;
274             }
275 
276             _ => {}
277         }
278         Ok(())
279     }
280 
on_amf0_command_message( &mut self, stream_id: &u32, command_name: &Amf0ValueType, transaction_id: &Amf0ValueType, command_object: &Amf0ValueType, others: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>281     pub async fn on_amf0_command_message(
282         &mut self,
283         stream_id: &u32,
284         command_name: &Amf0ValueType,
285         transaction_id: &Amf0ValueType,
286         command_object: &Amf0ValueType,
287         others: &mut Vec<Amf0ValueType>,
288     ) -> Result<(), SessionError> {
289         let empty_cmd_name = &String::new();
290         let cmd_name = match command_name {
291             Amf0ValueType::UTF8String(str) => str,
292             _ => empty_cmd_name,
293         };
294 
295         let transaction_id = match transaction_id {
296             Amf0ValueType::Number(number) => number,
297             _ => &0.0,
298         };
299 
300         let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new();
301         let obj = match command_object {
302             Amf0ValueType::Object(obj) => obj,
303             _ => &empty_cmd_obj,
304         };
305 
306         match cmd_name.as_str() {
307             "connect" => {
308                 log::info!("[ S<-C ] [connect] ");
309                 self.on_connect(transaction_id, obj).await?;
310             }
311             "createStream" => {
312                 log::info!("[ S<-C ] [create stream] ");
313                 self.on_create_stream(transaction_id).await?;
314             }
315             "deleteStream" => {
316                 if !others.is_empty() {
317                     let stream_id = match others.pop() {
318                         Some(Amf0ValueType::Number(streamid)) => streamid,
319                         _ => 0.0,
320                     };
321 
322                     log::info!(
323                         "[ S<-C ] [delete stream] app_name: {}, stream_name: {}",
324                         self.app_name,
325                         self.stream_name
326                     );
327 
328                     self.on_delete_stream(transaction_id, &stream_id).await?;
329                     self.state = ServerSessionState::DeleteStream;
330                 }
331             }
332             "play" => {
333                 log::info!(
334                     "[ S<-C ] [play]  app_name: {}, stream_name: {}",
335                     self.app_name,
336                     self.stream_name
337                 );
338                 self.unpacketizer.session_type = config::SERVER_PULL;
339                 self.on_play(transaction_id, stream_id, others).await?;
340             }
341             "publish" => {
342                 self.unpacketizer.session_type = config::SERVER_PUSH;
343                 self.on_publish(transaction_id, stream_id, others).await?;
344             }
345             _ => {}
346         }
347 
348         Ok(())
349     }
350 
on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError>351     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
352         log::info!(
353             "[ S<-C ] [set chunk size]  app_name: {}, stream_name: {}, chunk size: {}",
354             self.app_name,
355             self.stream_name,
356             chunk_size
357         );
358         self.unpacketizer.update_max_chunk_size(chunk_size);
359         Ok(())
360     }
361 
parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>)362     fn parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>) {
363         for (property, value) in command_obj {
364             match property.as_str() {
365                 "app" => {
366                     if let Amf0ValueType::UTF8String(app) = value {
367                         self.connect_properties.app = Some(app.clone());
368                     }
369                 }
370                 "flashVer" => {
371                     if let Amf0ValueType::UTF8String(flash_ver) = value {
372                         self.connect_properties.flash_ver = Some(flash_ver.clone());
373                     }
374                 }
375                 "swfUrl" => {
376                     if let Amf0ValueType::UTF8String(swf_url) = value {
377                         self.connect_properties.swf_url = Some(swf_url.clone());
378                     }
379                 }
380                 "tcUrl" => {
381                     if let Amf0ValueType::UTF8String(tc_url) = value {
382                         self.connect_properties.tc_url = Some(tc_url.clone());
383                     }
384                 }
385                 "fpad" => {
386                     if let Amf0ValueType::Boolean(fpad) = value {
387                         self.connect_properties.fpad = Some(*fpad);
388                     }
389                 }
390                 "audioCodecs" => {
391                     if let Amf0ValueType::Number(audio_codecs) = value {
392                         self.connect_properties.audio_codecs = Some(*audio_codecs);
393                     }
394                 }
395                 "videoCodecs" => {
396                     if let Amf0ValueType::Number(video_codecs) = value {
397                         self.connect_properties.video_codecs = Some(*video_codecs);
398                     }
399                 }
400                 "videoFunction" => {
401                     if let Amf0ValueType::Number(video_function) = value {
402                         self.connect_properties.video_function = Some(*video_function);
403                     }
404                 }
405                 "pageUrl" => {
406                     if let Amf0ValueType::UTF8String(page_url) = value {
407                         self.connect_properties.page_url = Some(page_url.clone());
408                     }
409                 }
410                 "objectEncoding" => {
411                     if let Amf0ValueType::Number(object_encoding) = value {
412                         self.connect_properties.object_encoding = Some(*object_encoding);
413                     }
414                 }
415                 _ => {
416                     log::warn!("unknown connect properties: {}:{:?}", property, value);
417                 }
418             }
419         }
420     }
421 
on_connect( &mut self, transaction_id: &f64, command_obj: &IndexMap<String, Amf0ValueType>, ) -> Result<(), SessionError>422     async fn on_connect(
423         &mut self,
424         transaction_id: &f64,
425         command_obj: &IndexMap<String, Amf0ValueType>,
426     ) -> Result<(), SessionError> {
427         self.parse_connect_properties(command_obj);
428         log::info!("connect properties: {:?}", self.connect_properties);
429         let mut control_message =
430             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
431         log::info!("[ S->C ] [set window_acknowledgement_size]");
432         control_message
433             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
434             .await?;
435 
436         log::info!("[ S->C ] [set set_peer_bandwidth]",);
437         control_message
438             .write_set_peer_bandwidth(
439                 define::PEER_BANDWIDTH,
440                 define::peer_bandwidth_limit_type::DYNAMIC,
441             )
442             .await?;
443 
444         let obj_encoding = command_obj.get("objectEncoding");
445         let encoding = match obj_encoding {
446             Some(Amf0ValueType::Number(encoding)) => encoding,
447             _ => &define::OBJENCODING_AMF0,
448         };
449 
450         let app_name = command_obj.get("app");
451         self.app_name = match app_name {
452             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
453             _ => {
454                 return Err(SessionError {
455                     value: SessionErrorValue::NoAppName,
456                 });
457             }
458         };
459 
460         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
461         log::info!("[ S->C ] [set connect_response]",);
462         netconnection
463             .write_connect_response(
464                 transaction_id,
465                 define::FMSVER,
466                 &define::CAPABILITIES,
467                 &String::from("NetConnection.Connect.Success"),
468                 define::LEVEL,
469                 &String::from("Connection Succeeded."),
470                 encoding,
471             )
472             .await?;
473 
474         Ok(())
475     }
476 
on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError>477     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
478         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
479         netconnection
480             .write_create_stream_response(transaction_id, &define::STREAM_ID)
481             .await?;
482 
483         log::info!(
484             "[ S->C ] [create_stream_response]  app_name: {}",
485             self.app_name,
486         );
487 
488         Ok(())
489     }
490 
on_delete_stream( &mut self, transaction_id: &f64, stream_id: &f64, ) -> Result<(), SessionError>491     pub async fn on_delete_stream(
492         &mut self,
493         transaction_id: &f64,
494         stream_id: &f64,
495     ) -> Result<(), SessionError> {
496         self.common
497             .unpublish_to_channels(
498                 self.app_name.clone(),
499                 self.stream_name.clone(),
500                 self.session_id,
501             )
502             .await?;
503 
504         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
505         netstream
506             .write_on_status(
507                 transaction_id,
508                 "status",
509                 "NetStream.DeleteStream.Suceess",
510                 "",
511             )
512             .await?;
513 
514         //self.unsubscribe_from_channels().await?;
515         log::info!(
516             "[ S->C ] [delete stream success]  app_name: {}, stream_name: {}",
517             self.app_name,
518             self.stream_name
519         );
520         log::trace!("{}", stream_id);
521 
522         Ok(())
523     }
524 
get_request_url(&mut self, raw_stream_name: String) -> String525     fn get_request_url(&mut self, raw_stream_name: String) -> String {
526         if let Some(tc_url) = &self.connect_properties.tc_url {
527             format!("{tc_url}/{raw_stream_name}")
528         } else {
529             format!("{}/{}", self.app_name.clone(), raw_stream_name)
530         }
531     }
532 
533     #[allow(clippy::never_loop)]
on_play( &mut self, transaction_id: &f64, stream_id: &u32, other_values: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>534     pub async fn on_play(
535         &mut self,
536         transaction_id: &f64,
537         stream_id: &u32,
538         other_values: &mut Vec<Amf0ValueType>,
539     ) -> Result<(), SessionError> {
540         let length = other_values.len() as u8;
541         let mut index: u8 = 0;
542 
543         let mut stream_name: Option<String> = None;
544         let mut start: Option<f64> = None;
545         let mut duration: Option<f64> = None;
546         let mut reset: Option<bool> = None;
547 
548         loop {
549             if index >= length {
550                 break;
551             }
552             index += 1;
553             stream_name = match other_values.remove(0) {
554                 Amf0ValueType::UTF8String(val) => Some(val),
555                 _ => None,
556             };
557 
558             if index >= length {
559                 break;
560             }
561             index += 1;
562             start = match other_values.remove(0) {
563                 Amf0ValueType::Number(val) => Some(val),
564                 _ => None,
565             };
566 
567             if index >= length {
568                 break;
569             }
570             index += 1;
571             duration = match other_values.remove(0) {
572                 Amf0ValueType::Number(val) => Some(val),
573                 _ => None,
574             };
575 
576             if index >= length {
577                 break;
578             }
579             //index = index + 1;
580             reset = match other_values.remove(0) {
581                 Amf0ValueType::Boolean(val) => Some(val),
582                 _ => None,
583             };
584             break;
585         }
586 
587         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
588         event_messages.write_stream_begin(*stream_id).await?;
589         log::info!(
590             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
591             self.app_name,
592             self.stream_name
593         );
594         log::trace!(
595             "{} {} {}",
596             start.is_some(),
597             duration.is_some(),
598             reset.is_some()
599         );
600 
601         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
602         netstream
603             .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset")
604             .await?;
605 
606         netstream
607             .write_on_status(
608                 transaction_id,
609                 "status",
610                 "NetStream.Play.Start",
611                 "play start",
612             )
613             .await?;
614 
615         netstream
616             .write_on_status(
617                 transaction_id,
618                 "status",
619                 "NetStream.Data.Start",
620                 "data start.",
621             )
622             .await?;
623 
624         netstream
625             .write_on_status(
626                 transaction_id,
627                 "status",
628                 "NetStream.Play.PublishNotify",
629                 "play publish notify.",
630             )
631             .await?;
632 
633         event_messages.write_stream_is_record(*stream_id).await?;
634 
635         let raw_stream_name = stream_name.unwrap();
636 
637         (self.stream_name, self.url_parameters) = RtmpUrlParser::default()
638             .set_raw_stream_name(raw_stream_name.clone())
639             .parse_raw_stream_name();
640 
641         log::info!(
642             "[ S->C ] [stream is record]  app_name: {}, stream_name: {}, url parameters: {}",
643             self.app_name,
644             self.stream_name,
645             self.url_parameters
646         );
647 
648         /*Now it can update the request url*/
649         self.common.request_url = self.get_request_url(raw_stream_name);
650         self.common
651             .subscribe_from_channels(
652                 self.app_name.clone(),
653                 self.stream_name.clone(),
654                 self.session_id,
655             )
656             .await?;
657 
658         self.state = ServerSessionState::Play;
659 
660         Ok(())
661     }
662 
on_publish( &mut self, transaction_id: &f64, stream_id: &u32, other_values: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>663     pub async fn on_publish(
664         &mut self,
665         transaction_id: &f64,
666         stream_id: &u32,
667         other_values: &mut Vec<Amf0ValueType>,
668     ) -> Result<(), SessionError> {
669         let length = other_values.len();
670 
671         if length < 2 {
672             return Err(SessionError {
673                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
674             });
675         }
676 
677         let raw_stream_name = match other_values.remove(0) {
678             Amf0ValueType::UTF8String(val) => val,
679             _ => {
680                 return Err(SessionError {
681                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
682                 });
683             }
684         };
685 
686         (self.stream_name, self.url_parameters) = RtmpUrlParser::default()
687             .set_raw_stream_name(raw_stream_name.clone())
688             .parse_raw_stream_name();
689 
690         /*Now it can update the request url*/
691         self.common.request_url = self.get_request_url(raw_stream_name);
692 
693         let _ = match other_values.remove(0) {
694             Amf0ValueType::UTF8String(val) => val,
695             _ => {
696                 return Err(SessionError {
697                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
698                 });
699             }
700         };
701 
702         log::info!(
703             "[ S<-C ] [publish]  app_name: {}, stream_name: {}, url parameters: {}",
704             self.app_name,
705             self.stream_name,
706             self.url_parameters
707         );
708 
709         log::info!(
710             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}, url parameters: {}",
711             self.app_name,
712             self.stream_name,
713             self.url_parameters
714         );
715 
716         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
717         event_messages.write_stream_begin(*stream_id).await?;
718 
719         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
720         netstream
721             .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "")
722             .await?;
723         log::info!(
724             "[ S->C ] [NetStream.Publish.Start]  app_name: {}, stream_name: {}",
725             self.app_name,
726             self.stream_name
727         );
728 
729         self.common
730             .publish_to_channels(
731                 self.app_name.clone(),
732                 self.stream_name.clone(),
733                 self.session_id,
734                 self.gop_num,
735             )
736             .await?;
737 
738         Ok(())
739     }
740 }
741