1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::{
9         amf0::Amf0ValueType,
10         channels::define::ChannelEventProducer,
11         chunk::{
12             define::CHUNK_SIZE,
13             unpacketizer::{ChunkUnpacketizer, UnpackResult},
14         },
15         config, handshake,
16         handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer},
17         messages::{define::RtmpMessageData, parser::MessageParser},
18         netconnection::writer::NetConnection,
19         netstream::writer::NetStreamWriter,
20         protocol_control_messages::writer::ProtocolControlMessagesWriter,
21         user_control_messages::writer::EventMessagesWriter,
22     },
23     bytes::BytesMut,
24     bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO},
25     std::{collections::HashMap, sync::Arc, time::Duration},
26     tokio::{net::TcpStream, sync::Mutex},
27     uuid::Uuid,
28 };
29 
/// Phases of a server session; `ServerSession::run` dispatches on the current value.
enum ServerSessionState {
    /// Initial state: `run` calls `handshake()` until the handshaker reports `Finish`.
    Handshake,
    /// `run` calls `read_parse_chunks()` to read, parse and dispatch incoming chunks.
    ReadChunk,
    // OnConnect,
    // OnCreateStream,
    //Publish,
    /// Set after a `deleteStream` command has been handled; makes `run` return `Ok(())`.
    DeleteStream,
    /// Set at the end of `on_play`; `run` then calls `play()`, which invokes
    /// `Common::send_channel_data`.
    Play,
}
39 
/// Server side of a single client connection: holds the shared I/O handle, the handshake
/// and chunk-parsing state, and the `Common` helper used for channel operations.
pub struct ServerSession {
    /// Application name, taken from the `app` property of the `connect` command object.
    pub app_name: String,
    /// Stream name, taken from the first argument of a `play` or `publish` command.
    pub stream_name: String,
    /// I/O handle created from the accepted `TcpStream`; clones are handed to the
    /// handshaker, to `Common` and to the message writers.
    io: Arc<Mutex<BytesIO>>,
    /// Handshake driver used while the session is in the `Handshake` state.
    handshaker: HandshakeServer,
    /// Accumulates received bytes and yields parsed chunks.
    unpacketizer: ChunkUnpacketizer,
    /// Current phase of the session; selects what `run` does next.
    state: ServerSessionState,
    /// Shared session helper used for channel publish/subscribe and media callbacks.
    pub common: Common,
    /// Buffer most recently read from `io`.
    bytesio_data: BytesMut,
    /// `true` when bytes left over from the handshake were already fed to the
    /// unpacketizer, so the next `read_parse_chunks` call parses them before reading
    /// from the socket again.
    has_remaing_data: bool,
    /// Identifies this session as a subscriber when calling
    /// `subscribe_from_channels` / `unsubscribe_from_channels`, so the matching entry
    /// can be found again when unsubscribing.
    pub subscriber_id: Uuid,
    /// Copy of the command object received with `connect`; `None` until then.
    connect_command_object: Option<HashMap<String, Amf0ValueType>>,
}
56 
57 impl ServerSession {
58     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self {
59         let net_io = Arc::new(Mutex::new(BytesIO::new(stream)));
60         let subscriber_id = Uuid::new_v4();
61         Self {
62             app_name: String::from(""),
63             stream_name: String::from(""),
64             io: Arc::clone(&net_io),
65             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
66             unpacketizer: ChunkUnpacketizer::new(),
67             state: ServerSessionState::Handshake,
68             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server),
69             subscriber_id,
70             bytesio_data: BytesMut::new(),
71             has_remaing_data: false,
72             connect_command_object: None,
73         }
74     }
75 
76     pub async fn run(&mut self) -> Result<(), SessionError> {
77         loop {
78             match self.state {
79                 ServerSessionState::Handshake => {
80                     self.handshake().await?;
81                 }
82                 ServerSessionState::ReadChunk => {
83                     self.read_parse_chunks().await?;
84                 }
85                 ServerSessionState::Play => {
86                     self.play().await?;
87                 }
88                 ServerSessionState::DeleteStream => {
89                     return Ok(());
90                 }
91             }
92         }
93 
94         //Ok(())
95     }
96 
97     async fn handshake(&mut self) -> Result<(), SessionError> {
98         let mut bytes_len = 0;
99 
100         while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE {
101             self.bytesio_data = self.io.lock().await.read().await?;
102             bytes_len += self.bytesio_data.len();
103             self.handshaker.extend_data(&self.bytesio_data[..]);
104         }
105 
106         self.handshaker.handshake().await?;
107 
108         if let ServerHandshakeState::Finish = self.handshaker.state() {
109             self.state = ServerSessionState::ReadChunk;
110             let left_bytes = self.handshaker.get_remaining_bytes();
111             if !left_bytes.is_empty() {
112                 self.unpacketizer.extend_data(&left_bytes[..]);
113                 self.has_remaing_data = true;
114             }
115             log::info!("[ S->C ] [send_set_chunk_size] ");
116             self.send_set_chunk_size().await?;
117             return Ok(());
118         }
119 
120         Ok(())
121     }
122 
123     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
124         if !self.has_remaing_data {
125             match self
126                 .io
127                 .lock()
128                 .await
129                 .read_timeout(Duration::from_secs(2))
130                 .await
131             {
132                 Ok(data) => {
133                     self.bytesio_data = data;
134                 }
135                 Err(err) => {
136                     self.common
137                         .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
138                         .await?;
139 
140                     return Err(SessionError {
141                         value: SessionErrorValue::BytesIOError(err),
142                     });
143                 }
144             }
145 
146             self.unpacketizer.extend_data(&self.bytesio_data[..]);
147         }
148 
149         self.has_remaing_data = false;
150 
151         loop {
152             let result = self.unpacketizer.read_chunks();
153 
154             if let Ok(rv) = result {
155                 if let UnpackResult::Chunks(chunks) = rv {
156                     for chunk_info in chunks {
157                         let timestamp = chunk_info.message_header.timestamp;
158                         let msg_stream_id = chunk_info.message_header.msg_streamd_id;
159 
160                         let mut msg = MessageParser::new(chunk_info).parse()?;
161                         self.process_messages(&mut msg, &msg_stream_id, &timestamp)
162                             .await?;
163                     }
164                 }
165             } else {
166                 break;
167             }
168         }
169         Ok(())
170     }
171 
172     async fn play(&mut self) -> Result<(), SessionError> {
173         match self.common.send_channel_data().await {
174             Ok(_) => {}
175 
176             Err(err) => {
177                 self.common
178                     .unsubscribe_from_channels(
179                         self.app_name.clone(),
180                         self.stream_name.clone(),
181                         self.subscriber_id,
182                     )
183                     .await?;
184                 return Err(err);
185             }
186         }
187 
188         Ok(())
189     }
190 
191     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
192         let mut controlmessage =
193             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
194         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
195 
196         Ok(())
197     }
198 
199     pub async fn process_messages(
200         &mut self,
201         rtmp_msg: &mut RtmpMessageData,
202         msg_stream_id: &u32,
203         timestamp: &u32,
204     ) -> Result<(), SessionError> {
205         match rtmp_msg {
206             RtmpMessageData::Amf0Command {
207                 command_name,
208                 transaction_id,
209                 command_object,
210                 others,
211             } => {
212                 self.on_amf0_command_message(
213                     msg_stream_id,
214                     command_name,
215                     transaction_id,
216                     command_object,
217                     others,
218                 )
219                 .await?
220             }
221             RtmpMessageData::SetChunkSize { chunk_size } => {
222                 self.on_set_chunk_size(*chunk_size as usize)?;
223             }
224             RtmpMessageData::AudioData { data } => {
225                 self.common.on_audio_data(data, timestamp)?;
226             }
227             RtmpMessageData::VideoData { data } => {
228                 self.common.on_video_data(data, timestamp)?;
229             }
230             RtmpMessageData::AmfData { raw_data } => {
231                 self.common.on_meta_data(raw_data, timestamp)?;
232             }
233 
234             _ => {}
235         }
236         Ok(())
237     }
238 
239     pub async fn on_amf0_command_message(
240         &mut self,
241         stream_id: &u32,
242         command_name: &Amf0ValueType,
243         transaction_id: &Amf0ValueType,
244         command_object: &Amf0ValueType,
245         others: &mut Vec<Amf0ValueType>,
246     ) -> Result<(), SessionError> {
247         let empty_cmd_name = &String::new();
248         let cmd_name = match command_name {
249             Amf0ValueType::UTF8String(str) => str,
250             _ => empty_cmd_name,
251         };
252 
253         let transaction_id = match transaction_id {
254             Amf0ValueType::Number(number) => number,
255             _ => &0.0,
256         };
257 
258         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
259         let obj = match command_object {
260             Amf0ValueType::Object(obj) => obj,
261             _ => &empty_cmd_obj,
262         };
263 
264         match cmd_name.as_str() {
265             "connect" => {
266                 log::info!("[ S<-C ] [connect] ");
267                 self.on_connect(transaction_id, obj).await?;
268             }
269             "createStream" => {
270                 log::info!("[ S<-C ] [create stream] ");
271                 self.on_create_stream(transaction_id).await?;
272             }
273             "deleteStream" => {
274                 if !others.is_empty() {
275                     let stream_id = match others.pop() {
276                         Some(Amf0ValueType::Number(streamid)) => streamid,
277                         _ => 0.0,
278                     };
279 
280                     log::info!(
281                         "[ S<-C ] [delete stream] app_name: {}, stream_name: {}",
282                         self.app_name,
283                         self.stream_name
284                     );
285 
286                     self.on_delete_stream(transaction_id, &stream_id).await?;
287                     self.state = ServerSessionState::DeleteStream;
288                 }
289             }
290             "play" => {
291                 log::info!(
292                     "[ S<-C ] [play]  app_name: {}, stream_name: {}",
293                     self.app_name,
294                     self.stream_name
295                 );
296                 self.unpacketizer.session_type = config::SERVER_PULL;
297                 self.on_play(transaction_id, stream_id, others).await?;
298             }
299             "publish" => {
300                 self.unpacketizer.session_type = config::SERVER_PUSH;
301                 self.on_publish(transaction_id, stream_id, others).await?;
302             }
303             _ => {}
304         }
305 
306         Ok(())
307     }
308 
309     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
310         log::info!(
311             "[ S<-C ] [set chunk size]  app_name: {}, stream_name: {}, chunk size: {}",
312             self.app_name,
313             self.stream_name,
314             chunk_size
315         );
316         self.unpacketizer.update_max_chunk_size(chunk_size);
317         Ok(())
318     }
319 
320     async fn on_connect(
321         &mut self,
322         transaction_id: &f64,
323         command_obj: &HashMap<String, Amf0ValueType>,
324     ) -> Result<(), SessionError> {
325         self.connect_command_object = Some(command_obj.clone());
326         let mut control_message =
327             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
328         log::info!("[ S->C ] [set window_acknowledgement_size]");
329         control_message
330             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
331             .await?;
332 
333         log::info!("[ S->C ] [set set_peer_bandwidth]",);
334         control_message
335             .write_set_peer_bandwidth(
336                 define::PEER_BANDWIDTH,
337                 define::peer_bandwidth_limit_type::DYNAMIC,
338             )
339             .await?;
340 
341         let obj_encoding = command_obj.get("objectEncoding");
342         let encoding = match obj_encoding {
343             Some(Amf0ValueType::Number(encoding)) => encoding,
344             _ => &define::OBJENCODING_AMF0,
345         };
346 
347         let app_name = command_obj.get("app");
348         self.app_name = match app_name {
349             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
350             _ => {
351                 return Err(SessionError {
352                     value: SessionErrorValue::NoAppName,
353                 });
354             }
355         };
356 
357         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
358         log::info!("[ S->C ] [set connect_response]",);
359         netconnection
360             .write_connect_response(
361                 transaction_id,
362                 define::FMSVER,
363                 &define::CAPABILITIES,
364                 &String::from("NetConnection.Connect.Success"),
365                 define::LEVEL,
366                 &String::from("Connection Succeeded."),
367                 encoding,
368             )
369             .await?;
370 
371         Ok(())
372     }
373 
374     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
375         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
376         netconnection
377             .write_create_stream_response(transaction_id, &define::STREAM_ID)
378             .await?;
379 
380         log::info!(
381             "[ S->C ] [create_stream_response]  app_name: {}",
382             self.app_name,
383         );
384 
385         Ok(())
386     }
387 
388     pub async fn on_delete_stream(
389         &mut self,
390         transaction_id: &f64,
391         stream_id: &f64,
392     ) -> Result<(), SessionError> {
393         self.common
394             .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
395             .await?;
396 
397         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
398         netstream
399             .write_on_status(
400                 transaction_id,
401                 "status",
402                 "NetStream.DeleteStream.Suceess",
403                 "",
404             )
405             .await?;
406 
407         //self.unsubscribe_from_channels().await?;
408         log::info!(
409             "[ S->C ] [delete stream success]  app_name: {}, stream_name: {}",
410             self.app_name,
411             self.stream_name
412         );
413         log::trace!("{}", stream_id);
414 
415         Ok(())
416     }
417 
418     #[allow(clippy::never_loop)]
419     pub async fn on_play(
420         &mut self,
421         transaction_id: &f64,
422         stream_id: &u32,
423         other_values: &mut Vec<Amf0ValueType>,
424     ) -> Result<(), SessionError> {
425         let length = other_values.len() as u8;
426         let mut index: u8 = 0;
427 
428         let mut stream_name: Option<String> = None;
429         let mut start: Option<f64> = None;
430         let mut duration: Option<f64> = None;
431         let mut reset: Option<bool> = None;
432 
433         loop {
434             if index >= length {
435                 break;
436             }
437             index += 1;
438             stream_name = match other_values.remove(0) {
439                 Amf0ValueType::UTF8String(val) => Some(val),
440                 _ => None,
441             };
442 
443             if index >= length {
444                 break;
445             }
446             index += 1;
447             start = match other_values.remove(0) {
448                 Amf0ValueType::Number(val) => Some(val),
449                 _ => None,
450             };
451 
452             if index >= length {
453                 break;
454             }
455             index += 1;
456             duration = match other_values.remove(0) {
457                 Amf0ValueType::Number(val) => Some(val),
458                 _ => None,
459             };
460 
461             if index >= length {
462                 break;
463             }
464             //index = index + 1;
465             reset = match other_values.remove(0) {
466                 Amf0ValueType::Boolean(val) => Some(val),
467                 _ => None,
468             };
469             break;
470         }
471 
472         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
473         event_messages.write_stream_begin(*stream_id).await?;
474         log::info!(
475             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
476             self.app_name,
477             self.stream_name
478         );
479         log::trace!(
480             "{} {} {}",
481             start.is_some(),
482             duration.is_some(),
483             reset.is_some()
484         );
485 
486         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
487         netstream
488             .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset")
489             .await?;
490 
491         netstream
492             .write_on_status(
493                 transaction_id,
494                 "status",
495                 "NetStream.Play.Start",
496                 "play start",
497             )
498             .await?;
499 
500         netstream
501             .write_on_status(
502                 transaction_id,
503                 "status",
504                 "NetStream.Data.Start",
505                 "data start.",
506             )
507             .await?;
508 
509         netstream
510             .write_on_status(
511                 transaction_id,
512                 "status",
513                 "NetStream.Play.PublishNotify",
514                 "play publish notify.",
515             )
516             .await?;
517 
518         event_messages.write_stream_is_record(*stream_id).await?;
519         log::info!(
520             "[ S->C ] [stream is record]  app_name: {}, stream_name: {}",
521             self.app_name,
522             self.stream_name
523         );
524         self.stream_name = stream_name.clone().unwrap();
525         self.common
526             .subscribe_from_channels(
527                 self.app_name.clone(),
528                 stream_name.unwrap(),
529                 self.subscriber_id,
530             )
531             .await?;
532 
533         self.state = ServerSessionState::Play;
534 
535         Ok(())
536     }
537 
538     pub async fn on_publish(
539         &mut self,
540         transaction_id: &f64,
541         stream_id: &u32,
542         other_values: &mut Vec<Amf0ValueType>,
543     ) -> Result<(), SessionError> {
544         let length = other_values.len();
545 
546         if length < 2 {
547             return Err(SessionError {
548                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
549             });
550         }
551 
552         let stream_name = match other_values.remove(0) {
553             Amf0ValueType::UTF8String(val) => val,
554             _ => {
555                 return Err(SessionError {
556                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
557                 });
558             }
559         };
560 
561         self.stream_name = stream_name;
562 
563         let _ = match other_values.remove(0) {
564             Amf0ValueType::UTF8String(val) => val,
565             _ => {
566                 return Err(SessionError {
567                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
568                 });
569             }
570         };
571 
572         log::info!(
573             "[ S<-C ] [publish]  app_name: {}, stream_name: {}",
574             self.app_name,
575             self.stream_name
576         );
577 
578         log::info!(
579             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
580             self.app_name,
581             self.stream_name
582         );
583 
584         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
585         event_messages.write_stream_begin(*stream_id).await?;
586 
587         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
588         netstream
589             .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "")
590             .await?;
591         log::info!(
592             "[ S->C ] [NetStream.Publish.Start]  app_name: {}, stream_name: {}",
593             self.app_name,
594             self.stream_name
595         );
596 
597         self.common
598             .publish_to_channels(self.app_name.clone(), self.stream_name.clone())
599             .await?;
600 
601         Ok(())
602     }
603 }
604