1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::{
9         amf0::Amf0ValueType,
10         channels::define::ChannelEventProducer,
11         chunk::{
12             define::CHUNK_SIZE,
13             unpacketizer::{ChunkUnpacketizer, UnpackResult},
14         },
15         config, handshake,
16         handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer},
17         messages::{define::RtmpMessageData, parser::MessageParser},
18         netconnection::writer::NetConnection,
19         netstream::writer::NetStreamWriter,
20         protocol_control_messages::writer::ProtocolControlMessagesWriter,
21         user_control_messages::writer::EventMessagesWriter,
22     },
23     bytes::BytesMut,
24     bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO},
25     std::{collections::HashMap, sync::Arc, time::Duration},
26     tokio::{net::TcpStream, sync::Mutex},
27     uuid::Uuid,
28 };
29 
/// Phase of a server session.
///
/// `ServerSession::run` matches on this value to decide which step to execute
/// on each iteration of its loop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerSessionState {
    /// The handshake with the peer has not finished yet.
    Handshake,
    /// Incoming chunks are read, parsed and dispatched as messages.
    ReadChunk,
    /// Channel data is forwarded to a client that issued `play`.
    Play,
}
38 
39 pub struct ServerSession {
40     pub app_name: String,
41     pub stream_name: String,
42     io: Arc<Mutex<BytesIO>>,
43     handshaker: HandshakeServer,
44     unpacketizer: ChunkUnpacketizer,
45     state: ServerSessionState,
46     pub common: Common,
47     bytesio_data: BytesMut,
48     has_remaing_data: bool,
49     /* Used to mark the subscriber's the data producer
50     in channels and delete it from map when unsubscribe
51     is called. */
52     pub subscriber_id: Uuid,
53     connect_command_object: Option<HashMap<String, Amf0ValueType>>,
54 }
55 
56 impl ServerSession {
57     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self {
58         let net_io = Arc::new(Mutex::new(BytesIO::new(stream)));
59         let subscriber_id = Uuid::new_v4();
60         Self {
61             app_name: String::from(""),
62             stream_name: String::from(""),
63             io: Arc::clone(&net_io),
64             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
65             unpacketizer: ChunkUnpacketizer::new(),
66             state: ServerSessionState::Handshake,
67             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server),
68             subscriber_id,
69             bytesio_data: BytesMut::new(),
70             has_remaing_data: false,
71             connect_command_object: None,
72         }
73     }
74 
75     pub async fn run(&mut self) -> Result<(), SessionError> {
76         loop {
77             match self.state {
78                 ServerSessionState::Handshake => {
79                     self.handshake().await?;
80                 }
81                 ServerSessionState::ReadChunk => {
82                     self.read_parse_chunks().await?;
83                 }
84                 ServerSessionState::Play => {
85                     self.play().await?;
86                 }
87             }
88         }
89 
90         //Ok(())
91     }
92 
93     async fn handshake(&mut self) -> Result<(), SessionError> {
94         let mut bytes_len = 0;
95 
96         while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE {
97             self.bytesio_data = self.io.lock().await.read().await?;
98             bytes_len += self.bytesio_data.len();
99             self.handshaker.extend_data(&self.bytesio_data[..]);
100         }
101 
102         self.handshaker.handshake().await?;
103 
104         if let ServerHandshakeState::Finish = self.handshaker.state() {
105             self.state = ServerSessionState::ReadChunk;
106             let left_bytes = self.handshaker.get_remaining_bytes();
107             if !left_bytes.is_empty() {
108                 self.unpacketizer.extend_data(&left_bytes[..]);
109                 self.has_remaing_data = true;
110             }
111             log::info!("[ S->C ] [send_set_chunk_size] ");
112             self.send_set_chunk_size().await?;
113             return Ok(());
114         }
115 
116         Ok(())
117     }
118 
119     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
120         if !self.has_remaing_data {
121             match self
122                 .io
123                 .lock()
124                 .await
125                 .read_timeout(Duration::from_secs(2))
126                 .await
127             {
128                 Ok(data) => {
129                     self.bytesio_data = data;
130                 }
131                 Err(err) => {
132                     self.common
133                         .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
134                         .await?;
135 
136                     return Err(SessionError {
137                         value: SessionErrorValue::BytesIOError(err),
138                     });
139                 }
140             }
141 
142             self.unpacketizer.extend_data(&self.bytesio_data[..]);
143         }
144 
145         self.has_remaing_data = false;
146 
147         loop {
148             let result = self.unpacketizer.read_chunks();
149 
150             if let Ok(rv) = result {
151                 if let UnpackResult::Chunks(chunks) = rv {
152                     for chunk_info in chunks {
153                         let timestamp = chunk_info.message_header.timestamp;
154                         let msg_stream_id = chunk_info.message_header.msg_streamd_id;
155 
156                         let mut msg = MessageParser::new(chunk_info).parse()?;
157                         self.process_messages(&mut msg, &msg_stream_id, &timestamp)
158                             .await?;
159                     }
160                 }
161             } else {
162                 break;
163             }
164         }
165         Ok(())
166     }
167 
168     async fn play(&mut self) -> Result<(), SessionError> {
169         match self.common.send_channel_data().await {
170             Ok(_) => {}
171 
172             Err(err) => {
173                 self.common
174                     .unsubscribe_from_channels(
175                         self.app_name.clone(),
176                         self.stream_name.clone(),
177                         self.subscriber_id,
178                     )
179                     .await?;
180                 return Err(err);
181             }
182         }
183 
184         Ok(())
185     }
186 
187     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
188         let mut controlmessage =
189             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
190         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
191 
192         Ok(())
193     }
194 
195     pub async fn process_messages(
196         &mut self,
197         rtmp_msg: &mut RtmpMessageData,
198         msg_stream_id: &u32,
199         timestamp: &u32,
200     ) -> Result<(), SessionError> {
201         match rtmp_msg {
202             RtmpMessageData::Amf0Command {
203                 command_name,
204                 transaction_id,
205                 command_object,
206                 others,
207             } => {
208                 self.on_amf0_command_message(
209                     msg_stream_id,
210                     command_name,
211                     transaction_id,
212                     command_object,
213                     others,
214                 )
215                 .await?
216             }
217             RtmpMessageData::SetChunkSize { chunk_size } => {
218                 self.on_set_chunk_size(*chunk_size as usize)?;
219             }
220             RtmpMessageData::AudioData { data } => {
221                 self.common.on_audio_data(data, timestamp)?;
222             }
223             RtmpMessageData::VideoData { data } => {
224                 self.common.on_video_data(data, timestamp)?;
225             }
226             RtmpMessageData::AmfData { raw_data } => {
227                 self.common.on_meta_data(raw_data, timestamp)?;
228             }
229 
230             _ => {}
231         }
232         Ok(())
233     }
234 
235     pub async fn on_amf0_command_message(
236         &mut self,
237         stream_id: &u32,
238         command_name: &Amf0ValueType,
239         transaction_id: &Amf0ValueType,
240         command_object: &Amf0ValueType,
241         others: &mut Vec<Amf0ValueType>,
242     ) -> Result<(), SessionError> {
243         let empty_cmd_name = &String::new();
244         let cmd_name = match command_name {
245             Amf0ValueType::UTF8String(str) => str,
246             _ => empty_cmd_name,
247         };
248 
249         let transaction_id = match transaction_id {
250             Amf0ValueType::Number(number) => number,
251             _ => &0.0,
252         };
253 
254         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
255         let obj = match command_object {
256             Amf0ValueType::Object(obj) => obj,
257             _ => &empty_cmd_obj,
258         };
259 
260         match cmd_name.as_str() {
261             "connect" => {
262                 log::info!("[ S<-C ] [connect] ");
263                 self.on_connect(transaction_id, obj).await?;
264             }
265             "createStream" => {
266                 log::info!("[ S<-C ] [create stream] ");
267                 self.on_create_stream(transaction_id).await?;
268             }
269             "deleteStream" => {
270                 if !others.is_empty() {
271                     let stream_id = match others.pop() {
272                         Some(Amf0ValueType::Number(streamid)) => streamid,
273                         _ => 0.0,
274                     };
275 
276                     log::info!(
277                         "[ S<-C ] [delete stream] app_name: {}, stream_name: {}",
278                         self.app_name,
279                         self.stream_name
280                     );
281 
282                     self.on_delete_stream(transaction_id, &stream_id).await?;
283                 }
284             }
285             "play" => {
286                 log::info!(
287                     "[ S<-C ] [play]  app_name: {}, stream_name: {}",
288                     self.app_name,
289                     self.stream_name
290                 );
291                 self.unpacketizer.session_type = config::SERVER_PULL;
292                 self.on_play(transaction_id, stream_id, others).await?;
293             }
294             "publish" => {
295                 self.unpacketizer.session_type = config::SERVER_PUSH;
296                 self.on_publish(transaction_id, stream_id, others).await?;
297             }
298             _ => {}
299         }
300 
301         Ok(())
302     }
303 
304     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
305         log::info!(
306             "[ S<-C ] [set chunk size]  app_name: {}, stream_name: {}, chunk size: {}",
307             self.app_name,
308             self.stream_name,
309             chunk_size
310         );
311         self.unpacketizer.update_max_chunk_size(chunk_size);
312         Ok(())
313     }
314 
315     async fn on_connect(
316         &mut self,
317         transaction_id: &f64,
318         command_obj: &HashMap<String, Amf0ValueType>,
319     ) -> Result<(), SessionError> {
320         self.connect_command_object = Some(command_obj.clone());
321         let mut control_message =
322             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
323         log::info!("[ S->C ] [set window_acknowledgement_size]");
324         control_message
325             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
326             .await?;
327 
328         log::info!("[ S->C ] [set set_peer_bandwidth]",);
329         control_message
330             .write_set_peer_bandwidth(
331                 define::PEER_BANDWIDTH,
332                 define::peer_bandwidth_limit_type::DYNAMIC,
333             )
334             .await?;
335 
336         let obj_encoding = command_obj.get("objectEncoding");
337         let encoding = match obj_encoding {
338             Some(Amf0ValueType::Number(encoding)) => encoding,
339             _ => &define::OBJENCODING_AMF0,
340         };
341 
342         let app_name = command_obj.get("app");
343         self.app_name = match app_name {
344             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
345             _ => {
346                 return Err(SessionError {
347                     value: SessionErrorValue::NoAppName,
348                 });
349             }
350         };
351 
352         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
353         log::info!("[ S->C ] [set connect_response]",);
354         netconnection
355             .write_connect_response(
356                 transaction_id,
357                 define::FMSVER,
358                 &define::CAPABILITIES,
359                 &String::from("NetConnection.Connect.Success"),
360                 define::LEVEL,
361                 &String::from("Connection Succeeded."),
362                 encoding,
363             )
364             .await?;
365 
366         Ok(())
367     }
368 
369     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
370         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
371         netconnection
372             .write_create_stream_response(transaction_id, &define::STREAM_ID)
373             .await?;
374 
375         log::info!(
376             "[ S->C ] [create_stream_response]  app_name: {}",
377             self.app_name,
378         );
379 
380         Ok(())
381     }
382 
383     pub async fn on_delete_stream(
384         &mut self,
385         transaction_id: &f64,
386         stream_id: &f64,
387     ) -> Result<(), SessionError> {
388         self.common
389             .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
390             .await?;
391 
392         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
393         netstream
394             .write_on_status(
395                 transaction_id,
396                 "status",
397                 "NetStream.DeleteStream.Suceess",
398                 "",
399             )
400             .await?;
401 
402         //self.unsubscribe_from_channels().await?;
403         log::info!(
404             "[ S->C ] [delete stream success]  app_name: {}, stream_name: {}",
405             self.app_name,
406             self.stream_name
407         );
408         log::trace!("{}", stream_id);
409 
410         Ok(())
411     }
412 
413     #[allow(clippy::never_loop)]
414     pub async fn on_play(
415         &mut self,
416         transaction_id: &f64,
417         stream_id: &u32,
418         other_values: &mut Vec<Amf0ValueType>,
419     ) -> Result<(), SessionError> {
420         let length = other_values.len() as u8;
421         let mut index: u8 = 0;
422 
423         let mut stream_name: Option<String> = None;
424         let mut start: Option<f64> = None;
425         let mut duration: Option<f64> = None;
426         let mut reset: Option<bool> = None;
427 
428         loop {
429             if index >= length {
430                 break;
431             }
432             index += 1;
433             stream_name = match other_values.remove(0) {
434                 Amf0ValueType::UTF8String(val) => Some(val),
435                 _ => None,
436             };
437 
438             if index >= length {
439                 break;
440             }
441             index += 1;
442             start = match other_values.remove(0) {
443                 Amf0ValueType::Number(val) => Some(val),
444                 _ => None,
445             };
446 
447             if index >= length {
448                 break;
449             }
450             index += 1;
451             duration = match other_values.remove(0) {
452                 Amf0ValueType::Number(val) => Some(val),
453                 _ => None,
454             };
455 
456             if index >= length {
457                 break;
458             }
459             //index = index + 1;
460             reset = match other_values.remove(0) {
461                 Amf0ValueType::Boolean(val) => Some(val),
462                 _ => None,
463             };
464             break;
465         }
466 
467         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
468         event_messages.write_stream_begin(*stream_id).await?;
469         log::info!(
470             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
471             self.app_name,
472             self.stream_name
473         );
474         log::trace!(
475             "{} {} {}",
476             start.is_some(),
477             duration.is_some(),
478             reset.is_some()
479         );
480 
481         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
482         netstream
483             .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset")
484             .await?;
485 
486         netstream
487             .write_on_status(
488                 transaction_id,
489                 "status",
490                 "NetStream.Play.Start",
491                 "play start",
492             )
493             .await?;
494 
495         netstream
496             .write_on_status(
497                 transaction_id,
498                 "status",
499                 "NetStream.Data.Start",
500                 "data start.",
501             )
502             .await?;
503 
504         netstream
505             .write_on_status(
506                 transaction_id,
507                 "status",
508                 "NetStream.Play.PublishNotify",
509                 "play publish notify.",
510             )
511             .await?;
512 
513         event_messages.write_stream_is_record(*stream_id).await?;
514         log::info!(
515             "[ S->C ] [stream is record]  app_name: {}, stream_name: {}",
516             self.app_name,
517             self.stream_name
518         );
519         self.stream_name = stream_name.clone().unwrap();
520         self.common
521             .subscribe_from_channels(
522                 self.app_name.clone(),
523                 stream_name.unwrap(),
524                 self.subscriber_id,
525             )
526             .await?;
527 
528         self.state = ServerSessionState::Play;
529 
530         Ok(())
531     }
532 
533     pub async fn on_publish(
534         &mut self,
535         transaction_id: &f64,
536         stream_id: &u32,
537         other_values: &mut Vec<Amf0ValueType>,
538     ) -> Result<(), SessionError> {
539         let length = other_values.len();
540 
541         if length < 2 {
542             return Err(SessionError {
543                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
544             });
545         }
546 
547         let stream_name = match other_values.remove(0) {
548             Amf0ValueType::UTF8String(val) => val,
549             _ => {
550                 return Err(SessionError {
551                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
552                 });
553             }
554         };
555 
556         self.stream_name = stream_name;
557 
558         let _ = match other_values.remove(0) {
559             Amf0ValueType::UTF8String(val) => val,
560             _ => {
561                 return Err(SessionError {
562                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
563                 });
564             }
565         };
566 
567         log::info!(
568             "[ S<-C ] [publish]  app_name: {}, stream_name: {}",
569             self.app_name,
570             self.stream_name
571         );
572 
573         log::info!(
574             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
575             self.app_name,
576             self.stream_name
577         );
578 
579         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
580         event_messages.write_stream_begin(*stream_id).await?;
581 
582         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
583         netstream
584             .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "")
585             .await?;
586         log::info!(
587             "[ S->C ] [NetStream.Publish.Start]  app_name: {}, stream_name: {}",
588             self.app_name,
589             self.stream_name
590         );
591 
592         self.common
593             .publish_to_channels(self.app_name.clone(), self.stream_name.clone())
594             .await?;
595 
596         Ok(())
597     }
598 }
599