1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::{
9         amf0::Amf0ValueType,
10         chunk::{
11             define::CHUNK_SIZE,
12             unpacketizer::{ChunkUnpacketizer, UnpackResult},
13         },
14         config, handshake,
15         handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer},
16         messages::{define::RtmpMessageData, parser::MessageParser},
17         netconnection::writer::{ConnectProperties, NetConnection},
18         netstream::writer::NetStreamWriter,
19         protocol_control_messages::writer::ProtocolControlMessagesWriter,
20         user_control_messages::writer::EventMessagesWriter,
21         utils::RtmpUrlParser,
22     },
23     bytes::BytesMut,
24     bytesio::{
25         bytes_writer::AsyncBytesWriter,
26         bytesio::{TNetIO, TcpIO},
27     },
28     indexmap::IndexMap,
29     std::{sync::Arc, time::Duration},
30     streamhub::{
31         define::StreamHubEventSender,
32         utils::{RandomDigitCount, Uuid},
33     },
34     tokio::{net::TcpStream, sync::Mutex},
35 };
36 
/// The phase a [`ServerSession`] is currently in; `ServerSession::run`
/// dispatches on this value on every loop iteration.
enum ServerSessionState {
    /// Initial state: the RTMP handshake is still in progress.
    Handshake,
    /// Handshake finished: read bytes from the peer, unpack chunks and
    /// process the resulting messages.
    ReadChunk,
    // Commented-out variants kept from the original source (unused):
    // OnConnect,
    // OnCreateStream,
    //Publish,
    /// Entered after a `deleteStream` command was handled; `run` returns
    /// `Ok(())` when it sees this state.
    DeleteStream,
    /// Entered at the end of `on_play`; `run` keeps calling `play()` to
    /// forward channel data to the peer.
    Play,
}
46 
/// Server side of a single RTMP connection: performs the handshake, parses
/// incoming chunks and reacts to the commands they carry (connect,
/// createStream, publish, play, deleteStream).
pub struct ServerSession {
    /// Application name, taken from the `app` property of the `connect` command.
    pub app_name: String,
    /// Stream name parsed from the raw stream name of `play` / `publish`.
    pub stream_name: String,
    /// URL parameters parsed from the raw stream name of `play` / `publish`.
    pub url_parameters: String,
    /// Network IO handle; clones of this `Arc` are handed to the handshaker,
    /// to `Common` and to every message writer created by this session.
    io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
    /// Drives the server side of the RTMP handshake.
    handshaker: HandshakeServer,
    /// Reassembles RTMP chunks from the raw bytes fed into it.
    unpacketizer: ChunkUnpacketizer,
    /// Current phase of the session, see `ServerSessionState`.
    state: ServerSessionState,
    /// The bytes most recently read from `io`.
    bytesio_data: BytesMut,
    /// Set when the handshake left unconsumed bytes that were already fed to
    /// the unpacketizer, so the next `read_parse_chunks` call skips the read.
    has_remaing_data: bool,
    /* Identifies this session in the channels: it is passed along when
    publishing/subscribing and again when unpublishing/unsubscribing so the
    matching entry can be removed. */
    pub session_id: Uuid,
    /// Properties collected from the command object of the `connect` command.
    connect_properties: ConnectProperties,
    /// Logic shared between session types (channel publish/subscribe and
    /// audio/video/metadata handling).
    pub common: Common,
    /* Configures how many GOPs will be cached; forwarded to
    `publish_to_channels`. */
    gop_num: usize,
}
66 
67 impl ServerSession {
68     pub fn new(stream: TcpStream, event_producer: StreamHubEventSender, gop_num: usize) -> Self {
69         let remote_addr = if let Ok(addr) = stream.peer_addr() {
70             log::info!("server session: {}", addr.to_string());
71             Some(addr)
72         } else {
73             None
74         };
75 
76         let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream));
77         let net_io = Arc::new(Mutex::new(tcp_io));
78 
79         let subscriber_id = Uuid::new(RandomDigitCount::Four);
80         Self {
81             app_name: String::from(""),
82             stream_name: String::from(""),
83             url_parameters: String::from(""),
84             io: Arc::clone(&net_io),
85             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
86             unpacketizer: ChunkUnpacketizer::new(),
87             state: ServerSessionState::Handshake,
88             common: Common::new(
89                 Arc::clone(&net_io),
90                 event_producer,
91                 SessionType::Server,
92                 remote_addr,
93             ),
94             session_id: subscriber_id,
95             bytesio_data: BytesMut::new(),
96             has_remaing_data: false,
97             connect_properties: ConnectProperties::default(),
98             gop_num,
99         }
100     }
101 
102     pub async fn run(&mut self) -> Result<(), SessionError> {
103         loop {
104             match self.state {
105                 ServerSessionState::Handshake => {
106                     self.handshake().await?;
107                 }
108                 ServerSessionState::ReadChunk => {
109                     self.read_parse_chunks().await?;
110                 }
111                 ServerSessionState::Play => {
112                     self.play().await?;
113                 }
114                 ServerSessionState::DeleteStream => {
115                     return Ok(());
116                 }
117             }
118         }
119 
120         //Ok(())
121     }
122 
123     async fn handshake(&mut self) -> Result<(), SessionError> {
124         let mut bytes_len = 0;
125 
126         while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE {
127             self.bytesio_data = self.io.lock().await.read().await?;
128             bytes_len += self.bytesio_data.len();
129             self.handshaker.extend_data(&self.bytesio_data[..]);
130         }
131 
132         self.handshaker.handshake().await?;
133 
134         if let ServerHandshakeState::Finish = self.handshaker.state() {
135             self.state = ServerSessionState::ReadChunk;
136             let left_bytes = self.handshaker.get_remaining_bytes();
137             if !left_bytes.is_empty() {
138                 self.unpacketizer.extend_data(&left_bytes[..]);
139                 self.has_remaing_data = true;
140             }
141             log::info!("[ S->C ] [send_set_chunk_size] ");
142             self.send_set_chunk_size().await?;
143             return Ok(());
144         }
145 
146         Ok(())
147     }
148 
149     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
150         if !self.has_remaing_data {
151             match self
152                 .io
153                 .lock()
154                 .await
155                 .read_timeout(Duration::from_secs(2))
156                 .await
157             {
158                 Ok(data) => {
159                     self.bytesio_data = data;
160                 }
161                 Err(err) => {
162                     self.common
163                         .unpublish_to_channels(
164                             self.app_name.clone(),
165                             self.stream_name.clone(),
166                             self.session_id,
167                         )
168                         .await?;
169 
170                     return Err(SessionError {
171                         value: SessionErrorValue::BytesIOError(err),
172                     });
173                 }
174             }
175 
176             self.unpacketizer.extend_data(&self.bytesio_data[..]);
177         }
178 
179         self.has_remaing_data = false;
180 
181         loop {
182             let result = self.unpacketizer.read_chunks();
183 
184             if let Ok(rv) = result {
185                 if let UnpackResult::Chunks(chunks) = rv {
186                     for chunk_info in chunks {
187                         let timestamp = chunk_info.message_header.timestamp;
188                         let msg_stream_id = chunk_info.message_header.msg_streamd_id;
189 
190                         let mut msg = MessageParser::new(chunk_info).parse()?;
191                         self.process_messages(&mut msg, &msg_stream_id, &timestamp)
192                             .await?;
193                     }
194                 }
195             } else {
196                 break;
197             }
198         }
199         Ok(())
200     }
201 
202     async fn play(&mut self) -> Result<(), SessionError> {
203         match self.common.send_channel_data().await {
204             Ok(_) => {}
205             Err(err) => {
206                 self.common
207                     .unsubscribe_from_channels(
208                         self.app_name.clone(),
209                         self.stream_name.clone(),
210                         self.session_id,
211                     )
212                     .await?;
213                 return Err(err);
214             }
215         }
216 
217         Ok(())
218     }
219 
220     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
221         let mut controlmessage =
222             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
223         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
224 
225         Ok(())
226     }
227 
228     pub async fn process_messages(
229         &mut self,
230         rtmp_msg: &mut RtmpMessageData,
231         msg_stream_id: &u32,
232         timestamp: &u32,
233     ) -> Result<(), SessionError> {
234         match rtmp_msg {
235             RtmpMessageData::Amf0Command {
236                 command_name,
237                 transaction_id,
238                 command_object,
239                 others,
240             } => {
241                 self.on_amf0_command_message(
242                     msg_stream_id,
243                     command_name,
244                     transaction_id,
245                     command_object,
246                     others,
247                 )
248                 .await?
249             }
250             RtmpMessageData::SetChunkSize { chunk_size } => {
251                 self.on_set_chunk_size(*chunk_size as usize)?;
252             }
253             RtmpMessageData::AudioData { data } => {
254                 self.common.on_audio_data(data, timestamp).await?;
255             }
256             RtmpMessageData::VideoData { data } => {
257                 self.common.on_video_data(data, timestamp).await?;
258             }
259             RtmpMessageData::AmfData { raw_data } => {
260                 self.common.on_meta_data(raw_data, timestamp).await?;
261             }
262 
263             _ => {}
264         }
265         Ok(())
266     }
267 
268     pub async fn on_amf0_command_message(
269         &mut self,
270         stream_id: &u32,
271         command_name: &Amf0ValueType,
272         transaction_id: &Amf0ValueType,
273         command_object: &Amf0ValueType,
274         others: &mut Vec<Amf0ValueType>,
275     ) -> Result<(), SessionError> {
276         let empty_cmd_name = &String::new();
277         let cmd_name = match command_name {
278             Amf0ValueType::UTF8String(str) => str,
279             _ => empty_cmd_name,
280         };
281 
282         let transaction_id = match transaction_id {
283             Amf0ValueType::Number(number) => number,
284             _ => &0.0,
285         };
286 
287         let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new();
288         let obj = match command_object {
289             Amf0ValueType::Object(obj) => obj,
290             _ => &empty_cmd_obj,
291         };
292 
293         match cmd_name.as_str() {
294             "connect" => {
295                 log::info!("[ S<-C ] [connect] ");
296                 self.on_connect(transaction_id, obj).await?;
297             }
298             "createStream" => {
299                 log::info!("[ S<-C ] [create stream] ");
300                 self.on_create_stream(transaction_id).await?;
301             }
302             "deleteStream" => {
303                 if !others.is_empty() {
304                     let stream_id = match others.pop() {
305                         Some(Amf0ValueType::Number(streamid)) => streamid,
306                         _ => 0.0,
307                     };
308 
309                     log::info!(
310                         "[ S<-C ] [delete stream] app_name: {}, stream_name: {}",
311                         self.app_name,
312                         self.stream_name
313                     );
314 
315                     self.on_delete_stream(transaction_id, &stream_id).await?;
316                     self.state = ServerSessionState::DeleteStream;
317                 }
318             }
319             "play" => {
320                 log::info!(
321                     "[ S<-C ] [play]  app_name: {}, stream_name: {}",
322                     self.app_name,
323                     self.stream_name
324                 );
325                 self.unpacketizer.session_type = config::SERVER_PULL;
326                 self.on_play(transaction_id, stream_id, others).await?;
327             }
328             "publish" => {
329                 self.unpacketizer.session_type = config::SERVER_PUSH;
330                 self.on_publish(transaction_id, stream_id, others).await?;
331             }
332             _ => {}
333         }
334 
335         Ok(())
336     }
337 
338     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
339         log::info!(
340             "[ S<-C ] [set chunk size]  app_name: {}, stream_name: {}, chunk size: {}",
341             self.app_name,
342             self.stream_name,
343             chunk_size
344         );
345         self.unpacketizer.update_max_chunk_size(chunk_size);
346         Ok(())
347     }
348 
349     fn parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>) {
350         for (property, value) in command_obj {
351             match property.as_str() {
352                 "app" => {
353                     if let Amf0ValueType::UTF8String(app) = value {
354                         self.connect_properties.app = Some(app.clone());
355                     }
356                 }
357                 "flashVer" => {
358                     if let Amf0ValueType::UTF8String(flash_ver) = value {
359                         self.connect_properties.flash_ver = Some(flash_ver.clone());
360                     }
361                 }
362                 "swfUrl" => {
363                     if let Amf0ValueType::UTF8String(swf_url) = value {
364                         self.connect_properties.swf_url = Some(swf_url.clone());
365                     }
366                 }
367                 "tcUrl" => {
368                     if let Amf0ValueType::UTF8String(tc_url) = value {
369                         self.connect_properties.tc_url = Some(tc_url.clone());
370                     }
371                 }
372                 "fpad" => {
373                     if let Amf0ValueType::Boolean(fpad) = value {
374                         self.connect_properties.fpad = Some(*fpad);
375                     }
376                 }
377                 "audioCodecs" => {
378                     if let Amf0ValueType::Number(audio_codecs) = value {
379                         self.connect_properties.audio_codecs = Some(*audio_codecs);
380                     }
381                 }
382                 "videoCodecs" => {
383                     if let Amf0ValueType::Number(video_codecs) = value {
384                         self.connect_properties.video_codecs = Some(*video_codecs);
385                     }
386                 }
387                 "videoFunction" => {
388                     if let Amf0ValueType::Number(video_function) = value {
389                         self.connect_properties.video_function = Some(*video_function);
390                     }
391                 }
392                 "pageUrl" => {
393                     if let Amf0ValueType::UTF8String(page_url) = value {
394                         self.connect_properties.page_url = Some(page_url.clone());
395                     }
396                 }
397                 "objectEncoding" => {
398                     if let Amf0ValueType::Number(object_encoding) = value {
399                         self.connect_properties.object_encoding = Some(*object_encoding);
400                     }
401                 }
402                 _ => {
403                     log::warn!("unknown connect properties: {}:{:?}", property, value);
404                 }
405             }
406         }
407     }
408 
409     async fn on_connect(
410         &mut self,
411         transaction_id: &f64,
412         command_obj: &IndexMap<String, Amf0ValueType>,
413     ) -> Result<(), SessionError> {
414         self.parse_connect_properties(command_obj);
415         log::info!("connect properties: {:?}", self.connect_properties);
416         let mut control_message =
417             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
418         log::info!("[ S->C ] [set window_acknowledgement_size]");
419         control_message
420             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
421             .await?;
422 
423         log::info!("[ S->C ] [set set_peer_bandwidth]",);
424         control_message
425             .write_set_peer_bandwidth(
426                 define::PEER_BANDWIDTH,
427                 define::peer_bandwidth_limit_type::DYNAMIC,
428             )
429             .await?;
430 
431         let obj_encoding = command_obj.get("objectEncoding");
432         let encoding = match obj_encoding {
433             Some(Amf0ValueType::Number(encoding)) => encoding,
434             _ => &define::OBJENCODING_AMF0,
435         };
436 
437         let app_name = command_obj.get("app");
438         self.app_name = match app_name {
439             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
440             _ => {
441                 return Err(SessionError {
442                     value: SessionErrorValue::NoAppName,
443                 });
444             }
445         };
446 
447         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
448         log::info!("[ S->C ] [set connect_response]",);
449         netconnection
450             .write_connect_response(
451                 transaction_id,
452                 define::FMSVER,
453                 &define::CAPABILITIES,
454                 &String::from("NetConnection.Connect.Success"),
455                 define::LEVEL,
456                 &String::from("Connection Succeeded."),
457                 encoding,
458             )
459             .await?;
460 
461         Ok(())
462     }
463 
464     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
465         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
466         netconnection
467             .write_create_stream_response(transaction_id, &define::STREAM_ID)
468             .await?;
469 
470         log::info!(
471             "[ S->C ] [create_stream_response]  app_name: {}",
472             self.app_name,
473         );
474 
475         Ok(())
476     }
477 
478     pub async fn on_delete_stream(
479         &mut self,
480         transaction_id: &f64,
481         stream_id: &f64,
482     ) -> Result<(), SessionError> {
483         self.common
484             .unpublish_to_channels(
485                 self.app_name.clone(),
486                 self.stream_name.clone(),
487                 self.session_id,
488             )
489             .await?;
490 
491         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
492         netstream
493             .write_on_status(
494                 transaction_id,
495                 "status",
496                 "NetStream.DeleteStream.Suceess",
497                 "",
498             )
499             .await?;
500 
501         //self.unsubscribe_from_channels().await?;
502         log::info!(
503             "[ S->C ] [delete stream success]  app_name: {}, stream_name: {}",
504             self.app_name,
505             self.stream_name
506         );
507         log::trace!("{}", stream_id);
508 
509         Ok(())
510     }
511 
512     fn get_request_url(&mut self, raw_stream_name: String) -> String {
513         if let Some(tc_url) = &self.connect_properties.tc_url {
514             format!("{tc_url}/{raw_stream_name}")
515         } else {
516             format!("{}/{}", self.app_name.clone(), raw_stream_name)
517         }
518     }
519 
520     #[allow(clippy::never_loop)]
521     pub async fn on_play(
522         &mut self,
523         transaction_id: &f64,
524         stream_id: &u32,
525         other_values: &mut Vec<Amf0ValueType>,
526     ) -> Result<(), SessionError> {
527         let length = other_values.len() as u8;
528         let mut index: u8 = 0;
529 
530         let mut stream_name: Option<String> = None;
531         let mut start: Option<f64> = None;
532         let mut duration: Option<f64> = None;
533         let mut reset: Option<bool> = None;
534 
535         loop {
536             if index >= length {
537                 break;
538             }
539             index += 1;
540             stream_name = match other_values.remove(0) {
541                 Amf0ValueType::UTF8String(val) => Some(val),
542                 _ => None,
543             };
544 
545             if index >= length {
546                 break;
547             }
548             index += 1;
549             start = match other_values.remove(0) {
550                 Amf0ValueType::Number(val) => Some(val),
551                 _ => None,
552             };
553 
554             if index >= length {
555                 break;
556             }
557             index += 1;
558             duration = match other_values.remove(0) {
559                 Amf0ValueType::Number(val) => Some(val),
560                 _ => None,
561             };
562 
563             if index >= length {
564                 break;
565             }
566             //index = index + 1;
567             reset = match other_values.remove(0) {
568                 Amf0ValueType::Boolean(val) => Some(val),
569                 _ => None,
570             };
571             break;
572         }
573 
574         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
575         event_messages.write_stream_begin(*stream_id).await?;
576         log::info!(
577             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
578             self.app_name,
579             self.stream_name
580         );
581         log::trace!(
582             "{} {} {}",
583             start.is_some(),
584             duration.is_some(),
585             reset.is_some()
586         );
587 
588         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
589         netstream
590             .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset")
591             .await?;
592 
593         netstream
594             .write_on_status(
595                 transaction_id,
596                 "status",
597                 "NetStream.Play.Start",
598                 "play start",
599             )
600             .await?;
601 
602         netstream
603             .write_on_status(
604                 transaction_id,
605                 "status",
606                 "NetStream.Data.Start",
607                 "data start.",
608             )
609             .await?;
610 
611         netstream
612             .write_on_status(
613                 transaction_id,
614                 "status",
615                 "NetStream.Play.PublishNotify",
616                 "play publish notify.",
617             )
618             .await?;
619 
620         event_messages.write_stream_is_record(*stream_id).await?;
621 
622         let raw_stream_name = stream_name.unwrap();
623 
624         (self.stream_name, self.url_parameters) = RtmpUrlParser::default()
625             .set_raw_stream_name(raw_stream_name.clone())
626             .parse_raw_stream_name();
627 
628         log::info!(
629             "[ S->C ] [stream is record]  app_name: {}, stream_name: {}, url parameters: {}",
630             self.app_name,
631             self.stream_name,
632             self.url_parameters
633         );
634 
635         /*Now it can update the request url*/
636         self.common.request_url = self.get_request_url(raw_stream_name);
637         self.common
638             .subscribe_from_channels(
639                 self.app_name.clone(),
640                 self.stream_name.clone(),
641                 self.session_id,
642             )
643             .await?;
644 
645         self.state = ServerSessionState::Play;
646 
647         Ok(())
648     }
649 
650     pub async fn on_publish(
651         &mut self,
652         transaction_id: &f64,
653         stream_id: &u32,
654         other_values: &mut Vec<Amf0ValueType>,
655     ) -> Result<(), SessionError> {
656         let length = other_values.len();
657 
658         if length < 2 {
659             return Err(SessionError {
660                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
661             });
662         }
663 
664         let raw_stream_name = match other_values.remove(0) {
665             Amf0ValueType::UTF8String(val) => val,
666             _ => {
667                 return Err(SessionError {
668                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
669                 });
670             }
671         };
672 
673         (self.stream_name, self.url_parameters) = RtmpUrlParser::default()
674             .set_raw_stream_name(raw_stream_name.clone())
675             .parse_raw_stream_name();
676 
677         /*Now it can update the request url*/
678         self.common.request_url = self.get_request_url(raw_stream_name);
679 
680         let _ = match other_values.remove(0) {
681             Amf0ValueType::UTF8String(val) => val,
682             _ => {
683                 return Err(SessionError {
684                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
685                 });
686             }
687         };
688 
689         log::info!(
690             "[ S<-C ] [publish]  app_name: {}, stream_name: {}, url parameters: {}",
691             self.app_name,
692             self.stream_name,
693             self.url_parameters
694         );
695 
696         log::info!(
697             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}, url parameters: {}",
698             self.app_name,
699             self.stream_name,
700             self.url_parameters
701         );
702 
703         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
704         event_messages.write_stream_begin(*stream_id).await?;
705 
706         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
707         netstream
708             .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "")
709             .await?;
710         log::info!(
711             "[ S->C ] [NetStream.Publish.Start]  app_name: {}, stream_name: {}",
712             self.app_name,
713             self.stream_name
714         );
715 
716         self.common
717             .publish_to_channels(
718                 self.app_name.clone(),
719                 self.stream_name.clone(),
720                 self.session_id,
721                 self.gop_num,
722             )
723             .await?;
724 
725         Ok(())
726     }
727 }
728