1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::{
9         amf0::Amf0ValueType,
10         channels::define::ChannelEventProducer,
11         chunk::{
12             define::{chunk_type, csid_type, CHUNK_SIZE},
13             packetizer::ChunkPacketizer,
14             unpacketizer::{ChunkUnpacketizer, UnpackResult},
15             ChunkInfo,
16         },
17         config,
18         handshake::handshake::{HandshakeServer, ServerHandshakeState},
19         messages::{
20             define::{msg_type_id, RtmpMessageData},
21             parser::MessageParser,
22         },
23         netconnection::commands::NetConnection,
24         netstream::writer::NetStreamWriter,
25         protocol_control_messages::writer::ProtocolControlMessagesWriter,
26         user_control_messages::writer::EventMessagesWriter,
27     },
28     bytes::BytesMut,
29     networkio::{
30         bytes_writer::{AsyncBytesWriter, BytesWriter},
31         networkio::NetworkIO,
32     },
33     std::{collections::HashMap, sync::Arc},
34     tokio::{net::TcpStream, sync::Mutex},
35 };
36 
37 enum ServerSessionState {
38     Handshake,
39     ReadChunk,
40     // OnConnect,
41     // OnCreateStream,
42     //Publish,
43     Play,
44 }
45 
46 pub struct ServerSession {
47     app_name: String,
48     stream_name: String,
49 
50     io: Arc<Mutex<NetworkIO>>,
51     handshaker: HandshakeServer,
52 
53     packetizer: ChunkPacketizer,
54     unpacketizer: ChunkUnpacketizer,
55 
56     state: ServerSessionState,
57 
58     common: Common,
59 
60     netio_data: BytesMut,
61     need_process: bool,
62 
63     pub session_id: u64,
64     pub session_type: u8,
65 
66     connect_command_object: Option<HashMap<String, Amf0ValueType>>,
67 }
68 
69 impl ServerSession {
70     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer, session_id: u64) -> Self {
71         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream)));
72 
73         Self {
74             app_name: String::from(""),
75             stream_name: String::from(""),
76 
77             io: Arc::clone(&net_io),
78             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
79 
80             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
81             unpacketizer: ChunkUnpacketizer::new(),
82 
83             state: ServerSessionState::Handshake,
84 
85             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server),
86 
87             session_id: session_id,
88             netio_data: BytesMut::new(),
89             need_process: false,
90             session_type: 0,
91             connect_command_object: None,
92         }
93     }
94 
95     pub async fn run(&mut self) -> Result<(), SessionError> {
96         loop {
97             match self.state {
98                 ServerSessionState::Handshake => {
99                     self.handshake().await?;
100                 }
101                 ServerSessionState::ReadChunk => {
102                     self.read_parse_chunks().await?;
103                 }
104                 ServerSessionState::Play => {
105                     self.play().await?;
106                 }
107             }
108         }
109 
110         //Ok(())
111     }
112 
113     async fn handshake(&mut self) -> Result<(), SessionError> {
114         self.netio_data = self.io.lock().await.read().await?;
115         self.handshaker.extend_data(&self.netio_data[..]);
116         self.handshaker.handshake().await?;
117 
118         match self.handshaker.state() {
119             ServerHandshakeState::Finish => {
120                 self.state = ServerSessionState::ReadChunk;
121 
122                 let left_bytes = self.handshaker.get_remaining_bytes();
123                 if left_bytes.len() > 0 {
124                     self.unpacketizer.extend_data(&left_bytes[..]);
125                     self.need_process = true;
126                 }
127                 self.send_set_chunk_size().await?;
128                 return Ok(());
129             }
130             _ => {}
131         }
132 
133         Ok(())
134     }
135 
136     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
137         if !self.need_process {
138             self.netio_data = self.io.lock().await.read().await?;
139             self.unpacketizer.extend_data(&self.netio_data[..]);
140         }
141 
142         self.need_process = false;
143 
144         loop {
145             let result = self.unpacketizer.read_chunks();
146 
147             if let Ok(rv) = result {
148                 match rv {
149                     UnpackResult::Chunks(chunks) => {
150                         for chunk_info in chunks.iter() {
151                             let mut msg = MessageParser::new(chunk_info.clone(), self.session_type)
152                                 .parse()?;
153 
154                             let msg_stream_id = chunk_info.message_header.msg_streamd_id;
155                             let timestamp = chunk_info.message_header.timestamp;
156                             self.process_messages(&mut msg, &msg_stream_id, &timestamp)
157                                 .await?;
158                         }
159                     }
160                     _ => {}
161                 }
162             } else {
163                 break;
164             }
165         }
166         Ok(())
167     }
168 
169     async fn play(&mut self) -> Result<(), SessionError> {
170         match self.common.send_channel_data().await {
171             Ok(_) => {}
172 
173             Err(err) => {
174                 self.common
175                     .unsubscribe_from_channels(
176                         self.app_name.clone(),
177                         self.stream_name.clone(),
178                         self.session_id,
179                     )
180                     .await?;
181                 return Err(err);
182             }
183         }
184 
185         Ok(())
186     }
187 
188     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
189         let mut controlmessage =
190             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
191         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
192 
193         Ok(())
194     }
195 
196     pub async fn process_messages(
197         &mut self,
198         rtmp_msg: &mut RtmpMessageData,
199         msg_stream_id: &u32,
200         timestamp: &u32,
201     ) -> Result<(), SessionError> {
202         match rtmp_msg {
203             RtmpMessageData::Amf0Command {
204                 command_name,
205                 transaction_id,
206                 command_object,
207                 others,
208             } => {
209                 self.on_amf0_command_message(
210                     msg_stream_id,
211                     command_name,
212                     transaction_id,
213                     command_object,
214                     others,
215                 )
216                 .await?
217             }
218             RtmpMessageData::SetChunkSize { chunk_size } => {
219                 self.on_set_chunk_size(chunk_size.clone() as usize)?;
220             }
221             RtmpMessageData::AudioData { data } => {
222                 self.common.on_audio_data(data, timestamp)?;
223             }
224             RtmpMessageData::VideoData { data } => {
225                 self.common.on_video_data(data, timestamp)?;
226             }
227             RtmpMessageData::AmfData { raw_data } => {
228                 self.common.on_meta_data(raw_data, timestamp)?;
229             }
230 
231             _ => {}
232         }
233         Ok(())
234     }
235 
236     pub async fn on_amf0_command_message(
237         &mut self,
238         stream_id: &u32,
239         command_name: &Amf0ValueType,
240         transaction_id: &Amf0ValueType,
241         command_object: &Amf0ValueType,
242         others: &mut Vec<Amf0ValueType>,
243     ) -> Result<(), SessionError> {
244         let empty_cmd_name = &String::new();
245         let cmd_name = match command_name {
246             Amf0ValueType::UTF8String(str) => str,
247             _ => empty_cmd_name,
248         };
249 
250         let transaction_id = match transaction_id {
251             Amf0ValueType::Number(number) => number,
252             _ => &0.0,
253         };
254 
255         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
256         let obj = match command_object {
257             Amf0ValueType::Object(obj) => obj,
258             _ => &empty_cmd_obj,
259         };
260 
261         match cmd_name.as_str() {
262             "connect" => {
263                 self.on_connect(&transaction_id, &obj).await?;
264             }
265             "createStream" => {
266                 self.on_create_stream(transaction_id).await?;
267             }
268             "deleteStream" => {
269                 if others.len() > 0 {
270                     let stream_id = match others.pop() {
271                         Some(val) => match val {
272                             Amf0ValueType::Number(streamid) => streamid,
273                             _ => 0.0,
274                         },
275                         _ => 0.0,
276                     };
277                     self.on_delete_stream(transaction_id, &stream_id).await?;
278                 }
279             }
280             "play" => {
281                 self.session_type = config::SERVER_PULL;
282                 self.unpacketizer.session_type = config::SERVER_PULL;
283                 self.on_play(transaction_id, stream_id, others).await?;
284             }
285             "publish" => {
286                 self.session_type = config::SERVER_PUSH;
287                 self.unpacketizer.session_type = config::SERVER_PUSH;
288                 self.on_publish(transaction_id, stream_id, others).await?;
289             }
290             _ => {}
291         }
292 
293         Ok(())
294     }
295 
296     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
297         self.unpacketizer.update_max_chunk_size(chunk_size);
298         Ok(())
299     }
300 
301     async fn on_connect(
302         &mut self,
303         transaction_id: &f64,
304         command_obj: &HashMap<String, Amf0ValueType>,
305     ) -> Result<(), SessionError> {
306         log::info!(
307             "[ C->S ] connect and the transaction id: {}",
308             transaction_id
309         );
310 
311         self.connect_command_object = Some(command_obj.clone());
312         let mut control_message =
313             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
314         control_message
315             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
316             .await?;
317         control_message
318             .write_set_peer_bandwidth(
319                 define::PEER_BANDWIDTH,
320                 define::peer_bandwidth_limit_type::DYNAMIC,
321             )
322             .await?;
323         //control_message.write_set_chunk_size(CHUNK_SIZE).await?;
324 
325         let obj_encoding = command_obj.get("objectEncoding");
326         let encoding = match obj_encoding {
327             Some(Amf0ValueType::Number(encoding)) => encoding,
328             _ => &define::OBJENCODING_AMF0,
329         };
330 
331         let app_name = command_obj.get("app");
332         self.app_name = match app_name {
333             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
334             _ => {
335                 return Err(SessionError {
336                     value: SessionErrorValue::NoAppName,
337                 });
338             }
339         };
340 
341         let mut netconnection = NetConnection::new(BytesWriter::new());
342         let data = netconnection.connect_response(
343             &transaction_id,
344             &define::FMSVER.to_string(),
345             &define::CAPABILITIES,
346             &String::from("NetConnection.Connect.Success"),
347             &define::LEVEL.to_string(),
348             &String::from("Connection Succeeded."),
349             encoding,
350         )?;
351 
352         let mut chunk_info = ChunkInfo::new(
353             csid_type::COMMAND_AMF0_AMF3,
354             chunk_type::TYPE_0,
355             0,
356             data.len() as u32,
357             msg_type_id::COMMAND_AMF0,
358             0,
359             data,
360         );
361 
362         self.packetizer.write_chunk(&mut chunk_info).await?;
363 
364         Ok(())
365     }
366 
367     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
368         log::info!(
369             "[ C->S ] create stream and the transaction id: {}",
370             transaction_id
371         );
372 
373         let mut netconnection = NetConnection::new(BytesWriter::new());
374         let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?;
375 
376         let mut chunk_info = ChunkInfo::new(
377             csid_type::COMMAND_AMF0_AMF3,
378             chunk_type::TYPE_0,
379             0,
380             data.len() as u32,
381             msg_type_id::COMMAND_AMF0,
382             0,
383             data,
384         );
385 
386         self.packetizer.write_chunk(&mut chunk_info).await?;
387 
388         Ok(())
389     }
390 
391     pub async fn on_delete_stream(
392         &mut self,
393         transaction_id: &f64,
394         stream_id: &f64,
395     ) -> Result<(), SessionError> {
396         log::info!(
397             "[ C->S ] delete stream and the transaction id :{}, the stream id : {}",
398             transaction_id,
399             stream_id
400         );
401 
402         self.common
403             .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
404             .await?;
405 
406         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
407         netstream
408             .on_status(
409                 transaction_id,
410                 &"status".to_string(),
411                 &"NetStream.DeleteStream.Suceess".to_string(),
412                 &"".to_string(),
413             )
414             .await?;
415 
416         //self.unsubscribe_from_channels().await?;
417 
418         Ok(())
419     }
420     pub async fn on_play(
421         &mut self,
422         transaction_id: &f64,
423         stream_id: &u32,
424         other_values: &mut Vec<Amf0ValueType>,
425     ) -> Result<(), SessionError> {
426         let length = other_values.len() as u8;
427         let mut index: u8 = 0;
428 
429         let mut stream_name: Option<String> = None;
430         let mut start: Option<f64> = None;
431         let mut duration: Option<f64> = None;
432         let mut reset: Option<bool> = None;
433 
434         loop {
435             if index >= length {
436                 break;
437             }
438             index = index + 1;
439             stream_name = match other_values.remove(0) {
440                 Amf0ValueType::UTF8String(val) => Some(val),
441                 _ => None,
442             };
443 
444             if index >= length {
445                 break;
446             }
447             index = index + 1;
448             start = match other_values.remove(0) {
449                 Amf0ValueType::Number(val) => Some(val),
450                 _ => None,
451             };
452 
453             if index >= length {
454                 break;
455             }
456             index = index + 1;
457             duration = match other_values.remove(0) {
458                 Amf0ValueType::Number(val) => Some(val),
459                 _ => None,
460             };
461 
462             if index >= length {
463                 break;
464             }
465             //index = index + 1;
466             reset = match other_values.remove(0) {
467                 Amf0ValueType::Boolean(val) => Some(val),
468                 _ => None,
469             };
470             break;
471         }
472         log::info!(
473             "on_play, start: {}, duration: {}, reset: {}",
474             start.is_some(),
475             duration.is_some(),
476             reset.is_some()
477         );
478 
479         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
480         event_messages.write_stream_begin(stream_id.clone()).await?;
481 
482         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
483         netstream
484             .on_status(
485                 transaction_id,
486                 &"status".to_string(),
487                 &"NetStream.Play.Reset".to_string(),
488                 &"reset".to_string(),
489             )
490             .await?;
491 
492         netstream
493             .on_status(
494                 transaction_id,
495                 &"status".to_string(),
496                 &"NetStream.Play.Start".to_string(),
497                 &"play start".to_string(),
498             )
499             .await?;
500 
501         netstream
502             .on_status(
503                 transaction_id,
504                 &"status".to_string(),
505                 &"NetStream.Data.Start".to_string(),
506                 &"data start.".to_string(),
507             )
508             .await?;
509 
510         netstream
511             .on_status(
512                 transaction_id,
513                 &"status".to_string(),
514                 &"NetStream.Play.PublishNotify".to_string(),
515                 &"play publish notify.".to_string(),
516             )
517             .await?;
518 
519         event_messages
520             .write_stream_is_record(stream_id.clone())
521             .await?;
522 
523         self.stream_name = stream_name.clone().unwrap();
524         self.common
525             .subscribe_from_channels(self.app_name.clone(), stream_name.unwrap(), self.session_id)
526             .await?;
527 
528         self.state = ServerSessionState::Play;
529 
530         Ok(())
531     }
532 
533     pub async fn on_publish(
534         &mut self,
535         transaction_id: &f64,
536         stream_id: &u32,
537         other_values: &mut Vec<Amf0ValueType>,
538     ) -> Result<(), SessionError> {
539         let length = other_values.len();
540 
541         if length < 2 {
542             return Err(SessionError {
543                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
544             });
545         }
546 
547         let stream_name = match other_values.remove(0) {
548             Amf0ValueType::UTF8String(val) => val,
549             _ => {
550                 return Err(SessionError {
551                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
552                 });
553             }
554         };
555 
556         self.stream_name = stream_name;
557 
558         let _ = match other_values.remove(0) {
559             Amf0ValueType::UTF8String(val) => val,
560             _ => {
561                 return Err(SessionError {
562                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
563                 });
564             }
565         };
566 
567         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
568         event_messages.write_stream_begin(stream_id.clone()).await?;
569 
570         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
571         netstream
572             .on_status(
573                 transaction_id,
574                 &"status".to_string(),
575                 &"NetStream.Publish.Start".to_string(),
576                 &"".to_string(),
577             )
578             .await?;
579 
580         self.common
581             .publish_to_channels(self.app_name.clone(), self.stream_name.clone())
582             .await?;
583 
584         Ok(())
585     }
586 }
587