1 use {
2     super::{
3         common::Common,
4         define,
5         errors::{SessionError, SessionErrorValue},
6     },
7     crate::{
8         amf0::Amf0ValueType,
9         channels::define::{
10             ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent,
11             ChannelEventProducer,
12         },
13         chunk::{
14             define::{chunk_type, csid_type, CHUNK_SIZE},
15             packetizer::ChunkPacketizer,
16             unpacketizer::{ChunkUnpacketizer, UnpackResult},
17             ChunkInfo,
18         },
19         config,
20         handshake::handshake::{ServerHandshakeState, SimpleHandshakeServer},
21         messages::{
22             define::{msg_type_id, RtmpMessageData},
23             parser::MessageParser,
24         },
25         netconnection::commands::NetConnection,
26         netstream::writer::NetStreamWriter,
27         protocol_control_messages::writer::ProtocolControlMessagesWriter,
28         user_control_messages::writer::EventMessagesWriter,
29     },
30     bytes::BytesMut,
31     networkio::{
32         bytes_writer::{AsyncBytesWriter, BytesWriter},
33         networkio::NetworkIO,
34     },
35     std::{collections::HashMap, sync::Arc},
36     tokio::{
37         net::TcpStream,
38         sync::{mpsc, oneshot, Mutex},
39     },
40 };
41 
42 enum ServerSessionState {
43     Handshake,
44     ReadChunk,
45     // OnConnect,
46     // OnCreateStream,
47     //Publish,
48     Play,
49 }
50 
51 pub struct ServerSession {
52     app_name: String,
53     stream_name: String,
54 
55     io: Arc<Mutex<NetworkIO>>,
56     simple_handshaker: SimpleHandshakeServer,
57     //complex_handshaker: ComplexHandshakeServer,
58     packetizer: ChunkPacketizer,
59     unpacketizer: ChunkUnpacketizer,
60 
61     state: ServerSessionState,
62 
63     common: Common,
64 
65     netio_data: BytesMut,
66     need_process: bool,
67 
68     pub session_id: u64,
69     pub session_type: u8,
70 
71     connect_command_object: Option<HashMap<String, Amf0ValueType>>,
72 }
73 
74 impl ServerSession {
75     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer, session_id: u64) -> Self {
76         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream)));
77 
78         Self {
79             app_name: String::from(""),
80             stream_name: String::from(""),
81 
82             io: Arc::clone(&net_io),
83             simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)),
84             //complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)),
85             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
86             unpacketizer: ChunkUnpacketizer::new(),
87 
88             state: ServerSessionState::Handshake,
89 
90             common: Common::new(Arc::clone(&net_io), event_producer),
91 
92             session_id: session_id,
93             netio_data: BytesMut::new(),
94             need_process: false,
95             session_type: 0,
96             connect_command_object: None,
97         }
98     }
99 
100     pub async fn run(&mut self) -> Result<(), SessionError> {
101         loop {
102             match self.state {
103                 ServerSessionState::Handshake => {
104                     self.handshake().await?;
105                 }
106                 ServerSessionState::ReadChunk => {
107                     self.read_parse_chunks().await?;
108                 }
109                 ServerSessionState::Play => {
110                     self.play().await?;
111                 }
112             }
113         }
114 
115         //Ok(())
116     }
117 
118     async fn handshake(&mut self) -> Result<(), SessionError> {
119         self.netio_data = self.io.lock().await.read().await?;
120         self.simple_handshaker.extend_data(&self.netio_data[..]);
121         self.simple_handshaker.handshake().await?;
122 
123         match self.simple_handshaker.state {
124             ServerHandshakeState::Finish => {
125                 self.state = ServerSessionState::ReadChunk;
126 
127                 let left_bytes = self.simple_handshaker.get_remaining_bytes();
128                 if left_bytes.len() > 0 {
129                     self.unpacketizer.extend_data(&left_bytes[..]);
130                     self.need_process = true;
131                 }
132 
133                 return Ok(());
134             }
135             _ => {}
136         }
137 
138         Ok(())
139     }
140 
141     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
142         if !self.need_process {
143             self.netio_data = self.io.lock().await.read().await?;
144             self.unpacketizer.extend_data(&self.netio_data[..]);
145         }
146 
147         self.need_process = false;
148 
149         loop {
150             let result = self.unpacketizer.read_chunks();
151 
152             if let Ok(rv) = result {
153                 match rv {
154                     UnpackResult::Chunks(chunks) => {
155                         for chunk_info in chunks.iter() {
156                             let mut msg = MessageParser::new(chunk_info.clone(), self.session_type)
157                                 .parse()?;
158 
159                             let msg_stream_id = chunk_info.message_header.msg_streamd_id;
160                             let timestamp = chunk_info.message_header.timestamp;
161                             self.process_messages(&mut msg, &msg_stream_id, &timestamp)
162                                 .await?;
163                         }
164                     }
165                     _ => {}
166                 }
167             } else {
168                 break;
169             }
170         }
171         Ok(())
172     }
173 
174     async fn play(&mut self) -> Result<(), SessionError> {
175         match self.common.send_channel_data().await {
176             Ok(_) => {}
177 
178             Err(err) => {
179                 self.common
180                     .unsubscribe_from_channels(
181                         self.app_name.clone(),
182                         self.stream_name.clone(),
183                         self.session_id,
184                     )
185                     .await?;
186                 return Err(err);
187             }
188         }
189 
190         Ok(())
191     }
192 
193     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
194         let mut controlmessage =
195             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
196         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
197 
198         Ok(())
199     }
200 
201     pub async fn process_messages(
202         &mut self,
203         rtmp_msg: &mut RtmpMessageData,
204         msg_stream_id: &u32,
205         timestamp: &u32,
206     ) -> Result<(), SessionError> {
207         match rtmp_msg {
208             RtmpMessageData::Amf0Command {
209                 command_name,
210                 transaction_id,
211                 command_object,
212                 others,
213             } => {
214                 self.on_amf0_command_message(
215                     msg_stream_id,
216                     command_name,
217                     transaction_id,
218                     command_object,
219                     others,
220                 )
221                 .await?
222             }
223             RtmpMessageData::SetChunkSize { chunk_size } => {
224                 self.on_set_chunk_size(chunk_size.clone() as usize)?;
225             }
226             RtmpMessageData::AudioData { data } => {
227                 self.common.on_audio_data(data, timestamp)?;
228             }
229             RtmpMessageData::VideoData { data } => {
230                 self.common.on_video_data(data, timestamp)?;
231             }
232             RtmpMessageData::AmfData { raw_data } => {
233                 self.common.on_amf_data(raw_data)?;
234             }
235 
236             _ => {}
237         }
238         Ok(())
239     }
240 
241     pub async fn on_amf0_command_message(
242         &mut self,
243         stream_id: &u32,
244         command_name: &Amf0ValueType,
245         transaction_id: &Amf0ValueType,
246         command_object: &Amf0ValueType,
247         others: &mut Vec<Amf0ValueType>,
248     ) -> Result<(), SessionError> {
249         let empty_cmd_name = &String::new();
250         let cmd_name = match command_name {
251             Amf0ValueType::UTF8String(str) => str,
252             _ => empty_cmd_name,
253         };
254 
255         let transaction_id = match transaction_id {
256             Amf0ValueType::Number(number) => number,
257             _ => &0.0,
258         };
259 
260         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
261         let obj = match command_object {
262             Amf0ValueType::Object(obj) => obj,
263             _ => &empty_cmd_obj,
264         };
265 
266         match cmd_name.as_str() {
267             "connect" => {
268                 print!("connect .......");
269                 self.on_connect(&transaction_id, &obj).await?;
270             }
271             "createStream" => {
272                 self.on_create_stream(transaction_id).await?;
273             }
274             "deleteStream" => {
275                 print!("deletestream....\n");
276                 if others.len() > 0 {
277                     let stream_id = match others.pop() {
278                         Some(val) => match val {
279                             Amf0ValueType::Number(streamid) => streamid,
280                             _ => 0.0,
281                         },
282                         _ => 0.0,
283                     };
284 
285                     print!("deletestream....{}\n", stream_id);
286 
287                     self.on_delete_stream(transaction_id, &stream_id).await?;
288                 }
289             }
290             "play" => {
291                 self.session_type = config::SERVER_PULL;
292                 self.unpacketizer.session_type = config::SERVER_PULL;
293                 self.on_play(transaction_id, stream_id, others).await?;
294             }
295             "publish" => {
296                 self.session_type = config::SERVER_PUSH;
297                 self.unpacketizer.session_type = config::SERVER_PUSH;
298                 self.on_publish(transaction_id, stream_id, others).await?;
299             }
300             _ => {}
301         }
302 
303         Ok(())
304     }
305 
306     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
307         self.unpacketizer.update_max_chunk_size(chunk_size);
308         Ok(())
309     }
310 
311     async fn on_connect(
312         &mut self,
313         transaction_id: &f64,
314         command_obj: &HashMap<String, Amf0ValueType>,
315     ) -> Result<(), SessionError> {
316         self.connect_command_object = Some(command_obj.clone());
317         let mut control_message =
318             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
319         control_message
320             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
321             .await?;
322         control_message
323             .write_set_peer_bandwidth(
324                 define::PEER_BANDWIDTH,
325                 define::peer_bandwidth_limit_type::DYNAMIC,
326             )
327             .await?;
328         control_message.write_set_chunk_size(CHUNK_SIZE).await?;
329 
330         let obj_encoding = command_obj.get("objectEncoding");
331         let encoding = match obj_encoding {
332             Some(Amf0ValueType::Number(encoding)) => encoding,
333             _ => &define::OBJENCODING_AMF0,
334         };
335 
336         let app_name = command_obj.get("app");
337         self.app_name = match app_name {
338             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
339             _ => {
340                 return Err(SessionError {
341                     value: SessionErrorValue::NoAppName,
342                 });
343             }
344         };
345 
346         let mut netconnection = NetConnection::new(BytesWriter::new());
347         let data = netconnection.connect_response(
348             &transaction_id,
349             &define::FMSVER.to_string(),
350             &define::CAPABILITIES,
351             &String::from("NetConnection.Connect.Success"),
352             &define::LEVEL.to_string(),
353             &String::from("Connection Succeeded."),
354             encoding,
355         )?;
356 
357         let mut chunk_info = ChunkInfo::new(
358             csid_type::COMMAND_AMF0_AMF3,
359             chunk_type::TYPE_0,
360             0,
361             data.len() as u32,
362             msg_type_id::COMMAND_AMF0,
363             0,
364             data,
365         );
366 
367         self.packetizer.write_chunk(&mut chunk_info).await?;
368 
369         Ok(())
370     }
371 
372     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
373         let mut netconnection = NetConnection::new(BytesWriter::new());
374         let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?;
375 
376         let mut chunk_info = ChunkInfo::new(
377             csid_type::COMMAND_AMF0_AMF3,
378             chunk_type::TYPE_0,
379             0,
380             data.len() as u32,
381             msg_type_id::COMMAND_AMF0,
382             0,
383             data,
384         );
385 
386         self.packetizer.write_chunk(&mut chunk_info).await?;
387 
388         Ok(())
389     }
390 
391     pub async fn on_delete_stream(
392         &mut self,
393         transaction_id: &f64,
394         stream_id: &f64,
395     ) -> Result<(), SessionError> {
396         self.common
397             .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
398             .await?;
399 
400         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
401         netstream
402             .on_status(
403                 transaction_id,
404                 &"status".to_string(),
405                 &"NetStream.DeleteStream.Suceess".to_string(),
406                 &"".to_string(),
407             )
408             .await?;
409 
410         print!("stream id{}", stream_id);
411 
412         //self.unsubscribe_from_channels().await?;
413 
414         Ok(())
415     }
416     pub async fn on_play(
417         &mut self,
418         transaction_id: &f64,
419         stream_id: &u32,
420         other_values: &mut Vec<Amf0ValueType>,
421     ) -> Result<(), SessionError> {
422         let length = other_values.len() as u8;
423         let mut index: u8 = 0;
424 
425         let mut stream_name: Option<String> = None;
426         let mut start: Option<f64> = None;
427         let mut duration: Option<f64> = None;
428         let mut reset: Option<bool> = None;
429 
430         loop {
431             if index >= length {
432                 break;
433             }
434             index = index + 1;
435             stream_name = match other_values.remove(0) {
436                 Amf0ValueType::UTF8String(val) => Some(val),
437                 _ => None,
438             };
439 
440             if index >= length {
441                 break;
442             }
443             index = index + 1;
444             start = match other_values.remove(0) {
445                 Amf0ValueType::Number(val) => Some(val),
446                 _ => None,
447             };
448 
449             if index >= length {
450                 break;
451             }
452             index = index + 1;
453             duration = match other_values.remove(0) {
454                 Amf0ValueType::Number(val) => Some(val),
455                 _ => None,
456             };
457 
458             if index >= length {
459                 break;
460             }
461             //index = index + 1;
462             reset = match other_values.remove(0) {
463                 Amf0ValueType::Boolean(val) => Some(val),
464                 _ => None,
465             };
466             break;
467         }
468         print!("start {}", start.is_some());
469         print!("druation {}", duration.is_some());
470         print!("reset {}", reset.is_some());
471 
472         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
473         event_messages.write_stream_begin(stream_id.clone()).await?;
474 
475         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
476         netstream
477             .on_status(
478                 transaction_id,
479                 &"status".to_string(),
480                 &"NetStream.Play.Reset".to_string(),
481                 &"reset".to_string(),
482             )
483             .await?;
484 
485         netstream
486             .on_status(
487                 transaction_id,
488                 &"status".to_string(),
489                 &"NetStream.Play.Start".to_string(),
490                 &"play start".to_string(),
491             )
492             .await?;
493 
494         netstream
495             .on_status(
496                 transaction_id,
497                 &"status".to_string(),
498                 &"NetStream.Data.Start".to_string(),
499                 &"data start.".to_string(),
500             )
501             .await?;
502 
503         netstream
504             .on_status(
505                 transaction_id,
506                 &"status".to_string(),
507                 &"NetStream.Play.PublishNotify".to_string(),
508                 &"play publish notify.".to_string(),
509             )
510             .await?;
511 
512         event_messages
513             .write_stream_is_record(stream_id.clone())
514             .await?;
515 
516         self.stream_name = stream_name.clone().unwrap();
517         self.common
518             .subscribe_from_channels(self.app_name.clone(), stream_name.unwrap(), self.session_id)
519             .await?;
520 
521         self.state = ServerSessionState::Play;
522 
523         Ok(())
524     }
525 
526     pub async fn on_publish(
527         &mut self,
528         transaction_id: &f64,
529         stream_id: &u32,
530         other_values: &mut Vec<Amf0ValueType>,
531     ) -> Result<(), SessionError> {
532         let length = other_values.len();
533 
534         if length < 2 {
535             return Err(SessionError {
536                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
537             });
538         }
539 
540         let stream_name = match other_values.remove(0) {
541             Amf0ValueType::UTF8String(val) => val,
542             _ => {
543                 return Err(SessionError {
544                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
545                 });
546             }
547         };
548 
549         self.stream_name = stream_name;
550 
551         let _ = match other_values.remove(0) {
552             Amf0ValueType::UTF8String(val) => val,
553             _ => {
554                 return Err(SessionError {
555                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
556                 });
557             }
558         };
559 
560         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
561         event_messages.write_stream_begin(stream_id.clone()).await?;
562 
563         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
564         netstream
565             .on_status(
566                 transaction_id,
567                 &"status".to_string(),
568                 &"NetStream.Publish.Start".to_string(),
569                 &"".to_string(),
570             )
571             .await?;
572 
573         //print!("before publish_to_channels\n");
574         self.common
575             .publish_to_channels(
576                 self.app_name.clone(),
577                 self.stream_name.clone(),
578                 self.connect_command_object.clone().unwrap(),
579             )
580             .await?;
581         //print!("after publish_to_channels\n");
582 
583         Ok(())
584     }
585 }
586