1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::{
9         amf0::Amf0ValueType,
10         channels::define::ChannelEventProducer,
11         chunk::{
12             define::CHUNK_SIZE,
13             unpacketizer::{ChunkUnpacketizer, UnpackResult},
14         },
15         config, handshake,
16         handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer},
17         messages::{define::RtmpMessageData, parser::MessageParser},
18         netconnection::writer::{ConnectProperties, NetConnection},
19         netstream::writer::NetStreamWriter,
20         protocol_control_messages::writer::ProtocolControlMessagesWriter,
21         user_control_messages::writer::EventMessagesWriter,
22         utils::RtmpUrlParser,
23     },
24     bytes::BytesMut,
25     bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO},
26     indexmap::IndexMap,
27     std::{sync::Arc, time::Duration},
28     tokio::{net::TcpStream, sync::Mutex},
29     uuid::Uuid,
30 };
31 
/// Phases of a server-side RTMP session.
///
/// `ServerSession::run` inspects the current phase on every loop iteration to
/// decide which step to execute next.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerSessionState {
    /// Initial phase: the RTMP handshake has not finished yet.
    Handshake,
    /// Handshake finished; incoming chunks are read, parsed and dispatched.
    ReadChunk,
    /// A `deleteStream` command was handled; `run` returns once it sees this.
    DeleteStream,
    /// A `play` command was accepted; channel data is forwarded to the peer.
    Play,
}
41 
/// State for one RTMP connection handled on the server side.
pub struct ServerSession {
    /// Application name, taken from the `app` property of the `connect` command.
    pub app_name: String,
    /// Stream name parsed from the raw stream name sent with `play`/`publish`.
    pub stream_name: String,
    /// URL parameters parsed together with `stream_name` from the raw stream name.
    pub url_parameters: String,
    /// Shared handle to the connection I/O; clones are handed to the handshaker,
    /// to `common` and to the message writers.
    io: Arc<Mutex<BytesIO>>,
    /// Performs the server side of the RTMP handshake.
    handshaker: HandshakeServer,
    /// Reassembles incoming bytes into chunks.
    unpacketizer: ChunkUnpacketizer,
    /// Current phase of the session; drives the loop in `run`.
    state: ServerSessionState,
    /// Session logic shared with other session types (channel publish/subscribe,
    /// media data handling).
    pub common: Common,
    /// Most recent buffer read from `io`.
    bytesio_data: BytesMut,
    /// Set when bytes left over from the handshake were already handed to the
    /// unpacketizer, so the next chunk pass skips reading from the connection.
    has_remaing_data: bool,
    /// Identifies this session towards the channels layer; it is passed to the
    /// publish/unpublish and subscribe/unsubscribe calls.
    pub session_id: Uuid,
    /// Properties collected from the command object of the `connect` command.
    connect_properties: ConnectProperties,
}
59 
60 impl ServerSession {
61     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self {
62         let remote_addr = if let Ok(addr) = stream.peer_addr() {
63             log::info!("server session: {}", addr.to_string());
64             Some(addr)
65         } else {
66             None
67         };
68 
69         let net_io = Arc::new(Mutex::new(BytesIO::new(stream)));
70         let subscriber_id = Uuid::new_v4();
71         Self {
72             app_name: String::from(""),
73             stream_name: String::from(""),
74             url_parameters: String::from(""),
75             io: Arc::clone(&net_io),
76             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
77             unpacketizer: ChunkUnpacketizer::new(),
78             state: ServerSessionState::Handshake,
79             common: Common::new(
80                 Arc::clone(&net_io),
81                 event_producer,
82                 SessionType::Server,
83                 remote_addr,
84             ),
85             session_id: subscriber_id,
86             bytesio_data: BytesMut::new(),
87             has_remaing_data: false,
88             connect_properties: ConnectProperties::default(),
89         }
90     }
91 
92     pub async fn run(&mut self) -> Result<(), SessionError> {
93         loop {
94             match self.state {
95                 ServerSessionState::Handshake => {
96                     self.handshake().await?;
97                 }
98                 ServerSessionState::ReadChunk => {
99                     self.read_parse_chunks().await?;
100                 }
101                 ServerSessionState::Play => {
102                     self.play().await?;
103                 }
104                 ServerSessionState::DeleteStream => {
105                     return Ok(());
106                 }
107             }
108         }
109 
110         //Ok(())
111     }
112 
113     async fn handshake(&mut self) -> Result<(), SessionError> {
114         let mut bytes_len = 0;
115 
116         while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE {
117             self.bytesio_data = self.io.lock().await.read().await?;
118             bytes_len += self.bytesio_data.len();
119             self.handshaker.extend_data(&self.bytesio_data[..]);
120         }
121 
122         self.handshaker.handshake().await?;
123 
124         if let ServerHandshakeState::Finish = self.handshaker.state() {
125             self.state = ServerSessionState::ReadChunk;
126             let left_bytes = self.handshaker.get_remaining_bytes();
127             if !left_bytes.is_empty() {
128                 self.unpacketizer.extend_data(&left_bytes[..]);
129                 self.has_remaing_data = true;
130             }
131             log::info!("[ S->C ] [send_set_chunk_size] ");
132             self.send_set_chunk_size().await?;
133             return Ok(());
134         }
135 
136         Ok(())
137     }
138 
139     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
140         if !self.has_remaing_data {
141             match self
142                 .io
143                 .lock()
144                 .await
145                 .read_timeout(Duration::from_secs(2))
146                 .await
147             {
148                 Ok(data) => {
149                     self.bytesio_data = data;
150                 }
151                 Err(err) => {
152                     self.common
153                         .unpublish_to_channels(
154                             self.app_name.clone(),
155                             self.stream_name.clone(),
156                             self.session_id,
157                         )
158                         .await?;
159 
160                     return Err(SessionError {
161                         value: SessionErrorValue::BytesIOError(err),
162                     });
163                 }
164             }
165 
166             self.unpacketizer.extend_data(&self.bytesio_data[..]);
167         }
168 
169         self.has_remaing_data = false;
170 
171         loop {
172             let result = self.unpacketizer.read_chunks();
173 
174             if let Ok(rv) = result {
175                 if let UnpackResult::Chunks(chunks) = rv {
176                     for chunk_info in chunks {
177                         let timestamp = chunk_info.message_header.timestamp;
178                         let msg_stream_id = chunk_info.message_header.msg_streamd_id;
179 
180                         let mut msg = MessageParser::new(chunk_info).parse()?;
181                         self.process_messages(&mut msg, &msg_stream_id, &timestamp)
182                             .await?;
183                     }
184                 }
185             } else {
186                 break;
187             }
188         }
189         Ok(())
190     }
191 
192     async fn play(&mut self) -> Result<(), SessionError> {
193         match self.common.send_channel_data().await {
194             Ok(_) => {}
195             Err(err) => {
196                 self.common
197                     .unsubscribe_from_channels(
198                         self.app_name.clone(),
199                         self.stream_name.clone(),
200                         self.session_id,
201                     )
202                     .await?;
203                 return Err(err);
204             }
205         }
206 
207         Ok(())
208     }
209 
210     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
211         let mut controlmessage =
212             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
213         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
214 
215         Ok(())
216     }
217 
218     pub async fn process_messages(
219         &mut self,
220         rtmp_msg: &mut RtmpMessageData,
221         msg_stream_id: &u32,
222         timestamp: &u32,
223     ) -> Result<(), SessionError> {
224         match rtmp_msg {
225             RtmpMessageData::Amf0Command {
226                 command_name,
227                 transaction_id,
228                 command_object,
229                 others,
230             } => {
231                 self.on_amf0_command_message(
232                     msg_stream_id,
233                     command_name,
234                     transaction_id,
235                     command_object,
236                     others,
237                 )
238                 .await?
239             }
240             RtmpMessageData::SetChunkSize { chunk_size } => {
241                 self.on_set_chunk_size(*chunk_size as usize)?;
242             }
243             RtmpMessageData::AudioData { data } => {
244                 self.common.on_audio_data(data, timestamp)?;
245             }
246             RtmpMessageData::VideoData { data } => {
247                 self.common.on_video_data(data, timestamp)?;
248             }
249             RtmpMessageData::AmfData { raw_data } => {
250                 self.common.on_meta_data(raw_data, timestamp)?;
251             }
252 
253             _ => {}
254         }
255         Ok(())
256     }
257 
258     pub async fn on_amf0_command_message(
259         &mut self,
260         stream_id: &u32,
261         command_name: &Amf0ValueType,
262         transaction_id: &Amf0ValueType,
263         command_object: &Amf0ValueType,
264         others: &mut Vec<Amf0ValueType>,
265     ) -> Result<(), SessionError> {
266         let empty_cmd_name = &String::new();
267         let cmd_name = match command_name {
268             Amf0ValueType::UTF8String(str) => str,
269             _ => empty_cmd_name,
270         };
271 
272         let transaction_id = match transaction_id {
273             Amf0ValueType::Number(number) => number,
274             _ => &0.0,
275         };
276 
277         let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new();
278         let obj = match command_object {
279             Amf0ValueType::Object(obj) => obj,
280             _ => &empty_cmd_obj,
281         };
282 
283         match cmd_name.as_str() {
284             "connect" => {
285                 log::info!("[ S<-C ] [connect] ");
286                 self.on_connect(transaction_id, obj).await?;
287             }
288             "createStream" => {
289                 log::info!("[ S<-C ] [create stream] ");
290                 self.on_create_stream(transaction_id).await?;
291             }
292             "deleteStream" => {
293                 if !others.is_empty() {
294                     let stream_id = match others.pop() {
295                         Some(Amf0ValueType::Number(streamid)) => streamid,
296                         _ => 0.0,
297                     };
298 
299                     log::info!(
300                         "[ S<-C ] [delete stream] app_name: {}, stream_name: {}",
301                         self.app_name,
302                         self.stream_name
303                     );
304 
305                     self.on_delete_stream(transaction_id, &stream_id).await?;
306                     self.state = ServerSessionState::DeleteStream;
307                 }
308             }
309             "play" => {
310                 log::info!(
311                     "[ S<-C ] [play]  app_name: {}, stream_name: {}",
312                     self.app_name,
313                     self.stream_name
314                 );
315                 self.unpacketizer.session_type = config::SERVER_PULL;
316                 self.on_play(transaction_id, stream_id, others).await?;
317             }
318             "publish" => {
319                 self.unpacketizer.session_type = config::SERVER_PUSH;
320                 self.on_publish(transaction_id, stream_id, others).await?;
321             }
322             _ => {}
323         }
324 
325         Ok(())
326     }
327 
328     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
329         log::info!(
330             "[ S<-C ] [set chunk size]  app_name: {}, stream_name: {}, chunk size: {}",
331             self.app_name,
332             self.stream_name,
333             chunk_size
334         );
335         self.unpacketizer.update_max_chunk_size(chunk_size);
336         Ok(())
337     }
338 
339     fn parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>) {
340         for (property, value) in command_obj {
341             match property.as_str() {
342                 "app" => {
343                     if let Amf0ValueType::UTF8String(app) = value {
344                         self.connect_properties.app = Some(app.clone());
345                     }
346                 }
347                 "flashVer" => {
348                     if let Amf0ValueType::UTF8String(flash_ver) = value {
349                         self.connect_properties.flash_ver = Some(flash_ver.clone());
350                     }
351                 }
352                 "swfUrl" => {
353                     if let Amf0ValueType::UTF8String(swf_url) = value {
354                         self.connect_properties.swf_url = Some(swf_url.clone());
355                     }
356                 }
357                 "tcUrl" => {
358                     if let Amf0ValueType::UTF8String(tc_url) = value {
359                         self.connect_properties.tc_url = Some(tc_url.clone());
360                     }
361                 }
362                 "fpad" => {
363                     if let Amf0ValueType::Boolean(fpad) = value {
364                         self.connect_properties.fpad = Some(*fpad);
365                     }
366                 }
367                 "audioCodecs" => {
368                     if let Amf0ValueType::Number(audio_codecs) = value {
369                         self.connect_properties.audio_codecs = Some(*audio_codecs);
370                     }
371                 }
372                 "videoCodecs" => {
373                     if let Amf0ValueType::Number(video_codecs) = value {
374                         self.connect_properties.video_codecs = Some(*video_codecs);
375                     }
376                 }
377                 "videoFunction" => {
378                     if let Amf0ValueType::Number(video_function) = value {
379                         self.connect_properties.video_function = Some(*video_function);
380                     }
381                 }
382                 "pageUrl" => {
383                     if let Amf0ValueType::UTF8String(page_url) = value {
384                         self.connect_properties.page_url = Some(page_url.clone());
385                     }
386                 }
387                 "objectEncoding" => {
388                     if let Amf0ValueType::Number(object_encoding) = value {
389                         self.connect_properties.object_encoding = Some(*object_encoding);
390                     }
391                 }
392                 _ => {
393                     log::warn!("unknown connect properties: {}:{:?}", property, value);
394                 }
395             }
396         }
397     }
398 
399     async fn on_connect(
400         &mut self,
401         transaction_id: &f64,
402         command_obj: &IndexMap<String, Amf0ValueType>,
403     ) -> Result<(), SessionError> {
404         self.parse_connect_properties(command_obj);
405         log::info!("connect properties: {:?}", self.connect_properties);
406         let mut control_message =
407             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
408         log::info!("[ S->C ] [set window_acknowledgement_size]");
409         control_message
410             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
411             .await?;
412 
413         log::info!("[ S->C ] [set set_peer_bandwidth]",);
414         control_message
415             .write_set_peer_bandwidth(
416                 define::PEER_BANDWIDTH,
417                 define::peer_bandwidth_limit_type::DYNAMIC,
418             )
419             .await?;
420 
421         let obj_encoding = command_obj.get("objectEncoding");
422         let encoding = match obj_encoding {
423             Some(Amf0ValueType::Number(encoding)) => encoding,
424             _ => &define::OBJENCODING_AMF0,
425         };
426 
427         let app_name = command_obj.get("app");
428         self.app_name = match app_name {
429             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
430             _ => {
431                 return Err(SessionError {
432                     value: SessionErrorValue::NoAppName,
433                 });
434             }
435         };
436 
437         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
438         log::info!("[ S->C ] [set connect_response]",);
439         netconnection
440             .write_connect_response(
441                 transaction_id,
442                 define::FMSVER,
443                 &define::CAPABILITIES,
444                 &String::from("NetConnection.Connect.Success"),
445                 define::LEVEL,
446                 &String::from("Connection Succeeded."),
447                 encoding,
448             )
449             .await?;
450 
451         Ok(())
452     }
453 
454     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
455         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
456         netconnection
457             .write_create_stream_response(transaction_id, &define::STREAM_ID)
458             .await?;
459 
460         log::info!(
461             "[ S->C ] [create_stream_response]  app_name: {}",
462             self.app_name,
463         );
464 
465         Ok(())
466     }
467 
468     pub async fn on_delete_stream(
469         &mut self,
470         transaction_id: &f64,
471         stream_id: &f64,
472     ) -> Result<(), SessionError> {
473         self.common
474             .unpublish_to_channels(
475                 self.app_name.clone(),
476                 self.stream_name.clone(),
477                 self.session_id,
478             )
479             .await?;
480 
481         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
482         netstream
483             .write_on_status(
484                 transaction_id,
485                 "status",
486                 "NetStream.DeleteStream.Suceess",
487                 "",
488             )
489             .await?;
490 
491         //self.unsubscribe_from_channels().await?;
492         log::info!(
493             "[ S->C ] [delete stream success]  app_name: {}, stream_name: {}",
494             self.app_name,
495             self.stream_name
496         );
497         log::trace!("{}", stream_id);
498 
499         Ok(())
500     }
501 
502     fn get_request_url(&mut self, raw_stream_name: String) -> String {
503         if let Some(tc_url) = &self.connect_properties.tc_url {
504             format!("{tc_url}/{raw_stream_name}")
505         } else {
506             format!("{}/{}", self.app_name.clone(), raw_stream_name)
507         }
508     }
509 
510     #[allow(clippy::never_loop)]
511     pub async fn on_play(
512         &mut self,
513         transaction_id: &f64,
514         stream_id: &u32,
515         other_values: &mut Vec<Amf0ValueType>,
516     ) -> Result<(), SessionError> {
517         let length = other_values.len() as u8;
518         let mut index: u8 = 0;
519 
520         let mut stream_name: Option<String> = None;
521         let mut start: Option<f64> = None;
522         let mut duration: Option<f64> = None;
523         let mut reset: Option<bool> = None;
524 
525         loop {
526             if index >= length {
527                 break;
528             }
529             index += 1;
530             stream_name = match other_values.remove(0) {
531                 Amf0ValueType::UTF8String(val) => Some(val),
532                 _ => None,
533             };
534 
535             if index >= length {
536                 break;
537             }
538             index += 1;
539             start = match other_values.remove(0) {
540                 Amf0ValueType::Number(val) => Some(val),
541                 _ => None,
542             };
543 
544             if index >= length {
545                 break;
546             }
547             index += 1;
548             duration = match other_values.remove(0) {
549                 Amf0ValueType::Number(val) => Some(val),
550                 _ => None,
551             };
552 
553             if index >= length {
554                 break;
555             }
556             //index = index + 1;
557             reset = match other_values.remove(0) {
558                 Amf0ValueType::Boolean(val) => Some(val),
559                 _ => None,
560             };
561             break;
562         }
563 
564         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
565         event_messages.write_stream_begin(*stream_id).await?;
566         log::info!(
567             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
568             self.app_name,
569             self.stream_name
570         );
571         log::trace!(
572             "{} {} {}",
573             start.is_some(),
574             duration.is_some(),
575             reset.is_some()
576         );
577 
578         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
579         netstream
580             .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset")
581             .await?;
582 
583         netstream
584             .write_on_status(
585                 transaction_id,
586                 "status",
587                 "NetStream.Play.Start",
588                 "play start",
589             )
590             .await?;
591 
592         netstream
593             .write_on_status(
594                 transaction_id,
595                 "status",
596                 "NetStream.Data.Start",
597                 "data start.",
598             )
599             .await?;
600 
601         netstream
602             .write_on_status(
603                 transaction_id,
604                 "status",
605                 "NetStream.Play.PublishNotify",
606                 "play publish notify.",
607             )
608             .await?;
609 
610         event_messages.write_stream_is_record(*stream_id).await?;
611 
612         let raw_stream_name = stream_name.unwrap();
613 
614         (self.stream_name, self.url_parameters) = RtmpUrlParser::default()
615             .set_raw_stream_name(raw_stream_name.clone())
616             .parse_raw_stream_name();
617 
618         log::info!(
619             "[ S->C ] [stream is record]  app_name: {}, stream_name: {}, url parameters: {}",
620             self.app_name,
621             self.stream_name,
622             self.url_parameters
623         );
624 
625         /*Now it can update the request url*/
626         self.common.request_url = self.get_request_url(raw_stream_name);
627         self.common
628             .subscribe_from_channels(
629                 self.app_name.clone(),
630                 self.stream_name.clone(),
631                 self.session_id,
632             )
633             .await?;
634 
635         self.state = ServerSessionState::Play;
636 
637         Ok(())
638     }
639 
640     pub async fn on_publish(
641         &mut self,
642         transaction_id: &f64,
643         stream_id: &u32,
644         other_values: &mut Vec<Amf0ValueType>,
645     ) -> Result<(), SessionError> {
646         let length = other_values.len();
647 
648         if length < 2 {
649             return Err(SessionError {
650                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
651             });
652         }
653 
654         let raw_stream_name = match other_values.remove(0) {
655             Amf0ValueType::UTF8String(val) => val,
656             _ => {
657                 return Err(SessionError {
658                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
659                 });
660             }
661         };
662 
663         (self.stream_name, self.url_parameters) = RtmpUrlParser::default()
664             .set_raw_stream_name(raw_stream_name.clone())
665             .parse_raw_stream_name();
666 
667         /*Now it can update the request url*/
668         self.common.request_url = self.get_request_url(raw_stream_name);
669 
670         let _ = match other_values.remove(0) {
671             Amf0ValueType::UTF8String(val) => val,
672             _ => {
673                 return Err(SessionError {
674                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
675                 });
676             }
677         };
678 
679         log::info!(
680             "[ S<-C ] [publish]  app_name: {}, stream_name: {}, url parameters: {}",
681             self.app_name,
682             self.stream_name,
683             self.url_parameters
684         );
685 
686         log::info!(
687             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}, url parameters: {}",
688             self.app_name,
689             self.stream_name,
690             self.url_parameters
691         );
692 
693         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
694         event_messages.write_stream_begin(*stream_id).await?;
695 
696         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
697         netstream
698             .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "")
699             .await?;
700         log::info!(
701             "[ S->C ] [NetStream.Publish.Start]  app_name: {}, stream_name: {}",
702             self.app_name,
703             self.stream_name
704         );
705 
706         self.common
707             .publish_to_channels(
708                 self.app_name.clone(),
709                 self.stream_name.clone(),
710                 self.session_id,
711             )
712             .await?;
713 
714         Ok(())
715     }
716 }
717