1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::{
9         amf0::Amf0ValueType,
10         channels::define::ChannelEventProducer,
11         chunk::{
12             define::{chunk_type, csid_type, CHUNK_SIZE},
13             packetizer::ChunkPacketizer,
14             unpacketizer::{ChunkUnpacketizer, UnpackResult},
15             ChunkInfo,
16         },
17         config,
18         handshake::handshake::{HandshakeServer, ServerHandshakeState},
19         messages::{
20             define::{msg_type_id, RtmpMessageData},
21             parser::MessageParser,
22         },
23         netconnection::commands::NetConnection,
24         netstream::writer::NetStreamWriter,
25         protocol_control_messages::writer::ProtocolControlMessagesWriter,
26         user_control_messages::writer::EventMessagesWriter,
27     },
28     bytes::BytesMut,
29     bytesio::{
30         bytes_writer::{AsyncBytesWriter, BytesWriter},
31         bytesio::BytesIO,
32     },
33     std::{collections::HashMap, sync::Arc},
34     tokio::{net::TcpStream, sync::Mutex},
35     uuid::Uuid,
36 };
37 
38 enum ServerSessionState {
39     Handshake,
40     ReadChunk,
41     // OnConnect,
42     // OnCreateStream,
43     //Publish,
44     Play,
45 }
46 
47 pub struct ServerSession {
48     pub app_name: String,
49     pub stream_name: String,
50 
51     io: Arc<Mutex<BytesIO>>,
52     handshaker: HandshakeServer,
53 
54     packetizer: ChunkPacketizer,
55     unpacketizer: ChunkUnpacketizer,
56 
57     state: ServerSessionState,
58 
59     pub common: Common,
60 
61     bytesio_data: BytesMut,
62     need_process: bool,
63 
64     /* Used to mark the subscriber's the data producer
65     in channels and delete it from map when unsubscribe
66     is called. */
67     pub subscriber_id: Uuid,
68 
69     connect_command_object: Option<HashMap<String, Amf0ValueType>>,
70 }
71 
72 impl ServerSession {
73     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self {
74         let net_io = Arc::new(Mutex::new(BytesIO::new(stream)));
75         let subscriber_id = Uuid::new_v4();
76         Self {
77             app_name: String::from(""),
78             stream_name: String::from(""),
79 
80             io: Arc::clone(&net_io),
81             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
82 
83             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
84             unpacketizer: ChunkUnpacketizer::new(),
85 
86             state: ServerSessionState::Handshake,
87 
88             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server),
89 
90             subscriber_id,
91             bytesio_data: BytesMut::new(),
92             need_process: false,
93 
94             connect_command_object: None,
95         }
96     }
97 
98     pub async fn run(&mut self) -> Result<(), SessionError> {
99         loop {
100             match self.state {
101                 ServerSessionState::Handshake => {
102                     self.handshake().await?;
103                 }
104                 ServerSessionState::ReadChunk => {
105                     self.read_parse_chunks().await?;
106                 }
107                 ServerSessionState::Play => {
108                     self.play().await?;
109                 }
110             }
111         }
112 
113         //Ok(())
114     }
115 
116     async fn handshake(&mut self) -> Result<(), SessionError> {
117         self.bytesio_data = self.io.lock().await.read().await?;
118         self.handshaker.extend_data(&self.bytesio_data[..]);
119         self.handshaker.handshake().await?;
120 
121         match self.handshaker.state() {
122             ServerHandshakeState::Finish => {
123                 self.state = ServerSessionState::ReadChunk;
124 
125                 let left_bytes = self.handshaker.get_remaining_bytes();
126                 if left_bytes.len() > 0 {
127                     self.unpacketizer.extend_data(&left_bytes[..]);
128                     self.need_process = true;
129                 }
130                 self.send_set_chunk_size().await?;
131                 return Ok(());
132             }
133             _ => {}
134         }
135 
136         Ok(())
137     }
138 
139     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
140         if !self.need_process {
141             self.bytesio_data = self.io.lock().await.read().await?;
142             self.unpacketizer.extend_data(&self.bytesio_data[..]);
143         }
144 
145         self.need_process = false;
146 
147         loop {
148             let result = self.unpacketizer.read_chunks();
149 
150             if let Ok(rv) = result {
151                 match rv {
152                     UnpackResult::Chunks(chunks) => {
153                         for chunk_info in chunks.iter() {
154                             let mut msg = MessageParser::new(chunk_info.clone()).parse()?;
155 
156                             let msg_stream_id = chunk_info.message_header.msg_streamd_id;
157                             let timestamp = chunk_info.message_header.timestamp;
158                             self.process_messages(&mut msg, &msg_stream_id, &timestamp)
159                                 .await?;
160                         }
161                     }
162                     _ => {}
163                 }
164             } else {
165                 break;
166             }
167         }
168         Ok(())
169     }
170 
171     async fn play(&mut self) -> Result<(), SessionError> {
172         match self.common.send_channel_data().await {
173             Ok(_) => {}
174 
175             Err(err) => {
176                 self.common
177                     .unsubscribe_from_channels(
178                         self.app_name.clone(),
179                         self.stream_name.clone(),
180                         self.subscriber_id,
181                     )
182                     .await?;
183                 return Err(err);
184             }
185         }
186 
187         Ok(())
188     }
189 
190     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
191         let mut controlmessage =
192             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
193         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
194 
195         Ok(())
196     }
197 
198     pub async fn process_messages(
199         &mut self,
200         rtmp_msg: &mut RtmpMessageData,
201         msg_stream_id: &u32,
202         timestamp: &u32,
203     ) -> Result<(), SessionError> {
204         match rtmp_msg {
205             RtmpMessageData::Amf0Command {
206                 command_name,
207                 transaction_id,
208                 command_object,
209                 others,
210             } => {
211                 self.on_amf0_command_message(
212                     msg_stream_id,
213                     command_name,
214                     transaction_id,
215                     command_object,
216                     others,
217                 )
218                 .await?
219             }
220             RtmpMessageData::SetChunkSize { chunk_size } => {
221                 self.on_set_chunk_size(chunk_size.clone() as usize)?;
222             }
223             RtmpMessageData::AudioData { data } => {
224                 self.common.on_audio_data(data, timestamp)?;
225             }
226             RtmpMessageData::VideoData { data } => {
227                 self.common.on_video_data(data, timestamp)?;
228             }
229             RtmpMessageData::AmfData { raw_data } => {
230                 self.common.on_meta_data(raw_data, timestamp)?;
231             }
232 
233             _ => {}
234         }
235         Ok(())
236     }
237 
238     pub async fn on_amf0_command_message(
239         &mut self,
240         stream_id: &u32,
241         command_name: &Amf0ValueType,
242         transaction_id: &Amf0ValueType,
243         command_object: &Amf0ValueType,
244         others: &mut Vec<Amf0ValueType>,
245     ) -> Result<(), SessionError> {
246         let empty_cmd_name = &String::new();
247         let cmd_name = match command_name {
248             Amf0ValueType::UTF8String(str) => str,
249             _ => empty_cmd_name,
250         };
251 
252         let transaction_id = match transaction_id {
253             Amf0ValueType::Number(number) => number,
254             _ => &0.0,
255         };
256 
257         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
258         let obj = match command_object {
259             Amf0ValueType::Object(obj) => obj,
260             _ => &empty_cmd_obj,
261         };
262 
263         match cmd_name.as_str() {
264             "connect" => {
265                 log::info!("[ S<-C ] [connect] ");
266                 self.on_connect(&transaction_id, &obj).await?;
267             }
268             "createStream" => {
269                 log::info!("[ S<-C ] [create stream] ");
270                 self.on_create_stream(transaction_id).await?;
271             }
272             "deleteStream" => {
273                 if others.len() > 0 {
274                     let stream_id = match others.pop() {
275                         Some(val) => match val {
276                             Amf0ValueType::Number(streamid) => streamid,
277                             _ => 0.0,
278                         },
279                         _ => 0.0,
280                     };
281                     log::info!(
282                         "[ S<-C ] [delete stream] app_name: {}, stream_name: {}",
283                         self.app_name,
284                         self.stream_name
285                     );
286 
287                     self.on_delete_stream(transaction_id, &stream_id).await?;
288                 }
289             }
290             "play" => {
291                 log::info!(
292                     "[ S<-C ] [play]  app_name: {}, stream_name: {}",
293                     self.app_name,
294                     self.stream_name
295                 );
296                 self.unpacketizer.session_type = config::SERVER_PULL;
297                 self.on_play(transaction_id, stream_id, others).await?;
298             }
299             "publish" => {
300                 self.unpacketizer.session_type = config::SERVER_PUSH;
301                 self.on_publish(transaction_id, stream_id, others).await?;
302             }
303             _ => {}
304         }
305 
306         Ok(())
307     }
308 
309     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
310         self.unpacketizer.update_max_chunk_size(chunk_size);
311         Ok(())
312     }
313 
314     async fn on_connect(
315         &mut self,
316         transaction_id: &f64,
317         command_obj: &HashMap<String, Amf0ValueType>,
318     ) -> Result<(), SessionError> {
319         self.connect_command_object = Some(command_obj.clone());
320         let mut control_message =
321             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
322         log::info!("[ S->C ] [set window_acknowledgement_size]");
323         control_message
324             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
325             .await?;
326         log::info!("[ S->C ] [set set_peer_bandwidth]",);
327         control_message
328             .write_set_peer_bandwidth(
329                 define::PEER_BANDWIDTH,
330                 define::peer_bandwidth_limit_type::DYNAMIC,
331             )
332             .await?;
333 
334         let obj_encoding = command_obj.get("objectEncoding");
335         let encoding = match obj_encoding {
336             Some(Amf0ValueType::Number(encoding)) => encoding,
337             _ => &define::OBJENCODING_AMF0,
338         };
339 
340         let app_name = command_obj.get("app");
341         self.app_name = match app_name {
342             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
343             _ => {
344                 return Err(SessionError {
345                     value: SessionErrorValue::NoAppName,
346                 });
347             }
348         };
349 
350         let mut netconnection = NetConnection::new(BytesWriter::new());
351         let data = netconnection.connect_response(
352             &transaction_id,
353             &define::FMSVER.to_string(),
354             &define::CAPABILITIES,
355             &String::from("NetConnection.Connect.Success"),
356             &define::LEVEL.to_string(),
357             &String::from("Connection Succeeded."),
358             encoding,
359         )?;
360 
361         let mut chunk_info = ChunkInfo::new(
362             csid_type::COMMAND_AMF0_AMF3,
363             chunk_type::TYPE_0,
364             0,
365             data.len() as u32,
366             msg_type_id::COMMAND_AMF0,
367             0,
368             data,
369         );
370 
371         self.packetizer.write_chunk(&mut chunk_info).await?;
372 
373         Ok(())
374     }
375 
376     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
377         let mut netconnection = NetConnection::new(BytesWriter::new());
378         let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?;
379 
380         let mut chunk_info = ChunkInfo::new(
381             csid_type::COMMAND_AMF0_AMF3,
382             chunk_type::TYPE_0,
383             0,
384             data.len() as u32,
385             msg_type_id::COMMAND_AMF0,
386             0,
387             data,
388         );
389 
390         self.packetizer.write_chunk(&mut chunk_info).await?;
391 
392         log::info!(
393             "[ S->C ] [create_stream_response]  app_name: {}",
394             self.app_name,
395         );
396 
397         Ok(())
398     }
399 
400     pub async fn on_delete_stream(
401         &mut self,
402         transaction_id: &f64,
403         stream_id: &f64,
404     ) -> Result<(), SessionError> {
405         self.common
406             .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
407             .await?;
408 
409         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
410         netstream
411             .on_status(
412                 transaction_id,
413                 &"status".to_string(),
414                 &"NetStream.DeleteStream.Suceess".to_string(),
415                 &"".to_string(),
416             )
417             .await?;
418 
419         //self.unsubscribe_from_channels().await?;
420         log::info!(
421             "[ S->C ] [delete stream success]  app_name: {}, stream_name: {}",
422             self.app_name,
423             self.stream_name
424         );
425         log::trace!("{}", stream_id);
426 
427         Ok(())
428     }
429     pub async fn on_play(
430         &mut self,
431         transaction_id: &f64,
432         stream_id: &u32,
433         other_values: &mut Vec<Amf0ValueType>,
434     ) -> Result<(), SessionError> {
435         let length = other_values.len() as u8;
436         let mut index: u8 = 0;
437 
438         let mut stream_name: Option<String> = None;
439         let mut start: Option<f64> = None;
440         let mut duration: Option<f64> = None;
441         let mut reset: Option<bool> = None;
442 
443         loop {
444             if index >= length {
445                 break;
446             }
447             index = index + 1;
448             stream_name = match other_values.remove(0) {
449                 Amf0ValueType::UTF8String(val) => Some(val),
450                 _ => None,
451             };
452 
453             if index >= length {
454                 break;
455             }
456             index = index + 1;
457             start = match other_values.remove(0) {
458                 Amf0ValueType::Number(val) => Some(val),
459                 _ => None,
460             };
461 
462             if index >= length {
463                 break;
464             }
465             index = index + 1;
466             duration = match other_values.remove(0) {
467                 Amf0ValueType::Number(val) => Some(val),
468                 _ => None,
469             };
470 
471             if index >= length {
472                 break;
473             }
474             //index = index + 1;
475             reset = match other_values.remove(0) {
476                 Amf0ValueType::Boolean(val) => Some(val),
477                 _ => None,
478             };
479             break;
480         }
481 
482         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
483         event_messages.write_stream_begin(stream_id.clone()).await?;
484         log::info!(
485             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
486             self.app_name,
487             self.stream_name
488         );
489         log::trace!(
490             "{} {} {}",
491             start.is_some(),
492             duration.is_some(),
493             reset.is_some()
494         );
495 
496         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
497         netstream
498             .on_status(
499                 transaction_id,
500                 &"status".to_string(),
501                 &"NetStream.Play.Reset".to_string(),
502                 &"reset".to_string(),
503             )
504             .await?;
505 
506         netstream
507             .on_status(
508                 transaction_id,
509                 &"status".to_string(),
510                 &"NetStream.Play.Start".to_string(),
511                 &"play start".to_string(),
512             )
513             .await?;
514 
515         netstream
516             .on_status(
517                 transaction_id,
518                 &"status".to_string(),
519                 &"NetStream.Data.Start".to_string(),
520                 &"data start.".to_string(),
521             )
522             .await?;
523 
524         netstream
525             .on_status(
526                 transaction_id,
527                 &"status".to_string(),
528                 &"NetStream.Play.PublishNotify".to_string(),
529                 &"play publish notify.".to_string(),
530             )
531             .await?;
532 
533         event_messages
534             .write_stream_is_record(stream_id.clone())
535             .await?;
536         log::info!(
537             "[ S->C ] [stream is record]  app_name: {}, stream_name: {}",
538             self.app_name,
539             self.stream_name
540         );
541         self.stream_name = stream_name.clone().unwrap();
542         self.common
543             .subscribe_from_channels(
544                 self.app_name.clone(),
545                 stream_name.unwrap(),
546                 self.subscriber_id,
547             )
548             .await?;
549 
550         self.state = ServerSessionState::Play;
551 
552         Ok(())
553     }
554 
555     pub async fn on_publish(
556         &mut self,
557         transaction_id: &f64,
558         stream_id: &u32,
559         other_values: &mut Vec<Amf0ValueType>,
560     ) -> Result<(), SessionError> {
561         let length = other_values.len();
562 
563         if length < 2 {
564             return Err(SessionError {
565                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
566             });
567         }
568 
569         let stream_name = match other_values.remove(0) {
570             Amf0ValueType::UTF8String(val) => val,
571             _ => {
572                 return Err(SessionError {
573                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
574                 });
575             }
576         };
577 
578         self.stream_name = stream_name;
579 
580         let _ = match other_values.remove(0) {
581             Amf0ValueType::UTF8String(val) => val,
582             _ => {
583                 return Err(SessionError {
584                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
585                 });
586             }
587         };
588 
589         log::info!(
590             "[ S<-C ] [publish]  app_name: {}, stream_name: {}",
591             self.app_name,
592             self.stream_name
593         );
594 
595         log::info!(
596             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
597             self.app_name,
598             self.stream_name
599         );
600 
601         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
602         event_messages.write_stream_begin(stream_id.clone()).await?;
603 
604         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
605         netstream
606             .on_status(
607                 transaction_id,
608                 &"status".to_string(),
609                 &"NetStream.Publish.Start".to_string(),
610                 &"".to_string(),
611             )
612             .await?;
613         log::info!(
614             "[ S->C ] [NetStream.Publish.Start]  app_name: {}, stream_name: {}",
615             self.app_name,
616             self.stream_name
617         );
618 
619         self.common
620             .publish_to_channels(self.app_name.clone(), self.stream_name.clone())
621             .await?;
622 
623         Ok(())
624     }
625 }
626