1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::{
9         amf0::Amf0ValueType,
10         channels::define::{
11             ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent,
12             ChannelEventProducer,
13         },
14         chunk::{
15             define::{chunk_type, csid_type, CHUNK_SIZE},
16             packetizer::ChunkPacketizer,
17             unpacketizer::{ChunkUnpacketizer, UnpackResult},
18             ChunkInfo,
19         },
20         config,
21         handshake::handshake::{ServerHandshakeState, SimpleHandshakeServer},
22         messages::{
23             define::{msg_type_id, RtmpMessageData},
24             parser::MessageParser,
25         },
26         netconnection::commands::NetConnection,
27         netstream::writer::NetStreamWriter,
28         protocol_control_messages::writer::ProtocolControlMessagesWriter,
29         user_control_messages::writer::EventMessagesWriter,
30     },
31     bytes::BytesMut,
32     networkio::{
33         bytes_writer::{AsyncBytesWriter, BytesWriter},
34         networkio::NetworkIO,
35     },
36     std::{collections::HashMap, sync::Arc},
37     tokio::{
38         net::TcpStream,
39         sync::{mpsc, oneshot, Mutex},
40     },
41 };
42 
43 enum ServerSessionState {
44     Handshake,
45     ReadChunk,
46     // OnConnect,
47     // OnCreateStream,
48     //Publish,
49     Play,
50 }
51 
52 pub struct ServerSession {
53     app_name: String,
54     stream_name: String,
55 
56     io: Arc<Mutex<NetworkIO>>,
57     simple_handshaker: SimpleHandshakeServer,
58     //complex_handshaker: ComplexHandshakeServer,
59     packetizer: ChunkPacketizer,
60     unpacketizer: ChunkUnpacketizer,
61 
62     state: ServerSessionState,
63 
64     common: Common,
65 
66     netio_data: BytesMut,
67     need_process: bool,
68 
69     pub session_id: u64,
70     pub session_type: u8,
71 
72     connect_command_object: Option<HashMap<String, Amf0ValueType>>,
73 }
74 
75 impl ServerSession {
76     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer, session_id: u64) -> Self {
77         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream)));
78 
79         Self {
80             app_name: String::from(""),
81             stream_name: String::from(""),
82 
83             io: Arc::clone(&net_io),
84             simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)),
85             //complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)),
86             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
87             unpacketizer: ChunkUnpacketizer::new(),
88 
89             state: ServerSessionState::Handshake,
90 
91             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server),
92 
93             session_id: session_id,
94             netio_data: BytesMut::new(),
95             need_process: false,
96             session_type: 0,
97             connect_command_object: None,
98         }
99     }
100 
101     pub async fn run(&mut self) -> Result<(), SessionError> {
102         loop {
103             match self.state {
104                 ServerSessionState::Handshake => {
105                     self.handshake().await?;
106                 }
107                 ServerSessionState::ReadChunk => {
108                     self.read_parse_chunks().await?;
109                 }
110                 ServerSessionState::Play => {
111                     self.play().await?;
112                 }
113             }
114         }
115 
116         //Ok(())
117     }
118 
119     async fn handshake(&mut self) -> Result<(), SessionError> {
120         self.netio_data = self.io.lock().await.read().await?;
121         self.simple_handshaker.extend_data(&self.netio_data[..]);
122         self.simple_handshaker.handshake().await?;
123 
124         match self.simple_handshaker.state {
125             ServerHandshakeState::Finish => {
126                 self.state = ServerSessionState::ReadChunk;
127 
128                 let left_bytes = self.simple_handshaker.get_remaining_bytes();
129                 if left_bytes.len() > 0 {
130                     self.unpacketizer.extend_data(&left_bytes[..]);
131                     self.need_process = true;
132                 }
133 
134                 return Ok(());
135             }
136             _ => {}
137         }
138 
139         Ok(())
140     }
141 
142     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
143         self.send_set_chunk_size().await?;
144         if !self.need_process {
145             self.netio_data = self.io.lock().await.read().await?;
146             self.unpacketizer.extend_data(&self.netio_data[..]);
147         }
148 
149         self.need_process = false;
150 
151         loop {
152             let result = self.unpacketizer.read_chunks();
153 
154             if let Ok(rv) = result {
155                 match rv {
156                     UnpackResult::Chunks(chunks) => {
157                         for chunk_info in chunks.iter() {
158                             let mut msg = MessageParser::new(chunk_info.clone(), self.session_type)
159                                 .parse()?;
160 
161                             let msg_stream_id = chunk_info.message_header.msg_streamd_id;
162                             let timestamp = chunk_info.message_header.timestamp;
163                             self.process_messages(&mut msg, &msg_stream_id, &timestamp)
164                                 .await?;
165                         }
166                     }
167                     _ => {}
168                 }
169             } else {
170                 break;
171             }
172         }
173         Ok(())
174     }
175 
176     async fn play(&mut self) -> Result<(), SessionError> {
177         match self.common.send_channel_data().await {
178             Ok(_) => {}
179 
180             Err(err) => {
181                 self.common
182                     .unsubscribe_from_channels(
183                         self.app_name.clone(),
184                         self.stream_name.clone(),
185                         self.session_id,
186                     )
187                     .await?;
188                 return Err(err);
189             }
190         }
191 
192         Ok(())
193     }
194 
195     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
196         let mut controlmessage =
197             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
198         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
199 
200         Ok(())
201     }
202 
203     pub async fn process_messages(
204         &mut self,
205         rtmp_msg: &mut RtmpMessageData,
206         msg_stream_id: &u32,
207         timestamp: &u32,
208     ) -> Result<(), SessionError> {
209         match rtmp_msg {
210             RtmpMessageData::Amf0Command {
211                 command_name,
212                 transaction_id,
213                 command_object,
214                 others,
215             } => {
216                 self.on_amf0_command_message(
217                     msg_stream_id,
218                     command_name,
219                     transaction_id,
220                     command_object,
221                     others,
222                 )
223                 .await?
224             }
225             RtmpMessageData::SetChunkSize { chunk_size } => {
226                 self.on_set_chunk_size(chunk_size.clone() as usize)?;
227             }
228             RtmpMessageData::AudioData { data } => {
229                 self.common.on_audio_data(data, timestamp)?;
230             }
231             RtmpMessageData::VideoData { data } => {
232                 self.common.on_video_data(data, timestamp)?;
233             }
234             RtmpMessageData::AmfData { raw_data } => {
235                 self.common.on_amf_data(raw_data)?;
236             }
237 
238             _ => {}
239         }
240         Ok(())
241     }
242 
243     pub async fn on_amf0_command_message(
244         &mut self,
245         stream_id: &u32,
246         command_name: &Amf0ValueType,
247         transaction_id: &Amf0ValueType,
248         command_object: &Amf0ValueType,
249         others: &mut Vec<Amf0ValueType>,
250     ) -> Result<(), SessionError> {
251         let empty_cmd_name = &String::new();
252         let cmd_name = match command_name {
253             Amf0ValueType::UTF8String(str) => str,
254             _ => empty_cmd_name,
255         };
256 
257         let transaction_id = match transaction_id {
258             Amf0ValueType::Number(number) => number,
259             _ => &0.0,
260         };
261 
262         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
263         let obj = match command_object {
264             Amf0ValueType::Object(obj) => obj,
265             _ => &empty_cmd_obj,
266         };
267 
268         match cmd_name.as_str() {
269             "connect" => {
270                 print!("connect .......");
271                 self.on_connect(&transaction_id, &obj).await?;
272             }
273             "createStream" => {
274                 self.on_create_stream(transaction_id).await?;
275             }
276             "deleteStream" => {
277                 print!("deletestream....\n");
278                 if others.len() > 0 {
279                     let stream_id = match others.pop() {
280                         Some(val) => match val {
281                             Amf0ValueType::Number(streamid) => streamid,
282                             _ => 0.0,
283                         },
284                         _ => 0.0,
285                     };
286                     print!("deletestream....{}\n", stream_id);
287                     self.on_delete_stream(transaction_id, &stream_id).await?;
288                 }
289             }
290             "play" => {
291                 self.session_type = config::SERVER_PULL;
292                 self.unpacketizer.session_type = config::SERVER_PULL;
293                 self.on_play(transaction_id, stream_id, others).await?;
294             }
295             "publish" => {
296                 self.session_type = config::SERVER_PUSH;
297                 self.unpacketizer.session_type = config::SERVER_PUSH;
298                 self.on_publish(transaction_id, stream_id, others).await?;
299             }
300             _ => {}
301         }
302 
303         Ok(())
304     }
305 
306     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
307         self.unpacketizer.update_max_chunk_size(chunk_size);
308         Ok(())
309     }
310 
311     async fn on_connect(
312         &mut self,
313         transaction_id: &f64,
314         command_obj: &HashMap<String, Amf0ValueType>,
315     ) -> Result<(), SessionError> {
316         self.connect_command_object = Some(command_obj.clone());
317         let mut control_message =
318             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
319         control_message
320             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
321             .await?;
322         control_message
323             .write_set_peer_bandwidth(
324                 define::PEER_BANDWIDTH,
325                 define::peer_bandwidth_limit_type::DYNAMIC,
326             )
327             .await?;
328         //control_message.write_set_chunk_size(CHUNK_SIZE).await?;
329 
330         let obj_encoding = command_obj.get("objectEncoding");
331         let encoding = match obj_encoding {
332             Some(Amf0ValueType::Number(encoding)) => encoding,
333             _ => &define::OBJENCODING_AMF0,
334         };
335 
336         let app_name = command_obj.get("app");
337         self.app_name = match app_name {
338             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
339             _ => {
340                 return Err(SessionError {
341                     value: SessionErrorValue::NoAppName,
342                 });
343             }
344         };
345 
346         let mut netconnection = NetConnection::new(BytesWriter::new());
347         let data = netconnection.connect_response(
348             &transaction_id,
349             &define::FMSVER.to_string(),
350             &define::CAPABILITIES,
351             &String::from("NetConnection.Connect.Success"),
352             &define::LEVEL.to_string(),
353             &String::from("Connection Succeeded."),
354             encoding,
355         )?;
356 
357         let mut chunk_info = ChunkInfo::new(
358             csid_type::COMMAND_AMF0_AMF3,
359             chunk_type::TYPE_0,
360             0,
361             data.len() as u32,
362             msg_type_id::COMMAND_AMF0,
363             0,
364             data,
365         );
366 
367         self.packetizer.write_chunk(&mut chunk_info).await?;
368 
369         Ok(())
370     }
371 
372     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
373         let mut netconnection = NetConnection::new(BytesWriter::new());
374         let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?;
375 
376         let mut chunk_info = ChunkInfo::new(
377             csid_type::COMMAND_AMF0_AMF3,
378             chunk_type::TYPE_0,
379             0,
380             data.len() as u32,
381             msg_type_id::COMMAND_AMF0,
382             0,
383             data,
384         );
385 
386         self.packetizer.write_chunk(&mut chunk_info).await?;
387 
388         Ok(())
389     }
390 
391     pub async fn on_delete_stream(
392         &mut self,
393         transaction_id: &f64,
394         stream_id: &f64,
395     ) -> Result<(), SessionError> {
396         self.common
397             .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
398             .await?;
399 
400         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
401         netstream
402             .on_status(
403                 transaction_id,
404                 &"status".to_string(),
405                 &"NetStream.DeleteStream.Suceess".to_string(),
406                 &"".to_string(),
407             )
408             .await?;
409 
410         print!("stream id{}", stream_id);
411 
412         //self.unsubscribe_from_channels().await?;
413 
414         Ok(())
415     }
416     pub async fn on_play(
417         &mut self,
418         transaction_id: &f64,
419         stream_id: &u32,
420         other_values: &mut Vec<Amf0ValueType>,
421     ) -> Result<(), SessionError> {
422         let length = other_values.len() as u8;
423         let mut index: u8 = 0;
424 
425         let mut stream_name: Option<String> = None;
426         let mut start: Option<f64> = None;
427         let mut duration: Option<f64> = None;
428         let mut reset: Option<bool> = None;
429 
430         loop {
431             if index >= length {
432                 break;
433             }
434             index = index + 1;
435             stream_name = match other_values.remove(0) {
436                 Amf0ValueType::UTF8String(val) => Some(val),
437                 _ => None,
438             };
439 
440             if index >= length {
441                 break;
442             }
443             index = index + 1;
444             start = match other_values.remove(0) {
445                 Amf0ValueType::Number(val) => Some(val),
446                 _ => None,
447             };
448 
449             if index >= length {
450                 break;
451             }
452             index = index + 1;
453             duration = match other_values.remove(0) {
454                 Amf0ValueType::Number(val) => Some(val),
455                 _ => None,
456             };
457 
458             if index >= length {
459                 break;
460             }
461             //index = index + 1;
462             reset = match other_values.remove(0) {
463                 Amf0ValueType::Boolean(val) => Some(val),
464                 _ => None,
465             };
466             break;
467         }
468         print!("start {}", start.is_some());
469         print!("druation {}", duration.is_some());
470         print!("reset {}", reset.is_some());
471 
472         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
473         event_messages.write_stream_begin(stream_id.clone()).await?;
474 
475         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
476         netstream
477             .on_status(
478                 transaction_id,
479                 &"status".to_string(),
480                 &"NetStream.Play.Reset".to_string(),
481                 &"reset".to_string(),
482             )
483             .await?;
484 
485         netstream
486             .on_status(
487                 transaction_id,
488                 &"status".to_string(),
489                 &"NetStream.Play.Start".to_string(),
490                 &"play start".to_string(),
491             )
492             .await?;
493 
494         netstream
495             .on_status(
496                 transaction_id,
497                 &"status".to_string(),
498                 &"NetStream.Data.Start".to_string(),
499                 &"data start.".to_string(),
500             )
501             .await?;
502 
503         netstream
504             .on_status(
505                 transaction_id,
506                 &"status".to_string(),
507                 &"NetStream.Play.PublishNotify".to_string(),
508                 &"play publish notify.".to_string(),
509             )
510             .await?;
511 
512         event_messages
513             .write_stream_is_record(stream_id.clone())
514             .await?;
515 
516         self.stream_name = stream_name.clone().unwrap();
517         self.common
518             .subscribe_from_channels(self.app_name.clone(), stream_name.unwrap(), self.session_id)
519             .await?;
520 
521         self.state = ServerSessionState::Play;
522 
523         Ok(())
524     }
525 
526     pub async fn on_publish(
527         &mut self,
528         transaction_id: &f64,
529         stream_id: &u32,
530         other_values: &mut Vec<Amf0ValueType>,
531     ) -> Result<(), SessionError> {
532         let length = other_values.len();
533 
534         if length < 2 {
535             return Err(SessionError {
536                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
537             });
538         }
539 
540         let stream_name = match other_values.remove(0) {
541             Amf0ValueType::UTF8String(val) => val,
542             _ => {
543                 return Err(SessionError {
544                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
545                 });
546             }
547         };
548 
549         self.stream_name = stream_name;
550 
551         let _ = match other_values.remove(0) {
552             Amf0ValueType::UTF8String(val) => val,
553             _ => {
554                 return Err(SessionError {
555                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
556                 });
557             }
558         };
559 
560         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
561         event_messages.write_stream_begin(stream_id.clone()).await?;
562 
563         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
564         netstream
565             .on_status(
566                 transaction_id,
567                 &"status".to_string(),
568                 &"NetStream.Publish.Start".to_string(),
569                 &"".to_string(),
570             )
571             .await?;
572 
573         //print!("before publish_to_channels\n");
574         self.common
575             .publish_to_channels(
576                 self.app_name.clone(),
577                 self.stream_name.clone(),
578                 self.connect_command_object.clone().unwrap(),
579             )
580             .await?;
581         //print!("after publish_to_channels\n");
582 
583         Ok(())
584     }
585 }
586