1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::{
9         amf0::Amf0ValueType,
10         channels::define::ChannelEventProducer,
11         chunk::{
12             define::{chunk_type, csid_type, CHUNK_SIZE},
13             packetizer::ChunkPacketizer,
14             unpacketizer::{ChunkUnpacketizer, UnpackResult},
15             ChunkInfo,
16         },
17         config,
18         handshake::handshake::{ServerHandshakeState, SimpleHandshakeServer},
19         messages::{
20             define::{msg_type_id, RtmpMessageData},
21             parser::MessageParser,
22         },
23         netconnection::commands::NetConnection,
24         netstream::writer::NetStreamWriter,
25         protocol_control_messages::writer::ProtocolControlMessagesWriter,
26         user_control_messages::writer::EventMessagesWriter,
27     },
28     bytes::BytesMut,
29     networkio::{
30         bytes_writer::{AsyncBytesWriter, BytesWriter},
31         networkio::NetworkIO,
32     },
33     std::{collections::HashMap, sync::Arc},
34     tokio::{net::TcpStream, sync::Mutex},
35 };
36 
/// Phase of a server session; `ServerSession::run` dispatches on it each loop turn.
enum ServerSessionState {
    /// Initial phase: the handshake with the peer has not finished yet.
    Handshake,
    /// Handshake finished: incoming bytes are unpacked into chunks and handled as messages.
    ReadChunk,
    /// Entered once a `play` command has been handled: channel data is sent to the peer.
    Play,
}
45 
46 pub struct ServerSession {
47     app_name: String,
48     stream_name: String,
49 
50     io: Arc<Mutex<NetworkIO>>,
51     simple_handshaker: SimpleHandshakeServer,
52     //complex_handshaker: ComplexHandshakeServer,
53     packetizer: ChunkPacketizer,
54     unpacketizer: ChunkUnpacketizer,
55 
56     state: ServerSessionState,
57 
58     common: Common,
59 
60     netio_data: BytesMut,
61     need_process: bool,
62 
63     pub session_id: u64,
64     pub session_type: u8,
65 
66     connect_command_object: Option<HashMap<String, Amf0ValueType>>,
67 }
68 
69 impl ServerSession {
70     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer, session_id: u64) -> Self {
71         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream)));
72 
73         Self {
74             app_name: String::from(""),
75             stream_name: String::from(""),
76 
77             io: Arc::clone(&net_io),
78             simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)),
79             //complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)),
80             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
81             unpacketizer: ChunkUnpacketizer::new(),
82 
83             state: ServerSessionState::Handshake,
84 
85             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server),
86 
87             session_id: session_id,
88             netio_data: BytesMut::new(),
89             need_process: false,
90             session_type: 0,
91             connect_command_object: None,
92         }
93     }
94 
95     pub async fn run(&mut self) -> Result<(), SessionError> {
96         loop {
97             match self.state {
98                 ServerSessionState::Handshake => {
99                     self.handshake().await?;
100                 }
101                 ServerSessionState::ReadChunk => {
102                     self.read_parse_chunks().await?;
103                 }
104                 ServerSessionState::Play => {
105                     self.play().await?;
106                 }
107             }
108         }
109 
110         //Ok(())
111     }
112 
113     async fn handshake(&mut self) -> Result<(), SessionError> {
114         self.netio_data = self.io.lock().await.read().await?;
115         self.simple_handshaker.extend_data(&self.netio_data[..]);
116         self.simple_handshaker.handshake().await?;
117 
118         match self.simple_handshaker.state {
119             ServerHandshakeState::Finish => {
120                 self.state = ServerSessionState::ReadChunk;
121 
122                 let left_bytes = self.simple_handshaker.get_remaining_bytes();
123                 if left_bytes.len() > 0 {
124                     self.unpacketizer.extend_data(&left_bytes[..]);
125                     self.need_process = true;
126                 }
127 
128                 return Ok(());
129             }
130             _ => {}
131         }
132 
133         Ok(())
134     }
135 
136     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
137         self.send_set_chunk_size().await?;
138         if !self.need_process {
139             self.netio_data = self.io.lock().await.read().await?;
140             self.unpacketizer.extend_data(&self.netio_data[..]);
141         }
142 
143         self.need_process = false;
144 
145         loop {
146             let result = self.unpacketizer.read_chunks();
147 
148             if let Ok(rv) = result {
149                 match rv {
150                     UnpackResult::Chunks(chunks) => {
151                         for chunk_info in chunks.iter() {
152                             let mut msg = MessageParser::new(chunk_info.clone(), self.session_type)
153                                 .parse()?;
154 
155                             let msg_stream_id = chunk_info.message_header.msg_streamd_id;
156                             let timestamp = chunk_info.message_header.timestamp;
157                             self.process_messages(&mut msg, &msg_stream_id, &timestamp)
158                                 .await?;
159                         }
160                     }
161                     _ => {}
162                 }
163             } else {
164                 break;
165             }
166         }
167         Ok(())
168     }
169 
170     async fn play(&mut self) -> Result<(), SessionError> {
171         match self.common.send_channel_data().await {
172             Ok(_) => {}
173 
174             Err(err) => {
175                 self.common
176                     .unsubscribe_from_channels(
177                         self.app_name.clone(),
178                         self.stream_name.clone(),
179                         self.session_id,
180                     )
181                     .await?;
182                 return Err(err);
183             }
184         }
185 
186         Ok(())
187     }
188 
189     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
190         let mut controlmessage =
191             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
192         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
193 
194         Ok(())
195     }
196 
197     pub async fn process_messages(
198         &mut self,
199         rtmp_msg: &mut RtmpMessageData,
200         msg_stream_id: &u32,
201         timestamp: &u32,
202     ) -> Result<(), SessionError> {
203         match rtmp_msg {
204             RtmpMessageData::Amf0Command {
205                 command_name,
206                 transaction_id,
207                 command_object,
208                 others,
209             } => {
210                 self.on_amf0_command_message(
211                     msg_stream_id,
212                     command_name,
213                     transaction_id,
214                     command_object,
215                     others,
216                 )
217                 .await?
218             }
219             RtmpMessageData::SetChunkSize { chunk_size } => {
220                 self.on_set_chunk_size(chunk_size.clone() as usize)?;
221             }
222             RtmpMessageData::AudioData { data } => {
223                 self.common.on_audio_data(data, timestamp)?;
224             }
225             RtmpMessageData::VideoData { data } => {
226                 self.common.on_video_data(data, timestamp)?;
227             }
228             RtmpMessageData::AmfData { raw_data } => {
229                 self.common.on_meta_data(raw_data, timestamp)?;
230             }
231 
232             _ => {}
233         }
234         Ok(())
235     }
236 
237     pub async fn on_amf0_command_message(
238         &mut self,
239         stream_id: &u32,
240         command_name: &Amf0ValueType,
241         transaction_id: &Amf0ValueType,
242         command_object: &Amf0ValueType,
243         others: &mut Vec<Amf0ValueType>,
244     ) -> Result<(), SessionError> {
245         let empty_cmd_name = &String::new();
246         let cmd_name = match command_name {
247             Amf0ValueType::UTF8String(str) => str,
248             _ => empty_cmd_name,
249         };
250 
251         let transaction_id = match transaction_id {
252             Amf0ValueType::Number(number) => number,
253             _ => &0.0,
254         };
255 
256         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
257         let obj = match command_object {
258             Amf0ValueType::Object(obj) => obj,
259             _ => &empty_cmd_obj,
260         };
261 
262         match cmd_name.as_str() {
263             "connect" => {
264                 print!("connect .......");
265                 self.on_connect(&transaction_id, &obj).await?;
266             }
267             "createStream" => {
268                 self.on_create_stream(transaction_id).await?;
269             }
270             "deleteStream" => {
271                 print!("deletestream....\n");
272                 if others.len() > 0 {
273                     let stream_id = match others.pop() {
274                         Some(val) => match val {
275                             Amf0ValueType::Number(streamid) => streamid,
276                             _ => 0.0,
277                         },
278                         _ => 0.0,
279                     };
280                     print!("deletestream....{}\n", stream_id);
281                     self.on_delete_stream(transaction_id, &stream_id).await?;
282                 }
283             }
284             "play" => {
285                 self.session_type = config::SERVER_PULL;
286                 self.unpacketizer.session_type = config::SERVER_PULL;
287                 self.on_play(transaction_id, stream_id, others).await?;
288             }
289             "publish" => {
290                 self.session_type = config::SERVER_PUSH;
291                 self.unpacketizer.session_type = config::SERVER_PUSH;
292                 self.on_publish(transaction_id, stream_id, others).await?;
293             }
294             _ => {}
295         }
296 
297         Ok(())
298     }
299 
300     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
301         self.unpacketizer.update_max_chunk_size(chunk_size);
302         Ok(())
303     }
304 
305     async fn on_connect(
306         &mut self,
307         transaction_id: &f64,
308         command_obj: &HashMap<String, Amf0ValueType>,
309     ) -> Result<(), SessionError> {
310         self.connect_command_object = Some(command_obj.clone());
311         let mut control_message =
312             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
313         control_message
314             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
315             .await?;
316         control_message
317             .write_set_peer_bandwidth(
318                 define::PEER_BANDWIDTH,
319                 define::peer_bandwidth_limit_type::DYNAMIC,
320             )
321             .await?;
322         //control_message.write_set_chunk_size(CHUNK_SIZE).await?;
323 
324         let obj_encoding = command_obj.get("objectEncoding");
325         let encoding = match obj_encoding {
326             Some(Amf0ValueType::Number(encoding)) => encoding,
327             _ => &define::OBJENCODING_AMF0,
328         };
329 
330         let app_name = command_obj.get("app");
331         self.app_name = match app_name {
332             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
333             _ => {
334                 return Err(SessionError {
335                     value: SessionErrorValue::NoAppName,
336                 });
337             }
338         };
339 
340         let mut netconnection = NetConnection::new(BytesWriter::new());
341         let data = netconnection.connect_response(
342             &transaction_id,
343             &define::FMSVER.to_string(),
344             &define::CAPABILITIES,
345             &String::from("NetConnection.Connect.Success"),
346             &define::LEVEL.to_string(),
347             &String::from("Connection Succeeded."),
348             encoding,
349         )?;
350 
351         let mut chunk_info = ChunkInfo::new(
352             csid_type::COMMAND_AMF0_AMF3,
353             chunk_type::TYPE_0,
354             0,
355             data.len() as u32,
356             msg_type_id::COMMAND_AMF0,
357             0,
358             data,
359         );
360 
361         self.packetizer.write_chunk(&mut chunk_info).await?;
362 
363         Ok(())
364     }
365 
366     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
367         let mut netconnection = NetConnection::new(BytesWriter::new());
368         let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?;
369 
370         let mut chunk_info = ChunkInfo::new(
371             csid_type::COMMAND_AMF0_AMF3,
372             chunk_type::TYPE_0,
373             0,
374             data.len() as u32,
375             msg_type_id::COMMAND_AMF0,
376             0,
377             data,
378         );
379 
380         self.packetizer.write_chunk(&mut chunk_info).await?;
381 
382         Ok(())
383     }
384 
385     pub async fn on_delete_stream(
386         &mut self,
387         transaction_id: &f64,
388         stream_id: &f64,
389     ) -> Result<(), SessionError> {
390         self.common
391             .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
392             .await?;
393 
394         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
395         netstream
396             .on_status(
397                 transaction_id,
398                 &"status".to_string(),
399                 &"NetStream.DeleteStream.Suceess".to_string(),
400                 &"".to_string(),
401             )
402             .await?;
403 
404         print!("stream id{}", stream_id);
405 
406         //self.unsubscribe_from_channels().await?;
407 
408         Ok(())
409     }
410     pub async fn on_play(
411         &mut self,
412         transaction_id: &f64,
413         stream_id: &u32,
414         other_values: &mut Vec<Amf0ValueType>,
415     ) -> Result<(), SessionError> {
416         let length = other_values.len() as u8;
417         let mut index: u8 = 0;
418 
419         let mut stream_name: Option<String> = None;
420         let mut start: Option<f64> = None;
421         let mut duration: Option<f64> = None;
422         let mut reset: Option<bool> = None;
423 
424         loop {
425             if index >= length {
426                 break;
427             }
428             index = index + 1;
429             stream_name = match other_values.remove(0) {
430                 Amf0ValueType::UTF8String(val) => Some(val),
431                 _ => None,
432             };
433 
434             if index >= length {
435                 break;
436             }
437             index = index + 1;
438             start = match other_values.remove(0) {
439                 Amf0ValueType::Number(val) => Some(val),
440                 _ => None,
441             };
442 
443             if index >= length {
444                 break;
445             }
446             index = index + 1;
447             duration = match other_values.remove(0) {
448                 Amf0ValueType::Number(val) => Some(val),
449                 _ => None,
450             };
451 
452             if index >= length {
453                 break;
454             }
455             //index = index + 1;
456             reset = match other_values.remove(0) {
457                 Amf0ValueType::Boolean(val) => Some(val),
458                 _ => None,
459             };
460             break;
461         }
462         print!("start {}", start.is_some());
463         print!("druation {}", duration.is_some());
464         print!("reset {}", reset.is_some());
465 
466         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
467         event_messages.write_stream_begin(stream_id.clone()).await?;
468 
469         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
470         netstream
471             .on_status(
472                 transaction_id,
473                 &"status".to_string(),
474                 &"NetStream.Play.Reset".to_string(),
475                 &"reset".to_string(),
476             )
477             .await?;
478 
479         netstream
480             .on_status(
481                 transaction_id,
482                 &"status".to_string(),
483                 &"NetStream.Play.Start".to_string(),
484                 &"play start".to_string(),
485             )
486             .await?;
487 
488         netstream
489             .on_status(
490                 transaction_id,
491                 &"status".to_string(),
492                 &"NetStream.Data.Start".to_string(),
493                 &"data start.".to_string(),
494             )
495             .await?;
496 
497         netstream
498             .on_status(
499                 transaction_id,
500                 &"status".to_string(),
501                 &"NetStream.Play.PublishNotify".to_string(),
502                 &"play publish notify.".to_string(),
503             )
504             .await?;
505 
506         event_messages
507             .write_stream_is_record(stream_id.clone())
508             .await?;
509 
510         self.stream_name = stream_name.clone().unwrap();
511         self.common
512             .subscribe_from_channels(self.app_name.clone(), stream_name.unwrap(), self.session_id)
513             .await?;
514 
515         self.state = ServerSessionState::Play;
516 
517         Ok(())
518     }
519 
520     pub async fn on_publish(
521         &mut self,
522         transaction_id: &f64,
523         stream_id: &u32,
524         other_values: &mut Vec<Amf0ValueType>,
525     ) -> Result<(), SessionError> {
526         let length = other_values.len();
527 
528         if length < 2 {
529             return Err(SessionError {
530                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
531             });
532         }
533 
534         let stream_name = match other_values.remove(0) {
535             Amf0ValueType::UTF8String(val) => val,
536             _ => {
537                 return Err(SessionError {
538                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
539                 });
540             }
541         };
542 
543         self.stream_name = stream_name;
544 
545         let _ = match other_values.remove(0) {
546             Amf0ValueType::UTF8String(val) => val,
547             _ => {
548                 return Err(SessionError {
549                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
550                 });
551             }
552         };
553 
554         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
555         event_messages.write_stream_begin(stream_id.clone()).await?;
556 
557         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
558         netstream
559             .on_status(
560                 transaction_id,
561                 &"status".to_string(),
562                 &"NetStream.Publish.Start".to_string(),
563                 &"".to_string(),
564             )
565             .await?;
566 
567         self.common
568             .publish_to_channels(self.app_name.clone(), self.stream_name.clone())
569             .await?;
570 
571         Ok(())
572     }
573 }
574