1 use {
2     super::{
3         define,
4         errors::{SessionError, SessionErrorValue},
5     },
6     crate::{
7         amf0::Amf0ValueType,
8         channels::define::{
9             ChannelData, ChannelDataConsumer, ChannelDataPublisher, ChannelEvent,
10             ChannelEventPublisher,
11         },
12         chunk::{
13             define::{chunk_type, csid_type, CHUNK_SIZE},
14             packetizer::ChunkPacketizer,
15             unpacketizer::{ChunkUnpacketizer, UnpackResult},
16             ChunkInfo,
17         },
18         config,
19         handshake::handshake::{ServerHandshakeState, SimpleHandshakeServer},
20         messages::{
21             define::{msg_type_id, RtmpMessageData},
22             parser::MessageParser,
23         },
24         netconnection::commands::NetConnection,
25         netstream::writer::NetStreamWriter,
26         protocol_control_messages::writer::ProtocolControlMessagesWriter,
27         user_control_messages::writer::EventMessagesWriter,
28     },
29     bytes::BytesMut,
30     netio::{
31         bytes_writer::{AsyncBytesWriter, BytesWriter},
32         netio::NetworkIO,
33     },
34     std::{collections::HashMap, sync::Arc},
35     tokio::{
36         net::TcpStream,
37         sync::{mpsc, oneshot, Mutex},
38     },
39 };
40 
/// Phase of the per-connection state machine that `ServerSession::run` dispatches on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerSessionState {
    /// The RTMP handshake has not completed yet.
    Handshake,
    /// Handshake finished; received bytes are unpacked into chunks and handled as messages.
    ReadChunk,
    /// A `play` command was accepted; media taken from the channel is forwarded to the peer.
    Play,
}
49 
/// State kept by the server for one accepted RTMP connection.
///
/// A session is created with `ServerSession::new` and driven by `ServerSession::run`,
/// which moves it through the handshake, chunk-reading and play phases.
pub struct ServerSession {
    // Application name taken from the "app" property of the `connect` command object.
    app_name: String,
    // Stream name taken from the first argument of the `play` command.
    stream_name: String,

    // Shared connection handle; clones are handed to the handshaker, the packetizer
    // and the response writers.
    io: Arc<Mutex<NetworkIO>>,
    simple_handshaker: SimpleHandshakeServer,
    //complex_handshaker: ComplexHandshakeServer,
    // Writes outgoing messages as chunks.
    packetizer: ChunkPacketizer,
    // Buffers received bytes and turns them into chunks.
    unpacketizer: ChunkUnpacketizer,

    // Current phase of the session state machine.
    state: ServerSessionState,

    // Sends publish / subscribe / unsubscribe events to the channel layer.
    event_producer: ChannelEventPublisher,

    //send video, audio or metadata from publish server session to player server sessions
    data_producer: ChannelDataPublisher,
    //receive video, audio or metadata from publish server session and send out to player
    data_consumer: ChannelDataConsumer,

    // Bytes returned by the most recent network read.
    netio_data: BytesMut,
    // True when the unpacketizer already holds bytes left over from the handshake, so
    // the next chunk-processing pass must not wait for a network read first.
    need_process: bool,

    /// Identifier supplied by the creator of the session; sent along with
    /// subscribe / unsubscribe events.
    pub session_id: u64,
    /// 0 until a `play` (`config::SERVER_PULL`) or `publish` (`config::SERVER_PUSH`)
    /// command has been handled.
    pub session_type: u8,
}
75 
76 impl ServerSession {
77     pub fn new(stream: TcpStream, event_producer: ChannelEventPublisher, session_id: u64) -> Self {
78         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream)));
79         //only used for init,since I don't found a better way to deal with this.
80         let (init_producer, init_consumer) = mpsc::unbounded_channel();
81 
82         Self {
83             app_name: String::from(""),
84             stream_name: String::from(""),
85 
86             io: Arc::clone(&net_io),
87             simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)),
88             //complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)),
89             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
90             unpacketizer: ChunkUnpacketizer::new(),
91 
92             state: ServerSessionState::Handshake,
93 
94             event_producer,
95             data_producer: init_producer,
96             data_consumer: init_consumer,
97             session_id: session_id,
98             netio_data: BytesMut::new(),
99             need_process: false,
100             session_type: 0,
101         }
102     }
103 
104     pub async fn run(&mut self) -> Result<(), SessionError> {
105         loop {
106             match self.state {
107                 ServerSessionState::Handshake => {
108                     self.handshake().await?;
109                 }
110                 ServerSessionState::ReadChunk => {
111                     self.read_parse_chunks().await?;
112                 }
113                 ServerSessionState::Play => {
114                     self.play().await?;
115                 }
116             }
117         }
118 
119         //Ok(())
120     }
121 
122     async fn handshake(&mut self) -> Result<(), SessionError> {
123         self.netio_data = self.io.lock().await.read().await?;
124         self.simple_handshaker.extend_data(&self.netio_data[..]);
125         self.simple_handshaker.handshake().await?;
126 
127         match self.simple_handshaker.state {
128             ServerHandshakeState::Finish => {
129                 self.state = ServerSessionState::ReadChunk;
130 
131                 let left_bytes = self.simple_handshaker.get_remaining_bytes();
132                 if left_bytes.len() > 0 {
133                     self.unpacketizer.extend_data(&left_bytes[..]);
134                     self.need_process = true;
135                 }
136 
137                 return Ok(());
138             }
139             _ => {}
140         }
141 
142         Ok(())
143     }
144 
145     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
146         if !self.need_process {
147             self.netio_data = self.io.lock().await.read().await?;
148             self.unpacketizer.extend_data(&self.netio_data[..]);
149         }
150 
151         self.need_process = false;
152 
153         loop {
154             let result = self.unpacketizer.read_chunks();
155 
156             if let Ok(rv) = result {
157                 match rv {
158                     UnpackResult::Chunks(chunks) => {
159                         for chunk_info in chunks.iter() {
160                             let mut msg = MessageParser::new(chunk_info.clone(), self.session_type)
161                                 .parse()?;
162 
163                             let msg_stream_id = chunk_info.message_header.msg_streamd_id;
164                             let timestamp = chunk_info.message_header.timestamp;
165                             self.process_messages(&mut msg, &msg_stream_id, &timestamp)
166                                 .await?;
167                         }
168                     }
169                     _ => {}
170                 }
171             } else {
172                 break;
173             }
174         }
175         Ok(())
176     }
177 
178     async fn play(&mut self) -> Result<(), SessionError> {
179         match self.send_media_data().await {
180             Ok(_) => {}
181             Err(err) => {
182                 // let len = self.unpacketizer.reader.get_remaining_bytes().len();
183                 // print!("send meidi data err len:{}\n", len);
184 
185                 // utils::print::print(self.unpacketizer.reader.get_remaining_bytes());
186 
187                 // if len > 0 {
188                 //     self.need_process = true;
189                 // }
190 
191                 // self.state = ServerSessionState::ReadChunk;
192 
193                 self.unsubscribe_from_channels().await?;
194 
195                 return Err(err);
196             }
197         }
198 
199         Ok(())
200     }
201 
202     async fn send_media_data(&mut self) -> Result<(), SessionError> {
203         loop {
204             if let Some(data) = self.data_consumer.recv().await {
205                 match data {
206                     ChannelData::Audio { timestamp, data } => {
207                         //print!("send audio data\n");
208                         self.send_audio(data, timestamp).await?;
209                     }
210                     ChannelData::Video { timestamp, data } => {
211                         //print!("send video data\n");
212                         self.send_video(data, timestamp).await?;
213                     }
214                     ChannelData::MetaData { body } => {
215                         print!("send meta data\n");
216                         self.send_metadata(body).await?;
217                     }
218                 }
219             }
220         }
221     }
222 
223     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
224         let mut controlmessage =
225             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
226         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
227 
228         Ok(())
229     }
230     pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
231         let mut chunk_info = ChunkInfo::new(
232             csid_type::AUDIO,
233             chunk_type::TYPE_0,
234             timestamp,
235             data.len() as u32,
236             msg_type_id::AUDIO,
237             0,
238             data,
239         );
240 
241         self.packetizer.write_chunk(&mut chunk_info).await?;
242 
243         Ok(())
244     }
245 
246     pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
247         let mut chunk_info = ChunkInfo::new(
248             csid_type::VIDEO,
249             chunk_type::TYPE_0,
250             timestamp,
251             data.len() as u32,
252             msg_type_id::VIDEO,
253             0,
254             data,
255         );
256 
257         self.packetizer.write_chunk(&mut chunk_info).await?;
258 
259         Ok(())
260     }
261 
262     async fn send_metadata(&mut self, data: BytesMut) -> Result<(), SessionError> {
263         let mut chunk_info = ChunkInfo::new(
264             csid_type::DATA_AMF0_AMF3,
265             chunk_type::TYPE_0,
266             0,
267             data.len() as u32,
268             msg_type_id::DATA_AMF0,
269             0,
270             data,
271         );
272 
273         self.packetizer.write_chunk(&mut chunk_info).await?;
274         Ok(())
275     }
276     pub async fn process_messages(
277         &mut self,
278         rtmp_msg: &mut RtmpMessageData,
279         msg_stream_id: &u32,
280         timestamp: &u32,
281     ) -> Result<(), SessionError> {
282         match rtmp_msg {
283             RtmpMessageData::Amf0Command {
284                 command_name,
285                 transaction_id,
286                 command_object,
287                 others,
288             } => {
289                 self.on_amf0_command_message(
290                     msg_stream_id,
291                     command_name,
292                     transaction_id,
293                     command_object,
294                     others,
295                 )
296                 .await?
297             }
298             RtmpMessageData::SetChunkSize { chunk_size } => {
299                 self.on_set_chunk_size(chunk_size.clone() as usize)?;
300             }
301             RtmpMessageData::AudioData { data } => {
302                 self.on_audio_data(data, timestamp)?;
303             }
304             RtmpMessageData::VideoData { data } => {
305                 self.on_video_data(data, timestamp)?;
306             }
307             RtmpMessageData::AmfData { raw_data } => {
308                 self.on_amf_data(raw_data)?;
309             }
310 
311             _ => {}
312         }
313         Ok(())
314     }
315 
316     pub async fn on_amf0_command_message(
317         &mut self,
318         stream_id: &u32,
319         command_name: &Amf0ValueType,
320         transaction_id: &Amf0ValueType,
321         command_object: &Amf0ValueType,
322         others: &mut Vec<Amf0ValueType>,
323     ) -> Result<(), SessionError> {
324         let empty_cmd_name = &String::new();
325         let cmd_name = match command_name {
326             Amf0ValueType::UTF8String(str) => str,
327             _ => empty_cmd_name,
328         };
329 
330         let transaction_id = match transaction_id {
331             Amf0ValueType::Number(number) => number,
332             _ => &0.0,
333         };
334 
335         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
336         let obj = match command_object {
337             Amf0ValueType::Object(obj) => obj,
338             _ => &empty_cmd_obj,
339         };
340 
341         match cmd_name.as_str() {
342             "connect" => {
343                 print!("connect .......");
344                 self.on_connect(&transaction_id, &obj).await?;
345             }
346             "createStream" => {
347                 self.on_create_stream(transaction_id).await?;
348             }
349             "deleteStream" => {
350                 print!("deletestream....\n");
351                 if others.len() > 0 {
352                     let stream_id = match others.pop() {
353                         Some(val) => match val {
354                             Amf0ValueType::Number(streamid) => streamid,
355                             _ => 0.0,
356                         },
357                         _ => 0.0,
358                     };
359 
360                     print!("deletestream....{}\n", stream_id);
361 
362                     self.on_delete_stream(transaction_id, &stream_id).await?;
363                 }
364             }
365             "play" => {
366                 self.session_type = config::SERVER_PULL;
367                 self.unpacketizer.session_type = config::SERVER_PULL;
368                 self.on_play(transaction_id, stream_id, others).await?;
369             }
370             "publish" => {
371                 self.session_type = config::SERVER_PUSH;
372                 self.unpacketizer.session_type = config::SERVER_PUSH;
373                 self.on_publish(transaction_id, stream_id, others).await?;
374             }
375             _ => {}
376         }
377 
378         Ok(())
379     }
380 
    /// Applies a Set Chunk Size message received from the peer by updating the
    /// maximum chunk size used by the unpacketizer. Always returns `Ok(())`.
    fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
        self.unpacketizer.update_max_chunk_size(chunk_size);
        Ok(())
    }
385 
386     async fn on_connect(
387         &mut self,
388         transaction_id: &f64,
389         command_obj: &HashMap<String, Amf0ValueType>,
390     ) -> Result<(), SessionError> {
391         let mut control_message =
392             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
393         control_message
394             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
395             .await?;
396         control_message
397             .write_set_peer_bandwidth(
398                 define::PEER_BANDWIDTH,
399                 define::peer_bandwidth_limit_type::DYNAMIC,
400             )
401             .await?;
402         control_message.write_set_chunk_size(CHUNK_SIZE).await?;
403 
404         let obj_encoding = command_obj.get("objectEncoding");
405         let encoding = match obj_encoding {
406             Some(Amf0ValueType::Number(encoding)) => encoding,
407             _ => &define::OBJENCODING_AMF0,
408         };
409 
410         let app_name = command_obj.get("app");
411         self.app_name = match app_name {
412             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
413             _ => {
414                 return Err(SessionError {
415                     value: SessionErrorValue::NoAppName,
416                 });
417             }
418         };
419 
420         let mut netconnection = NetConnection::new(BytesWriter::new());
421         let data = netconnection.connect_response(
422             &transaction_id,
423             &define::FMSVER.to_string(),
424             &define::CAPABILITIES,
425             &String::from("NetConnection.Connect.Success"),
426             &define::LEVEL.to_string(),
427             &String::from("Connection Succeeded."),
428             encoding,
429         )?;
430 
431         let mut chunk_info = ChunkInfo::new(
432             csid_type::COMMAND_AMF0_AMF3,
433             chunk_type::TYPE_0,
434             0,
435             data.len() as u32,
436             msg_type_id::COMMAND_AMF0,
437             0,
438             data,
439         );
440 
441         self.packetizer.write_chunk(&mut chunk_info).await?;
442 
443         Ok(())
444     }
445 
446     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
447         let mut netconnection = NetConnection::new(BytesWriter::new());
448         let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?;
449 
450         let mut chunk_info = ChunkInfo::new(
451             csid_type::COMMAND_AMF0_AMF3,
452             chunk_type::TYPE_0,
453             0,
454             data.len() as u32,
455             msg_type_id::COMMAND_AMF0,
456             0,
457             data,
458         );
459 
460         self.packetizer.write_chunk(&mut chunk_info).await?;
461 
462         Ok(())
463     }
464 
465     pub async fn on_delete_stream(
466         &mut self,
467         transaction_id: &f64,
468         stream_id: &f64,
469     ) -> Result<(), SessionError> {
470         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
471         netstream
472             .on_status(
473                 transaction_id,
474                 &"status".to_string(),
475                 &"NetStream.DeleteStream.Suceess".to_string(),
476                 &"".to_string(),
477             )
478             .await?;
479 
480         print!("stream id{}", stream_id);
481 
482         //self.unsubscribe_from_channels().await?;
483 
484         Ok(())
485     }
486     pub async fn on_play(
487         &mut self,
488         transaction_id: &f64,
489         stream_id: &u32,
490         other_values: &mut Vec<Amf0ValueType>,
491     ) -> Result<(), SessionError> {
492         let length = other_values.len() as u8;
493         let mut index: u8 = 0;
494 
495         let mut stream_name: Option<String> = None;
496         let mut start: Option<f64> = None;
497         let mut duration: Option<f64> = None;
498         let mut reset: Option<bool> = None;
499 
500         loop {
501             if index >= length {
502                 break;
503             }
504             index = index + 1;
505             stream_name = match other_values.remove(0) {
506                 Amf0ValueType::UTF8String(val) => Some(val),
507                 _ => None,
508             };
509 
510             if index >= length {
511                 break;
512             }
513             index = index + 1;
514             start = match other_values.remove(0) {
515                 Amf0ValueType::Number(val) => Some(val),
516                 _ => None,
517             };
518 
519             if index >= length {
520                 break;
521             }
522             index = index + 1;
523             duration = match other_values.remove(0) {
524                 Amf0ValueType::Number(val) => Some(val),
525                 _ => None,
526             };
527 
528             if index >= length {
529                 break;
530             }
531             //index = index + 1;
532             reset = match other_values.remove(0) {
533                 Amf0ValueType::Boolean(val) => Some(val),
534                 _ => None,
535             };
536             break;
537         }
538         print!("start {}", start.is_some());
539         print!("druation {}", duration.is_some());
540         print!("reset {}", reset.is_some());
541 
542         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
543         event_messages.write_stream_begin(stream_id.clone()).await?;
544 
545         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
546         netstream
547             .on_status(
548                 transaction_id,
549                 &"status".to_string(),
550                 &"NetStream.Play.Reset".to_string(),
551                 &"reset".to_string(),
552             )
553             .await?;
554 
555         netstream
556             .on_status(
557                 transaction_id,
558                 &"status".to_string(),
559                 &"NetStream.Play.Start".to_string(),
560                 &"play start".to_string(),
561             )
562             .await?;
563 
564         netstream
565             .on_status(
566                 transaction_id,
567                 &"status".to_string(),
568                 &"NetStream.Data.Start".to_string(),
569                 &"data start.".to_string(),
570             )
571             .await?;
572 
573         netstream
574             .on_status(
575                 transaction_id,
576                 &"status".to_string(),
577                 &"NetStream.Play.PublishNotify".to_string(),
578                 &"play publish notify.".to_string(),
579             )
580             .await?;
581 
582         event_messages
583             .write_stream_is_record(stream_id.clone())
584             .await?;
585 
586         self.subscribe_from_channels(stream_name.unwrap()).await?;
587         self.state = ServerSessionState::Play;
588 
589         Ok(())
590     }
591 
592     async fn unsubscribe_from_channels(&mut self) -> Result<(), SessionError> {
593         let subscribe_event = ChannelEvent::UnSubscribe {
594             app_name: self.app_name.clone(),
595             stream_name: self.stream_name.clone(),
596             session_id: self.session_id,
597         };
598 
599         let _ = self.event_producer.send(subscribe_event);
600 
601         Ok(())
602     }
603 
604     async fn subscribe_from_channels(&mut self, stream_name: String) -> Result<(), SessionError> {
605         self.stream_name = stream_name.clone();
606 
607         print!(
608             "subscribe info............{} {} {}\n",
609             self.app_name,
610             stream_name.clone(),
611             self.session_id
612         );
613 
614         let (sender, receiver) = oneshot::channel();
615         let subscribe_event = ChannelEvent::Subscribe {
616             app_name: self.app_name.clone(),
617             stream_name,
618             session_id: self.session_id,
619             responder: sender,
620         };
621 
622         let rv = self.event_producer.send(subscribe_event);
623         match rv {
624             Err(_) => {
625                 return Err(SessionError {
626                     value: SessionErrorValue::ChannelEventSendErr,
627                 })
628             }
629             _ => {}
630         }
631 
632         match receiver.await {
633             Ok(consumer) => {
634                 self.data_consumer = consumer;
635             }
636             Err(_) => {}
637         }
638         Ok(())
639     }
640 
641     pub async fn on_publish(
642         &mut self,
643         transaction_id: &f64,
644         stream_id: &u32,
645         other_values: &mut Vec<Amf0ValueType>,
646     ) -> Result<(), SessionError> {
647         let length = other_values.len();
648 
649         if length < 2 {
650             return Err(SessionError {
651                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
652             });
653         }
654 
655         let stream_name = match other_values.remove(0) {
656             Amf0ValueType::UTF8String(val) => val,
657             _ => {
658                 return Err(SessionError {
659                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
660                 });
661             }
662         };
663 
664         let _ = match other_values.remove(0) {
665             Amf0ValueType::UTF8String(val) => val,
666             _ => {
667                 return Err(SessionError {
668                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
669                 });
670             }
671         };
672 
673         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
674         event_messages.write_stream_begin(stream_id.clone()).await?;
675 
676         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
677         netstream
678             .on_status(
679                 transaction_id,
680                 &"status".to_string(),
681                 &"NetStream.Publish.Start".to_string(),
682                 &"".to_string(),
683             )
684             .await?;
685 
686         print!("before publish_to_channels\n");
687         self.publish_to_channels(stream_name).await?;
688         print!("after publish_to_channels\n");
689 
690         Ok(())
691     }
692 
693     async fn publish_to_channels(&mut self, stream_name: String) -> Result<(), SessionError> {
694         let (sender, receiver) = oneshot::channel();
695         let publish_event = ChannelEvent::Publish {
696             app_name: self.app_name.clone(),
697             stream_name,
698             responder: sender,
699         };
700 
701         let rv = self.event_producer.send(publish_event);
702         match rv {
703             Err(_) => {
704                 return Err(SessionError {
705                     value: SessionErrorValue::ChannelEventSendErr,
706                 })
707             }
708             _ => {}
709         }
710 
711         match receiver.await {
712             Ok(producer) => {
713                 print!("set producer before\n");
714                 self.data_producer = producer;
715                 print!("set producer after\n");
716             }
717             Err(_) => {}
718         }
719         Ok(())
720     }
721 
722     pub fn on_video_data(
723         &mut self,
724         data: &mut BytesMut,
725         timestamp: &u32,
726     ) -> Result<(), SessionError> {
727         let data = ChannelData::Video {
728             timestamp: timestamp.clone(),
729             data: data.clone(),
730         };
731 
732         //print!("receive video data\n");
733         match self.data_producer.send(data) {
734             Ok(_) => {}
735             Err(err) => {
736                 print!("send video err {}\n", err);
737                 return Err(SessionError {
738                     value: SessionErrorValue::SendChannelDataErr,
739                 });
740             }
741         }
742 
743         Ok(())
744     }
745 
746     pub fn on_audio_data(
747         &mut self,
748         data: &mut BytesMut,
749         timestamp: &u32,
750     ) -> Result<(), SessionError> {
751         let data = ChannelData::Audio {
752             timestamp: timestamp.clone(),
753             data: data.clone(),
754         };
755 
756         match self.data_producer.send(data) {
757             Ok(_) => {}
758             Err(err) => {
759                 print!("receive audio err {}\n", err);
760                 return Err(SessionError {
761                     value: SessionErrorValue::SendChannelDataErr,
762                 });
763             }
764         }
765 
766         Ok(())
767     }
768 
769     pub fn on_amf_data(&mut self, body: &mut BytesMut) -> Result<(), SessionError> {
770         let data = ChannelData::MetaData { body: body.clone() };
771 
772         match self.data_producer.send(data) {
773             Ok(_) => {}
774             Err(_) => {
775                 return Err(SessionError {
776                     value: SessionErrorValue::SendChannelDataErr,
777                 })
778             }
779         }
780 
781         Ok(())
782     }
783 }
784