1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::{
9         amf0::Amf0ValueType,
10         channels::define::ChannelEventProducer,
11         chunk::{
12             define::CHUNK_SIZE,
13             unpacketizer::{ChunkUnpacketizer, UnpackResult},
14         },
15         config, handshake,
16         handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer},
17         messages::{define::RtmpMessageData, parser::MessageParser},
18         netconnection::writer::{ConnectProperties, NetConnection},
19         netstream::writer::NetStreamWriter,
20         protocol_control_messages::writer::ProtocolControlMessagesWriter,
21         user_control_messages::writer::EventMessagesWriter,
22     },
23     bytes::BytesMut,
24     bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO},
25     std::{collections::HashMap, sync::Arc, time::Duration},
26     tokio::{net::TcpStream, sync::Mutex},
27     uuid::Uuid,
28 };
29 
30 enum ServerSessionState {
31     Handshake,
32     ReadChunk,
33     // OnConnect,
34     // OnCreateStream,
35     //Publish,
36     DeleteStream,
37     Play,
38 }
39 
40 pub struct ServerSession {
41     pub app_name: String,
42     pub stream_name: String,
43     pub url_parameters: String,
44     io: Arc<Mutex<BytesIO>>,
45     handshaker: HandshakeServer,
46     unpacketizer: ChunkUnpacketizer,
47     state: ServerSessionState,
48     pub common: Common,
49     bytesio_data: BytesMut,
50     has_remaing_data: bool,
51     /* Used to mark the subscriber's the data producer
52     in channels and delete it from map when unsubscribe
53     is called. */
54     pub session_id: Uuid,
55     connect_properties: ConnectProperties,
56 }
57 
58 impl ServerSession {
59     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self {
60         let remote_addr = if let Ok(addr) = stream.peer_addr() {
61             log::info!("server session: {}", addr.to_string());
62             Some(addr)
63         } else {
64             None
65         };
66 
67         let net_io = Arc::new(Mutex::new(BytesIO::new(stream)));
68         let subscriber_id = Uuid::new_v4();
69         Self {
70             app_name: String::from(""),
71             stream_name: String::from(""),
72             url_parameters: String::from(""),
73             io: Arc::clone(&net_io),
74             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
75             unpacketizer: ChunkUnpacketizer::new(),
76             state: ServerSessionState::Handshake,
77             common: Common::new(
78                 Arc::clone(&net_io),
79                 event_producer,
80                 SessionType::Server,
81                 remote_addr,
82             ),
83             session_id: subscriber_id,
84             bytesio_data: BytesMut::new(),
85             has_remaing_data: false,
86             connect_properties: ConnectProperties::default(),
87         }
88     }
89 
90     pub async fn run(&mut self) -> Result<(), SessionError> {
91         loop {
92             match self.state {
93                 ServerSessionState::Handshake => {
94                     self.handshake().await?;
95                 }
96                 ServerSessionState::ReadChunk => {
97                     self.read_parse_chunks().await?;
98                 }
99                 ServerSessionState::Play => {
100                     self.play().await?;
101                 }
102                 ServerSessionState::DeleteStream => {
103                     return Ok(());
104                 }
105             }
106         }
107 
108         //Ok(())
109     }
110 
111     async fn handshake(&mut self) -> Result<(), SessionError> {
112         let mut bytes_len = 0;
113 
114         while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE {
115             self.bytesio_data = self.io.lock().await.read().await?;
116             bytes_len += self.bytesio_data.len();
117             self.handshaker.extend_data(&self.bytesio_data[..]);
118         }
119 
120         self.handshaker.handshake().await?;
121 
122         if let ServerHandshakeState::Finish = self.handshaker.state() {
123             self.state = ServerSessionState::ReadChunk;
124             let left_bytes = self.handshaker.get_remaining_bytes();
125             if !left_bytes.is_empty() {
126                 self.unpacketizer.extend_data(&left_bytes[..]);
127                 self.has_remaing_data = true;
128             }
129             log::info!("[ S->C ] [send_set_chunk_size] ");
130             self.send_set_chunk_size().await?;
131             return Ok(());
132         }
133 
134         Ok(())
135     }
136 
137     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
138         if !self.has_remaing_data {
139             match self
140                 .io
141                 .lock()
142                 .await
143                 .read_timeout(Duration::from_secs(2))
144                 .await
145             {
146                 Ok(data) => {
147                     self.bytesio_data = data;
148                 }
149                 Err(err) => {
150                     self.common
151                         .unpublish_to_channels(
152                             self.app_name.clone(),
153                             self.stream_name.clone(),
154                             self.session_id,
155                         )
156                         .await?;
157 
158                     return Err(SessionError {
159                         value: SessionErrorValue::BytesIOError(err),
160                     });
161                 }
162             }
163 
164             self.unpacketizer.extend_data(&self.bytesio_data[..]);
165         }
166 
167         self.has_remaing_data = false;
168 
169         loop {
170             let result = self.unpacketizer.read_chunks();
171 
172             if let Ok(rv) = result {
173                 if let UnpackResult::Chunks(chunks) = rv {
174                     for chunk_info in chunks {
175                         let timestamp = chunk_info.message_header.timestamp;
176                         let msg_stream_id = chunk_info.message_header.msg_streamd_id;
177 
178                         let mut msg = MessageParser::new(chunk_info).parse()?;
179                         self.process_messages(&mut msg, &msg_stream_id, &timestamp)
180                             .await?;
181                     }
182                 }
183             } else {
184                 break;
185             }
186         }
187         Ok(())
188     }
189 
190     async fn play(&mut self) -> Result<(), SessionError> {
191         match self.common.send_channel_data().await {
192             Ok(_) => {}
193             Err(err) => {
194                 self.common
195                     .unsubscribe_from_channels(
196                         self.app_name.clone(),
197                         self.stream_name.clone(),
198                         self.session_id,
199                     )
200                     .await?;
201                 return Err(err);
202             }
203         }
204 
205         Ok(())
206     }
207 
208     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
209         let mut controlmessage =
210             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
211         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
212 
213         Ok(())
214     }
215 
216     pub async fn process_messages(
217         &mut self,
218         rtmp_msg: &mut RtmpMessageData,
219         msg_stream_id: &u32,
220         timestamp: &u32,
221     ) -> Result<(), SessionError> {
222         match rtmp_msg {
223             RtmpMessageData::Amf0Command {
224                 command_name,
225                 transaction_id,
226                 command_object,
227                 others,
228             } => {
229                 self.on_amf0_command_message(
230                     msg_stream_id,
231                     command_name,
232                     transaction_id,
233                     command_object,
234                     others,
235                 )
236                 .await?
237             }
238             RtmpMessageData::SetChunkSize { chunk_size } => {
239                 self.on_set_chunk_size(*chunk_size as usize)?;
240             }
241             RtmpMessageData::AudioData { data } => {
242                 self.common.on_audio_data(data, timestamp)?;
243             }
244             RtmpMessageData::VideoData { data } => {
245                 self.common.on_video_data(data, timestamp)?;
246             }
247             RtmpMessageData::AmfData { raw_data } => {
248                 self.common.on_meta_data(raw_data, timestamp)?;
249             }
250 
251             _ => {}
252         }
253         Ok(())
254     }
255 
256     pub async fn on_amf0_command_message(
257         &mut self,
258         stream_id: &u32,
259         command_name: &Amf0ValueType,
260         transaction_id: &Amf0ValueType,
261         command_object: &Amf0ValueType,
262         others: &mut Vec<Amf0ValueType>,
263     ) -> Result<(), SessionError> {
264         let empty_cmd_name = &String::new();
265         let cmd_name = match command_name {
266             Amf0ValueType::UTF8String(str) => str,
267             _ => empty_cmd_name,
268         };
269 
270         let transaction_id = match transaction_id {
271             Amf0ValueType::Number(number) => number,
272             _ => &0.0,
273         };
274 
275         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
276         let obj = match command_object {
277             Amf0ValueType::Object(obj) => obj,
278             _ => &empty_cmd_obj,
279         };
280 
281         match cmd_name.as_str() {
282             "connect" => {
283                 log::info!("[ S<-C ] [connect] ");
284                 self.on_connect(transaction_id, obj).await?;
285             }
286             "createStream" => {
287                 log::info!("[ S<-C ] [create stream] ");
288                 self.on_create_stream(transaction_id).await?;
289             }
290             "deleteStream" => {
291                 if !others.is_empty() {
292                     let stream_id = match others.pop() {
293                         Some(Amf0ValueType::Number(streamid)) => streamid,
294                         _ => 0.0,
295                     };
296 
297                     log::info!(
298                         "[ S<-C ] [delete stream] app_name: {}, stream_name: {}",
299                         self.app_name,
300                         self.stream_name
301                     );
302 
303                     self.on_delete_stream(transaction_id, &stream_id).await?;
304                     self.state = ServerSessionState::DeleteStream;
305                 }
306             }
307             "play" => {
308                 log::info!(
309                     "[ S<-C ] [play]  app_name: {}, stream_name: {}",
310                     self.app_name,
311                     self.stream_name
312                 );
313                 self.unpacketizer.session_type = config::SERVER_PULL;
314                 self.on_play(transaction_id, stream_id, others).await?;
315             }
316             "publish" => {
317                 self.unpacketizer.session_type = config::SERVER_PUSH;
318                 self.on_publish(transaction_id, stream_id, others).await?;
319             }
320             _ => {}
321         }
322 
323         Ok(())
324     }
325 
326     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
327         log::info!(
328             "[ S<-C ] [set chunk size]  app_name: {}, stream_name: {}, chunk size: {}",
329             self.app_name,
330             self.stream_name,
331             chunk_size
332         );
333         self.unpacketizer.update_max_chunk_size(chunk_size);
334         Ok(())
335     }
336 
337     fn parse_connect_properties(&mut self, command_obj: &HashMap<String, Amf0ValueType>) {
338         for (property, value) in command_obj {
339             match property.as_str() {
340                 "app" => {
341                     if let Amf0ValueType::UTF8String(app) = value {
342                         self.connect_properties.app = Some(app.clone());
343                     }
344                 }
345                 "flashVer" => {
346                     if let Amf0ValueType::UTF8String(flash_ver) = value {
347                         self.connect_properties.flash_ver = Some(flash_ver.clone());
348                     }
349                 }
350                 "swfUrl" => {
351                     if let Amf0ValueType::UTF8String(swf_url) = value {
352                         self.connect_properties.swf_url = Some(swf_url.clone());
353                     }
354                 }
355                 "tcUrl" => {
356                     if let Amf0ValueType::UTF8String(tc_url) = value {
357                         self.connect_properties.tc_url = Some(tc_url.clone());
358                     }
359                 }
360                 "fpad" => {
361                     if let Amf0ValueType::Boolean(fpad) = value {
362                         self.connect_properties.fpad = Some(*fpad);
363                     }
364                 }
365                 "audioCodecs" => {
366                     if let Amf0ValueType::Number(audio_codecs) = value {
367                         self.connect_properties.audio_codecs = Some(*audio_codecs);
368                     }
369                 }
370                 "videoCodecs" => {
371                     if let Amf0ValueType::Number(video_codecs) = value {
372                         self.connect_properties.video_codecs = Some(*video_codecs);
373                     }
374                 }
375                 "videoFunction" => {
376                     if let Amf0ValueType::Number(video_function) = value {
377                         self.connect_properties.video_function = Some(*video_function);
378                     }
379                 }
380                 "pageUrl" => {
381                     if let Amf0ValueType::UTF8String(page_url) = value {
382                         self.connect_properties.page_url = Some(page_url.clone());
383                     }
384                 }
385                 "objectEncoding" => {
386                     if let Amf0ValueType::Number(object_encoding) = value {
387                         self.connect_properties.object_encoding = Some(*object_encoding);
388                     }
389                 }
390                 _ => {
391                     log::warn!("unknown connect properties: {}:{:?}", property, value);
392                 }
393             }
394         }
395     }
396 
397     async fn on_connect(
398         &mut self,
399         transaction_id: &f64,
400         command_obj: &HashMap<String, Amf0ValueType>,
401     ) -> Result<(), SessionError> {
402         self.parse_connect_properties(command_obj);
403         log::info!("connect properties: {:?}", self.connect_properties);
404         let mut control_message =
405             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
406         log::info!("[ S->C ] [set window_acknowledgement_size]");
407         control_message
408             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
409             .await?;
410 
411         log::info!("[ S->C ] [set set_peer_bandwidth]",);
412         control_message
413             .write_set_peer_bandwidth(
414                 define::PEER_BANDWIDTH,
415                 define::peer_bandwidth_limit_type::DYNAMIC,
416             )
417             .await?;
418 
419         let obj_encoding = command_obj.get("objectEncoding");
420         let encoding = match obj_encoding {
421             Some(Amf0ValueType::Number(encoding)) => encoding,
422             _ => &define::OBJENCODING_AMF0,
423         };
424 
425         let app_name = command_obj.get("app");
426         self.app_name = match app_name {
427             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
428             _ => {
429                 return Err(SessionError {
430                     value: SessionErrorValue::NoAppName,
431                 });
432             }
433         };
434 
435         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
436         log::info!("[ S->C ] [set connect_response]",);
437         netconnection
438             .write_connect_response(
439                 transaction_id,
440                 define::FMSVER,
441                 &define::CAPABILITIES,
442                 &String::from("NetConnection.Connect.Success"),
443                 define::LEVEL,
444                 &String::from("Connection Succeeded."),
445                 encoding,
446             )
447             .await?;
448 
449         Ok(())
450     }
451 
452     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
453         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
454         netconnection
455             .write_create_stream_response(transaction_id, &define::STREAM_ID)
456             .await?;
457 
458         log::info!(
459             "[ S->C ] [create_stream_response]  app_name: {}",
460             self.app_name,
461         );
462 
463         Ok(())
464     }
465 
466     pub async fn on_delete_stream(
467         &mut self,
468         transaction_id: &f64,
469         stream_id: &f64,
470     ) -> Result<(), SessionError> {
471         self.common
472             .unpublish_to_channels(
473                 self.app_name.clone(),
474                 self.stream_name.clone(),
475                 self.session_id,
476             )
477             .await?;
478 
479         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
480         netstream
481             .write_on_status(
482                 transaction_id,
483                 "status",
484                 "NetStream.DeleteStream.Suceess",
485                 "",
486             )
487             .await?;
488 
489         //self.unsubscribe_from_channels().await?;
490         log::info!(
491             "[ S->C ] [delete stream success]  app_name: {}, stream_name: {}",
492             self.app_name,
493             self.stream_name
494         );
495         log::trace!("{}", stream_id);
496 
497         Ok(())
498     }
499 
500     fn get_request_url(&mut self, raw_stream_name: String) -> String {
501         if let Some(tc_url) = &self.connect_properties.tc_url {
502             format!("{tc_url}/{raw_stream_name}")
503         } else {
504             format!("{}/{}", self.app_name.clone(), raw_stream_name)
505         }
506     }
507     /*parse the raw stream name to get real stream name and the URL parameters*/
508     fn parse_raw_stream_name(&mut self, raw_stream_name: String) {
509         let data: Vec<&str> = raw_stream_name.split('?').collect();
510         self.stream_name = data[0].to_string();
511         if data.len() > 1 {
512             self.url_parameters = data[1].to_string();
513         }
514     }
515 
516     #[allow(clippy::never_loop)]
517     pub async fn on_play(
518         &mut self,
519         transaction_id: &f64,
520         stream_id: &u32,
521         other_values: &mut Vec<Amf0ValueType>,
522     ) -> Result<(), SessionError> {
523         let length = other_values.len() as u8;
524         let mut index: u8 = 0;
525 
526         let mut stream_name: Option<String> = None;
527         let mut start: Option<f64> = None;
528         let mut duration: Option<f64> = None;
529         let mut reset: Option<bool> = None;
530 
531         loop {
532             if index >= length {
533                 break;
534             }
535             index += 1;
536             stream_name = match other_values.remove(0) {
537                 Amf0ValueType::UTF8String(val) => Some(val),
538                 _ => None,
539             };
540 
541             if index >= length {
542                 break;
543             }
544             index += 1;
545             start = match other_values.remove(0) {
546                 Amf0ValueType::Number(val) => Some(val),
547                 _ => None,
548             };
549 
550             if index >= length {
551                 break;
552             }
553             index += 1;
554             duration = match other_values.remove(0) {
555                 Amf0ValueType::Number(val) => Some(val),
556                 _ => None,
557             };
558 
559             if index >= length {
560                 break;
561             }
562             //index = index + 1;
563             reset = match other_values.remove(0) {
564                 Amf0ValueType::Boolean(val) => Some(val),
565                 _ => None,
566             };
567             break;
568         }
569 
570         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
571         event_messages.write_stream_begin(*stream_id).await?;
572         log::info!(
573             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
574             self.app_name,
575             self.stream_name
576         );
577         log::trace!(
578             "{} {} {}",
579             start.is_some(),
580             duration.is_some(),
581             reset.is_some()
582         );
583 
584         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
585         netstream
586             .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset")
587             .await?;
588 
589         netstream
590             .write_on_status(
591                 transaction_id,
592                 "status",
593                 "NetStream.Play.Start",
594                 "play start",
595             )
596             .await?;
597 
598         netstream
599             .write_on_status(
600                 transaction_id,
601                 "status",
602                 "NetStream.Data.Start",
603                 "data start.",
604             )
605             .await?;
606 
607         netstream
608             .write_on_status(
609                 transaction_id,
610                 "status",
611                 "NetStream.Play.PublishNotify",
612                 "play publish notify.",
613             )
614             .await?;
615 
616         event_messages.write_stream_is_record(*stream_id).await?;
617 
618         let raw_stream_name = stream_name.unwrap();
619         self.parse_raw_stream_name(raw_stream_name.clone());
620 
621         log::info!(
622             "[ S->C ] [stream is record]  app_name: {}, stream_name: {}, url parameters: {}",
623             self.app_name,
624             self.stream_name,
625             self.url_parameters
626         );
627 
628         /*Now it can update the request url*/
629         self.common.request_url = self.get_request_url(raw_stream_name);
630         self.common
631             .subscribe_from_channels(
632                 self.app_name.clone(),
633                 self.stream_name.clone(),
634                 self.session_id,
635             )
636             .await?;
637 
638         self.state = ServerSessionState::Play;
639 
640         Ok(())
641     }
642 
643     pub async fn on_publish(
644         &mut self,
645         transaction_id: &f64,
646         stream_id: &u32,
647         other_values: &mut Vec<Amf0ValueType>,
648     ) -> Result<(), SessionError> {
649         let length = other_values.len();
650 
651         if length < 2 {
652             return Err(SessionError {
653                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
654             });
655         }
656 
657         let raw_stream_name = match other_values.remove(0) {
658             Amf0ValueType::UTF8String(val) => val,
659             _ => {
660                 return Err(SessionError {
661                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
662                 });
663             }
664         };
665 
666         self.parse_raw_stream_name(raw_stream_name.clone());
667         /*Now it can update the request url*/
668         self.common.request_url = self.get_request_url(raw_stream_name);
669 
670         let _ = match other_values.remove(0) {
671             Amf0ValueType::UTF8String(val) => val,
672             _ => {
673                 return Err(SessionError {
674                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
675                 });
676             }
677         };
678 
679         log::info!(
680             "[ S<-C ] [publish]  app_name: {}, stream_name: {}, url parameters: {}",
681             self.app_name,
682             self.stream_name,
683             self.url_parameters
684         );
685 
686         log::info!(
687             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}, url parameters: {}",
688             self.app_name,
689             self.stream_name,
690             self.url_parameters
691         );
692 
693         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
694         event_messages.write_stream_begin(*stream_id).await?;
695 
696         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
697         netstream
698             .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "")
699             .await?;
700         log::info!(
701             "[ S->C ] [NetStream.Publish.Start]  app_name: {}, stream_name: {}",
702             self.app_name,
703             self.stream_name
704         );
705 
706         self.common
707             .publish_to_channels(
708                 self.app_name.clone(),
709                 self.stream_name.clone(),
710                 self.session_id,
711             )
712             .await?;
713 
714         Ok(())
715     }
716 }
717