1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::{
9         amf0::Amf0ValueType,
10         channels::define::ChannelEventProducer,
11         chunk::{
12             define::CHUNK_SIZE,
13             unpacketizer::{ChunkUnpacketizer, UnpackResult},
14         },
15         config, handshake,
16         handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer},
17         messages::{define::RtmpMessageData, parser::MessageParser},
18         netconnection::writer::NetConnection,
19         netstream::writer::NetStreamWriter,
20         protocol_control_messages::writer::ProtocolControlMessagesWriter,
21         user_control_messages::writer::EventMessagesWriter,
22     },
23     bytes::BytesMut,
24     bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO},
25     std::{collections::HashMap, sync::Arc, time::Duration},
26     tokio::{net::TcpStream, sync::Mutex},
27     uuid::Uuid,
28 };
29 
30 enum ServerSessionState {
31     Handshake,
32     ReadChunk,
33     // OnConnect,
34     // OnCreateStream,
35     //Publish,
36     Play,
37 }
38 
39 pub struct ServerSession {
40     pub app_name: String,
41     pub stream_name: String,
42     io: Arc<Mutex<BytesIO>>,
43     handshaker: HandshakeServer,
44     unpacketizer: ChunkUnpacketizer,
45     state: ServerSessionState,
46     pub common: Common,
47     bytesio_data: BytesMut,
48     has_remaing_data: bool,
49     /* Used to mark the subscriber's the data producer
50     in channels and delete it from map when unsubscribe
51     is called. */
52     pub subscriber_id: Uuid,
53     connect_command_object: Option<HashMap<String, Amf0ValueType>>,
54 }
55 
56 impl ServerSession {
57     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self {
58         let net_io = Arc::new(Mutex::new(BytesIO::new(stream)));
59         let subscriber_id = Uuid::new_v4();
60         Self {
61             app_name: String::from(""),
62             stream_name: String::from(""),
63             io: Arc::clone(&net_io),
64             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
65             unpacketizer: ChunkUnpacketizer::new(),
66             state: ServerSessionState::Handshake,
67             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server),
68             subscriber_id,
69             bytesio_data: BytesMut::new(),
70             has_remaing_data: false,
71             connect_command_object: None,
72         }
73     }
74 
75     pub async fn run(&mut self) -> Result<(), SessionError> {
76         loop {
77             match self.state {
78                 ServerSessionState::Handshake => {
79                     self.handshake().await?;
80                 }
81                 ServerSessionState::ReadChunk => {
82                     self.read_parse_chunks().await?;
83                 }
84                 ServerSessionState::Play => {
85                     self.play().await?;
86                 }
87             }
88         }
89 
90         //Ok(())
91     }
92 
93     async fn handshake(&mut self) -> Result<(), SessionError> {
94         let mut bytes_len = 0;
95 
96         while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE {
97             self.bytesio_data = self.io.lock().await.read().await?;
98             bytes_len += self.bytesio_data.len();
99             self.handshaker.extend_data(&self.bytesio_data[..]);
100         }
101 
102         self.handshaker.handshake().await?;
103 
104         match self.handshaker.state() {
105             ServerHandshakeState::Finish => {
106                 self.state = ServerSessionState::ReadChunk;
107 
108                 let left_bytes = self.handshaker.get_remaining_bytes();
109                 if !left_bytes.is_empty() {
110                     self.unpacketizer.extend_data(&left_bytes[..]);
111                     self.has_remaing_data = true;
112                 }
113                 log::info!("[ S->C ] [send_set_chunk_size] ");
114                 self.send_set_chunk_size().await?;
115                 return Ok(());
116             }
117             _ => {}
118         }
119 
120         Ok(())
121     }
122 
123     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
124         if !self.has_remaing_data {
125             match self
126                 .io
127                 .lock()
128                 .await
129                 .read_timeout(Duration::from_secs(2))
130                 .await
131             {
132                 Ok(data) => {
133                     self.bytesio_data = data;
134                 }
135                 Err(err) => {
136                     self.common
137                         .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
138                         .await?;
139 
140                     return Err(SessionError {
141                         value: SessionErrorValue::BytesIOError(err),
142                     });
143                 }
144             }
145 
146             self.unpacketizer.extend_data(&self.bytesio_data[..]);
147         }
148 
149         self.has_remaing_data = false;
150 
151         loop {
152             let result = self.unpacketizer.read_chunks();
153 
154             if let Ok(rv) = result {
155                 match rv {
156                     UnpackResult::Chunks(chunks) => {
157                         for chunk_info in chunks {
158                             let timestamp = chunk_info.message_header.timestamp;
159                             let msg_stream_id = chunk_info.message_header.msg_streamd_id;
160 
161                             let mut msg = MessageParser::new(chunk_info).parse()?;
162                             self.process_messages(&mut msg, &msg_stream_id, &timestamp)
163                                 .await?;
164                         }
165                     }
166                     _ => {}
167                 }
168             } else {
169                 break;
170             }
171         }
172         Ok(())
173     }
174 
175     async fn play(&mut self) -> Result<(), SessionError> {
176         match self.common.send_channel_data().await {
177             Ok(_) => {}
178 
179             Err(err) => {
180                 self.common
181                     .unsubscribe_from_channels(
182                         self.app_name.clone(),
183                         self.stream_name.clone(),
184                         self.subscriber_id,
185                     )
186                     .await?;
187                 return Err(err);
188             }
189         }
190 
191         Ok(())
192     }
193 
194     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
195         let mut controlmessage =
196             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
197         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
198 
199         Ok(())
200     }
201 
202     pub async fn process_messages(
203         &mut self,
204         rtmp_msg: &mut RtmpMessageData,
205         msg_stream_id: &u32,
206         timestamp: &u32,
207     ) -> Result<(), SessionError> {
208         match rtmp_msg {
209             RtmpMessageData::Amf0Command {
210                 command_name,
211                 transaction_id,
212                 command_object,
213                 others,
214             } => {
215                 self.on_amf0_command_message(
216                     msg_stream_id,
217                     command_name,
218                     transaction_id,
219                     command_object,
220                     others,
221                 )
222                 .await?
223             }
224             RtmpMessageData::SetChunkSize { chunk_size } => {
225                 self.on_set_chunk_size(*chunk_size as usize)?;
226             }
227             RtmpMessageData::AudioData { data } => {
228                 self.common.on_audio_data(data, timestamp)?;
229             }
230             RtmpMessageData::VideoData { data } => {
231                 self.common.on_video_data(data, timestamp)?;
232             }
233             RtmpMessageData::AmfData { raw_data } => {
234                 self.common.on_meta_data(raw_data, timestamp)?;
235             }
236 
237             _ => {}
238         }
239         Ok(())
240     }
241 
242     pub async fn on_amf0_command_message(
243         &mut self,
244         stream_id: &u32,
245         command_name: &Amf0ValueType,
246         transaction_id: &Amf0ValueType,
247         command_object: &Amf0ValueType,
248         others: &mut Vec<Amf0ValueType>,
249     ) -> Result<(), SessionError> {
250         let empty_cmd_name = &String::new();
251         let cmd_name = match command_name {
252             Amf0ValueType::UTF8String(str) => str,
253             _ => empty_cmd_name,
254         };
255 
256         let transaction_id = match transaction_id {
257             Amf0ValueType::Number(number) => number,
258             _ => &0.0,
259         };
260 
261         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
262         let obj = match command_object {
263             Amf0ValueType::Object(obj) => obj,
264             _ => &empty_cmd_obj,
265         };
266 
267         match cmd_name.as_str() {
268             "connect" => {
269                 log::info!("[ S<-C ] [connect] ");
270                 self.on_connect(transaction_id, obj).await?;
271             }
272             "createStream" => {
273                 log::info!("[ S<-C ] [create stream] ");
274                 self.on_create_stream(transaction_id).await?;
275             }
276             "deleteStream" => {
277                 if !others.is_empty() {
278                     let stream_id = match others.pop() {
279                         Some(val) => match val {
280                             Amf0ValueType::Number(streamid) => streamid,
281                             _ => 0.0,
282                         },
283                         _ => 0.0,
284                     };
285                     log::info!(
286                         "[ S<-C ] [delete stream] app_name: {}, stream_name: {}",
287                         self.app_name,
288                         self.stream_name
289                     );
290 
291                     self.on_delete_stream(transaction_id, &stream_id).await?;
292                 }
293             }
294             "play" => {
295                 log::info!(
296                     "[ S<-C ] [play]  app_name: {}, stream_name: {}",
297                     self.app_name,
298                     self.stream_name
299                 );
300                 self.unpacketizer.session_type = config::SERVER_PULL;
301                 self.on_play(transaction_id, stream_id, others).await?;
302             }
303             "publish" => {
304                 self.unpacketizer.session_type = config::SERVER_PUSH;
305                 self.on_publish(transaction_id, stream_id, others).await?;
306             }
307             _ => {}
308         }
309 
310         Ok(())
311     }
312 
313     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
314         log::info!(
315             "[ S<-C ] [set chunk size]  app_name: {}, stream_name: {}, chunk size: {}",
316             self.app_name,
317             self.stream_name,
318             chunk_size
319         );
320         self.unpacketizer.update_max_chunk_size(chunk_size);
321         Ok(())
322     }
323 
324     async fn on_connect(
325         &mut self,
326         transaction_id: &f64,
327         command_obj: &HashMap<String, Amf0ValueType>,
328     ) -> Result<(), SessionError> {
329         self.connect_command_object = Some(command_obj.clone());
330         let mut control_message =
331             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
332         log::info!("[ S->C ] [set window_acknowledgement_size]");
333         control_message
334             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
335             .await?;
336 
337         log::info!("[ S->C ] [set set_peer_bandwidth]",);
338         control_message
339             .write_set_peer_bandwidth(
340                 define::PEER_BANDWIDTH,
341                 define::peer_bandwidth_limit_type::DYNAMIC,
342             )
343             .await?;
344 
345         let obj_encoding = command_obj.get("objectEncoding");
346         let encoding = match obj_encoding {
347             Some(Amf0ValueType::Number(encoding)) => encoding,
348             _ => &define::OBJENCODING_AMF0,
349         };
350 
351         let app_name = command_obj.get("app");
352         self.app_name = match app_name {
353             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
354             _ => {
355                 return Err(SessionError {
356                     value: SessionErrorValue::NoAppName,
357                 });
358             }
359         };
360 
361         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
362         log::info!("[ S->C ] [set connect_response]",);
363         netconnection
364             .write_connect_response(
365                 transaction_id,
366                 &define::FMSVER.to_string(),
367                 &define::CAPABILITIES,
368                 &String::from("NetConnection.Connect.Success"),
369                 &define::LEVEL.to_string(),
370                 &String::from("Connection Succeeded."),
371                 encoding,
372             )
373             .await?;
374 
375         Ok(())
376     }
377 
378     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
379         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
380         netconnection
381             .write_create_stream_response(transaction_id, &define::STREAM_ID)
382             .await?;
383 
384         log::info!(
385             "[ S->C ] [create_stream_response]  app_name: {}",
386             self.app_name,
387         );
388 
389         Ok(())
390     }
391 
392     pub async fn on_delete_stream(
393         &mut self,
394         transaction_id: &f64,
395         stream_id: &f64,
396     ) -> Result<(), SessionError> {
397         self.common
398             .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
399             .await?;
400 
401         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
402         netstream
403             .write_on_status(
404                 transaction_id,
405                 &"status".to_string(),
406                 &"NetStream.DeleteStream.Suceess".to_string(),
407                 &"".to_string(),
408             )
409             .await?;
410 
411         //self.unsubscribe_from_channels().await?;
412         log::info!(
413             "[ S->C ] [delete stream success]  app_name: {}, stream_name: {}",
414             self.app_name,
415             self.stream_name
416         );
417         log::trace!("{}", stream_id);
418 
419         Ok(())
420     }
421     pub async fn on_play(
422         &mut self,
423         transaction_id: &f64,
424         stream_id: &u32,
425         other_values: &mut Vec<Amf0ValueType>,
426     ) -> Result<(), SessionError> {
427         let length = other_values.len() as u8;
428         let mut index: u8 = 0;
429 
430         let mut stream_name: Option<String> = None;
431         let mut start: Option<f64> = None;
432         let mut duration: Option<f64> = None;
433         let mut reset: Option<bool> = None;
434 
435         loop {
436             if index >= length {
437                 break;
438             }
439             index += 1;
440             stream_name = match other_values.remove(0) {
441                 Amf0ValueType::UTF8String(val) => Some(val),
442                 _ => None,
443             };
444 
445             if index >= length {
446                 break;
447             }
448             index += 1;
449             start = match other_values.remove(0) {
450                 Amf0ValueType::Number(val) => Some(val),
451                 _ => None,
452             };
453 
454             if index >= length {
455                 break;
456             }
457             index += 1;
458             duration = match other_values.remove(0) {
459                 Amf0ValueType::Number(val) => Some(val),
460                 _ => None,
461             };
462 
463             if index >= length {
464                 break;
465             }
466             //index = index + 1;
467             reset = match other_values.remove(0) {
468                 Amf0ValueType::Boolean(val) => Some(val),
469                 _ => None,
470             };
471             break;
472         }
473 
474         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
475         event_messages.write_stream_begin(*stream_id).await?;
476         log::info!(
477             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
478             self.app_name,
479             self.stream_name
480         );
481         log::trace!(
482             "{} {} {}",
483             start.is_some(),
484             duration.is_some(),
485             reset.is_some()
486         );
487 
488         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
489         netstream
490             .write_on_status(
491                 transaction_id,
492                 &"status".to_string(),
493                 &"NetStream.Play.Reset".to_string(),
494                 &"reset".to_string(),
495             )
496             .await?;
497 
498         netstream
499             .write_on_status(
500                 transaction_id,
501                 &"status".to_string(),
502                 &"NetStream.Play.Start".to_string(),
503                 &"play start".to_string(),
504             )
505             .await?;
506 
507         netstream
508             .write_on_status(
509                 transaction_id,
510                 &"status".to_string(),
511                 &"NetStream.Data.Start".to_string(),
512                 &"data start.".to_string(),
513             )
514             .await?;
515 
516         netstream
517             .write_on_status(
518                 transaction_id,
519                 &"status".to_string(),
520                 &"NetStream.Play.PublishNotify".to_string(),
521                 &"play publish notify.".to_string(),
522             )
523             .await?;
524 
525         event_messages
526             .write_stream_is_record(*stream_id)
527             .await?;
528         log::info!(
529             "[ S->C ] [stream is record]  app_name: {}, stream_name: {}",
530             self.app_name,
531             self.stream_name
532         );
533         self.stream_name = stream_name.clone().unwrap();
534         self.common
535             .subscribe_from_channels(
536                 self.app_name.clone(),
537                 stream_name.unwrap(),
538                 self.subscriber_id,
539             )
540             .await?;
541 
542         self.state = ServerSessionState::Play;
543 
544         Ok(())
545     }
546 
547     pub async fn on_publish(
548         &mut self,
549         transaction_id: &f64,
550         stream_id: &u32,
551         other_values: &mut Vec<Amf0ValueType>,
552     ) -> Result<(), SessionError> {
553         let length = other_values.len();
554 
555         if length < 2 {
556             return Err(SessionError {
557                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
558             });
559         }
560 
561         let stream_name = match other_values.remove(0) {
562             Amf0ValueType::UTF8String(val) => val,
563             _ => {
564                 return Err(SessionError {
565                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
566                 });
567             }
568         };
569 
570         self.stream_name = stream_name;
571 
572         let _ = match other_values.remove(0) {
573             Amf0ValueType::UTF8String(val) => val,
574             _ => {
575                 return Err(SessionError {
576                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
577                 });
578             }
579         };
580 
581         log::info!(
582             "[ S<-C ] [publish]  app_name: {}, stream_name: {}",
583             self.app_name,
584             self.stream_name
585         );
586 
587         log::info!(
588             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
589             self.app_name,
590             self.stream_name
591         );
592 
593         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
594         event_messages.write_stream_begin(*stream_id).await?;
595 
596         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
597         netstream
598             .write_on_status(
599                 transaction_id,
600                 &"status".to_string(),
601                 &"NetStream.Publish.Start".to_string(),
602                 &"".to_string(),
603             )
604             .await?;
605         log::info!(
606             "[ S->C ] [NetStream.Publish.Start]  app_name: {}, stream_name: {}",
607             self.app_name,
608             self.stream_name
609         );
610 
611         self.common
612             .publish_to_channels(self.app_name.clone(), self.stream_name.clone())
613             .await?;
614 
615         Ok(())
616     }
617 }
618