1 use {
2     super::{
3         define,
4         errors::{SessionError, SessionErrorValue},
5     },
6     crate::{
7         amf0::Amf0ValueType,
8         channels::define::{
9             ChannelData, ChannelDataConsumer, ChannelDataPublisher, ChannelEvent,
10             ChannelEventPublisher,
11         },
12         chunk::{
13             define::{chunk_type, csid_type, CHUNK_SIZE},
14             packetizer::ChunkPacketizer,
15             unpacketizer::{ChunkUnpacketizer, UnpackResult},
16             ChunkInfo,
17         },
18         handshake::handshake::{
19             ComplexHandshakeServer, ServerHandshakeState, SimpleHandshakeServer,
20         },
21         messages::{
22             define::{msg_type_id, RtmpMessageData},
23             parser::MessageParser,
24         },
25         netconnection::commands::NetConnection,
26         netstream::writer::NetStreamWriter,
27         protocol_control_messages::writer::ProtocolControlMessagesWriter,
28         user_control_messages::writer::EventMessagesWriter,
29     },
30     bytes::BytesMut,
31     netio::{
32         bytes_writer::{AsyncBytesWriter, BytesWriter},
33         netio::NetworkIO,
34     },
35     std::{collections::HashMap, sync::Arc, time::Duration},
36     tokio::{
37         net::TcpStream,
38         sync::{mpsc, oneshot, Mutex},
39     },
40 };
41 
/// Phases a server session moves through while serving one connection.
enum ServerSessionState {
    /// Initial phase: incoming bytes are handed to the handshaker.
    Handshake,
    /// Entered once the handshake reports `Finish`: incoming bytes are unpacked into
    /// chunks and dispatched as messages.
    ReadChunk,
    /// Entered after a `play` command was handled: data received from the subscribed
    /// channel is forwarded to the peer.
    Play,
}
50 
51 pub struct ServerSession {
52     app_name: String,
53 
54     io: Arc<Mutex<NetworkIO>>,
55     simple_handshaker: SimpleHandshakeServer,
56     complex_handshaker: ComplexHandshakeServer,
57 
58     packetizer: ChunkPacketizer,
59     unpacketizer: ChunkUnpacketizer,
60 
61     state: ServerSessionState,
62 
63     event_producer: ChannelEventPublisher,
64 
65     //send video, audio or metadata from publish server session to player server sessions
66     data_producer: ChannelDataPublisher,
67     //receive video, audio or metadata from publish server session and send out to player
68     data_consumer: ChannelDataConsumer,
69 
70     session_id: u8,
71 }
72 
73 impl ServerSession {
74     pub fn new(
75         stream: TcpStream,
76         event_producer: ChannelEventPublisher,
77         timeout: Duration,
78         session_id: u8,
79     ) -> Self {
80         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout)));
81         //only used for init,since I don't found a better way to deal with this.
82         let (init_producer, init_consumer) = mpsc::unbounded_channel();
83 
84         // let reader = BytesReader::new(BytesMut::new());
85 
86         Self {
87             app_name: String::from(""),
88 
89             io: Arc::clone(&net_io),
90             simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)),
91             complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)),
92             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
93             unpacketizer: ChunkUnpacketizer::new(),
94 
95             state: ServerSessionState::Handshake,
96 
97             event_producer,
98             data_producer: init_producer,
99             data_consumer: init_consumer,
100             session_id: session_id,
101         }
102     }
103 
104     pub async fn run(&mut self) -> Result<(), SessionError> {
105         let duration = Duration::new(10, 10);
106 
107         let mut remaining_bytes: BytesMut = BytesMut::new();
108 
109         loop {
110             let mut net_io_data: BytesMut = BytesMut::new();
111             if remaining_bytes.len() <= 0 {
112                 net_io_data = self.io.lock().await.read().await?;
113 
114                 //utils::print::print(net_io_data.clone());
115             }
116 
117             match self.state {
118                 ServerSessionState::Handshake => {
119                     if self.session_id > 0 {
120                         // utils::print::printu8(net_io_data.clone());
121                     }
122 
123                     self.simple_handshaker.extend_data(&net_io_data[..]);
124                     self.simple_handshaker.handshake().await?;
125 
126                     match self.simple_handshaker.state {
127                         ServerHandshakeState::Finish => {
128                             self.state = ServerSessionState::ReadChunk;
129                             remaining_bytes = self.simple_handshaker.get_remaining_bytes();
130                         }
131                         _ => continue,
132                     }
133                 }
134                 ServerSessionState::ReadChunk => {
135                     if self.session_id > 0 {
136                         // utils::print::printu8(net_io_data.clone());
137                     }
138 
139                     if remaining_bytes.len() > 0 {
140                         let bytes = std::mem::replace(&mut remaining_bytes, BytesMut::new());
141                         self.unpacketizer.extend_data(&bytes[..]);
142                     } else {
143                         self.unpacketizer.extend_data(&net_io_data[..]);
144                     }
145 
146                     loop {
147                         let result = self.unpacketizer.read_chunks();
148                         let rv = match result {
149                             Ok(val) => val,
150                             Err(_) => {
151                                 break;
152                             }
153                         };
154 
155                         match rv {
156                             UnpackResult::Chunks(chunks) => {
157                                 for chunk_info in chunks.iter() {
158                                     let msg_stream_id = chunk_info.message_header.msg_streamd_id;
159                                     let timestamp = chunk_info.message_header.timestamp;
160 
161                                     let mut message_parser = MessageParser::new(chunk_info.clone());
162                                     let mut msg = message_parser.parse()?;
163 
164                                     self.process_messages(&mut msg, &msg_stream_id, &timestamp)
165                                         .await?;
166                                 }
167                             }
168                             _ => {}
169                         }
170                     }
171                 }
172 
173                 //when in play state, only transfer publisher's video/audio/metadta to player.
174                 ServerSessionState::Play => loop {
175                     if let Some(data) = self.data_consumer.recv().await {
176                         match data {
177                             ChannelData::Audio { timestamp, data } => {
178                                 print!("send audio data\n");
179                                 self.send_audio(data, timestamp).await?;
180                             }
181                             ChannelData::Video { timestamp, data } => {
182                                 print!("send video data\n");
183                                 self.send_video(data, timestamp).await?;
184                             }
185                             ChannelData::MetaData { body } => {
186                                 print!("send meta data\n");
187                                 self.send_metadata(body).await?;
188                             }
189                         }
190                     } else {
191                     }
192                 },
193             }
194         }
195 
196         //Ok(())
197     }
198 
199     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
200         let mut controlmessage =
201             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
202         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
203 
204         Ok(())
205     }
206     pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
207         let mut chunk_info = ChunkInfo::new(
208             csid_type::AUDIO,
209             chunk_type::TYPE_0,
210             timestamp,
211             data.len() as u32,
212             msg_type_id::AUDIO,
213             0,
214             data,
215         );
216 
217         self.packetizer.write_chunk(&mut chunk_info).await?;
218 
219         Ok(())
220     }
221 
222     pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
223         let mut chunk_info = ChunkInfo::new(
224             csid_type::VIDEO,
225             chunk_type::TYPE_0,
226             timestamp,
227             data.len() as u32,
228             msg_type_id::VIDEO,
229             0,
230             data,
231         );
232 
233         self.packetizer.write_chunk(&mut chunk_info).await?;
234 
235         Ok(())
236     }
237 
238     async fn send_metadata(&mut self, data: BytesMut) -> Result<(), SessionError> {
239         let mut chunk_info = ChunkInfo::new(
240             csid_type::DATA_AMF0_AMF3,
241             chunk_type::TYPE_0,
242             0,
243             data.len() as u32,
244             msg_type_id::DATA_AMF0,
245             0,
246             data,
247         );
248 
249         self.packetizer.write_chunk(&mut chunk_info).await?;
250         Ok(())
251     }
252     pub async fn process_messages(
253         &mut self,
254         rtmp_msg: &mut RtmpMessageData,
255         msg_stream_id: &u32,
256         timestamp: &u32,
257     ) -> Result<(), SessionError> {
258         match rtmp_msg {
259             RtmpMessageData::Amf0Command {
260                 command_name,
261                 transaction_id,
262                 command_object,
263                 others,
264             } => {
265                 self.process_amf0_command_message(
266                     msg_stream_id,
267                     command_name,
268                     transaction_id,
269                     command_object,
270                     others,
271                 )
272                 .await?
273             }
274             RtmpMessageData::SetChunkSize { chunk_size } => {
275                 self.on_set_chunk_size(chunk_size.clone() as usize)?;
276             }
277             RtmpMessageData::AudioData { data } => {
278                 self.on_audio_data(data, timestamp)?;
279             }
280             RtmpMessageData::VideoData { data } => {
281                 self.on_video_data(data, timestamp)?;
282             }
283             RtmpMessageData::AmfData { raw_data } => {
284                 self.on_amf_data(raw_data)?;
285             }
286 
287             _ => {}
288         }
289         Ok(())
290     }
291 
292     pub async fn process_amf0_command_message(
293         &mut self,
294         stream_id: &u32,
295         command_name: &Amf0ValueType,
296         transaction_id: &Amf0ValueType,
297         command_object: &Amf0ValueType,
298         others: &mut Vec<Amf0ValueType>,
299     ) -> Result<(), SessionError> {
300         let empty_cmd_name = &String::new();
301         let cmd_name = match command_name {
302             Amf0ValueType::UTF8String(str) => str,
303             _ => empty_cmd_name,
304         };
305 
306         let transaction_id = match transaction_id {
307             Amf0ValueType::Number(number) => number,
308             _ => &0.0,
309         };
310 
311         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
312         let obj = match command_object {
313             Amf0ValueType::Object(obj) => obj,
314             _ => &empty_cmd_obj,
315         };
316 
317         match cmd_name.as_str() {
318             "connect" => {
319                 print!("connect .......");
320                 self.on_connect(&transaction_id, &obj).await?;
321             }
322             "createStream" => {
323                 self.on_create_stream(transaction_id).await?;
324             }
325             "deleteStream" => {
326                 if others.len() > 1 {
327                     let stream_id = match others.pop() {
328                         Some(val) => match val {
329                             Amf0ValueType::Number(streamid) => streamid,
330                             _ => 0.0,
331                         },
332                         _ => 0.0,
333                     };
334 
335                     self.on_delete_stream(transaction_id, &stream_id).await?;
336                 }
337             }
338             "play" => {
339                 self.on_play(transaction_id, stream_id, others).await?;
340             }
341             "publish" => {
342                 self.on_publish(transaction_id, stream_id, others).await?;
343             }
344             _ => {}
345         }
346 
347         Ok(())
348     }
349 
350     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
351         self.unpacketizer.update_max_chunk_size(chunk_size);
352         Ok(())
353     }
354 
355     async fn on_connect(
356         &mut self,
357         transaction_id: &f64,
358         command_obj: &HashMap<String, Amf0ValueType>,
359     ) -> Result<(), SessionError> {
360         let mut control_message =
361             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
362         control_message
363             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
364             .await?;
365         control_message
366             .write_set_peer_bandwidth(
367                 define::PEER_BANDWIDTH,
368                 define::PeerBandWidthLimitType::DYNAMIC,
369             )
370             .await?;
371         control_message.write_set_chunk_size(CHUNK_SIZE).await?;
372 
373         let obj_encoding = command_obj.get("objectEncoding");
374         let encoding = match obj_encoding {
375             Some(Amf0ValueType::Number(encoding)) => encoding,
376             _ => &define::OBJENCODING_AMF0,
377         };
378 
379         let app_name = command_obj.get("app");
380         self.app_name = match app_name {
381             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
382             _ => {
383                 return Err(SessionError {
384                     value: SessionErrorValue::NoAppName,
385                 });
386             }
387         };
388 
389         let mut netconnection = NetConnection::new(BytesWriter::new());
390         let data = netconnection.connect_response(
391             &transaction_id,
392             &define::FMSVER.to_string(),
393             &define::CAPABILITIES,
394             &String::from("NetConnection.Connect.Success"),
395             &define::LEVEL.to_string(),
396             &String::from("Connection Succeeded."),
397             encoding,
398         )?;
399 
400         let mut chunk_info = ChunkInfo::new(
401             csid_type::COMMAND_AMF0_AMF3,
402             chunk_type::TYPE_0,
403             0,
404             data.len() as u32,
405             msg_type_id::COMMAND_AMF0,
406             0,
407             data,
408         );
409 
410         self.packetizer.write_chunk(&mut chunk_info).await?;
411 
412         Ok(())
413     }
414 
415     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
416         let mut netconnection = NetConnection::new(BytesWriter::new());
417         let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?;
418 
419         let mut chunk_info = ChunkInfo::new(
420             csid_type::COMMAND_AMF0_AMF3,
421             chunk_type::TYPE_0,
422             0,
423             data.len() as u32,
424             msg_type_id::COMMAND_AMF0,
425             0,
426             data,
427         );
428 
429         self.packetizer.write_chunk(&mut chunk_info).await?;
430 
431         Ok(())
432     }
433 
434     pub async fn on_delete_stream(
435         &mut self,
436         transaction_id: &f64,
437         stream_id: &f64,
438     ) -> Result<(), SessionError> {
439         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
440         netstream
441             .on_status(
442                 transaction_id,
443                 &"status".to_string(),
444                 &"NetStream.DeleteStream.Suceess".to_string(),
445                 &"".to_string(),
446             )
447             .await?;
448 
449         Ok(())
450     }
451     pub async fn on_play(
452         &mut self,
453         transaction_id: &f64,
454         stream_id: &u32,
455         other_values: &mut Vec<Amf0ValueType>,
456     ) -> Result<(), SessionError> {
457         let length = other_values.len() as u8;
458         let mut index: u8 = 0;
459 
460         let mut stream_name: Option<String> = None;
461         let mut start: Option<f64> = None;
462         let mut duration: Option<f64> = None;
463         let mut reset: Option<bool> = None;
464 
465         loop {
466             if index >= length {
467                 break;
468             }
469             index = index + 1;
470             stream_name = match other_values.remove(0) {
471                 Amf0ValueType::UTF8String(val) => Some(val),
472                 _ => None,
473             };
474 
475             if index >= length {
476                 break;
477             }
478             index = index + 1;
479             start = match other_values.remove(0) {
480                 Amf0ValueType::Number(val) => Some(val),
481                 _ => None,
482             };
483 
484             if index >= length {
485                 break;
486             }
487             index = index + 1;
488             duration = match other_values.remove(0) {
489                 Amf0ValueType::Number(val) => Some(val),
490                 _ => None,
491             };
492 
493             if index >= length {
494                 break;
495             }
496             index = index + 1;
497             reset = match other_values.remove(0) {
498                 Amf0ValueType::Boolean(val) => Some(val),
499                 _ => None,
500             };
501             break;
502         }
503 
504         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
505         event_messages.write_stream_begin(stream_id.clone()).await?;
506 
507         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
508         netstream
509             .on_status(
510                 transaction_id,
511                 &"status".to_string(),
512                 &"NetStream.Play.Reset".to_string(),
513                 &"reset".to_string(),
514             )
515             .await?;
516 
517         netstream
518             .on_status(
519                 transaction_id,
520                 &"status".to_string(),
521                 &"NetStream.Play.Start".to_string(),
522                 &"play start".to_string(),
523             )
524             .await?;
525 
526         netstream
527             .on_status(
528                 transaction_id,
529                 &"status".to_string(),
530                 &"NetStream.Data.Start".to_string(),
531                 &"data start.".to_string(),
532             )
533             .await?;
534 
535         netstream
536             .on_status(
537                 transaction_id,
538                 &"status".to_string(),
539                 &"NetStream.Play.PublishNotify".to_string(),
540                 &"play publish notify.".to_string(),
541             )
542             .await?;
543 
544         event_messages
545             .write_stream_is_record(stream_id.clone())
546             .await?;
547 
548         self.subscribe_from_channels(stream_name.unwrap()).await?;
549         self.state = ServerSessionState::Play;
550 
551         Ok(())
552     }
553 
554     async fn subscribe_from_channels(&mut self, stream_name: String) -> Result<(), SessionError> {
555         let (sender, receiver) = oneshot::channel();
556         let subscribe_event = ChannelEvent::Subscribe {
557             app_name: self.app_name.clone(),
558             stream_name,
559             responder: sender,
560         };
561 
562         let rv = self.event_producer.send(subscribe_event);
563         match rv {
564             Err(_) => {
565                 return Err(SessionError {
566                     value: SessionErrorValue::ChannelEventSendErr,
567                 })
568             }
569             _ => {}
570         }
571 
572         match receiver.await {
573             Ok(consumer) => {
574                 self.data_consumer = consumer;
575             }
576             Err(_) => {}
577         }
578         Ok(())
579     }
580 
581     pub async fn on_publish(
582         &mut self,
583         transaction_id: &f64,
584         stream_id: &u32,
585         other_values: &mut Vec<Amf0ValueType>,
586     ) -> Result<(), SessionError> {
587         let length = other_values.len();
588 
589         if length < 2 {
590             return Err(SessionError {
591                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
592             });
593         }
594 
595         let stream_name = match other_values.remove(0) {
596             Amf0ValueType::UTF8String(val) => val,
597             _ => {
598                 return Err(SessionError {
599                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
600                 });
601             }
602         };
603 
604         let stream_type = match other_values.remove(0) {
605             Amf0ValueType::UTF8String(val) => val,
606             _ => {
607                 return Err(SessionError {
608                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
609                 });
610             }
611         };
612 
613         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
614         event_messages.write_stream_begin(stream_id.clone()).await?;
615 
616         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
617         netstream
618             .on_status(
619                 transaction_id,
620                 &"status".to_string(),
621                 &"NetStream.Publish.Start".to_string(),
622                 &"".to_string(),
623             )
624             .await?;
625 
626         print!("before publish_to_channels\n");
627         self.publish_to_channels(stream_name).await?;
628         print!("after publish_to_channels\n");
629 
630         Ok(())
631     }
632 
633     async fn publish_to_channels(&mut self, stream_name: String) -> Result<(), SessionError> {
634         let (sender, receiver) = oneshot::channel();
635         let publish_event = ChannelEvent::Publish {
636             app_name: self.app_name.clone(),
637             stream_name,
638             responder: sender,
639         };
640 
641         let rv = self.event_producer.send(publish_event);
642         match rv {
643             Err(_) => {
644                 return Err(SessionError {
645                     value: SessionErrorValue::ChannelEventSendErr,
646                 })
647             }
648             _ => {}
649         }
650 
651         match receiver.await {
652             Ok(producer) => {
653                 print!("set producer before\n");
654                 self.data_producer = producer;
655                 print!("set producer after\n");
656             }
657             Err(_) => {}
658         }
659         Ok(())
660     }
661 
662     pub fn on_video_data(
663         &mut self,
664         data: &mut BytesMut,
665         timestamp: &u32,
666     ) -> Result<(), SessionError> {
667         let data = ChannelData::Video {
668             timestamp: timestamp.clone(),
669             data: data.clone(),
670         };
671 
672         //print!("receive video data\n");
673         match self.data_producer.send(data) {
674             Ok(size) => {}
675             Err(err) => {
676                 format!("receive video err {}", err);
677                 return Err(SessionError {
678                     value: SessionErrorValue::SendChannelDataErr,
679                 });
680             }
681         }
682 
683         Ok(())
684     }
685 
686     pub fn on_audio_data(
687         &mut self,
688         data: &mut BytesMut,
689         timestamp: &u32,
690     ) -> Result<(), SessionError> {
691         let data = ChannelData::Audio {
692             timestamp: timestamp.clone(),
693             data: data.clone(),
694         };
695 
696         match self.data_producer.send(data) {
697             Ok(size) => {}
698             Err(_) => {
699                 return Err(SessionError {
700                     value: SessionErrorValue::SendChannelDataErr,
701                 })
702             }
703         }
704 
705         Ok(())
706     }
707 
708     pub fn on_amf_data(&mut self, body: &mut BytesMut) -> Result<(), SessionError> {
709         let data = ChannelData::MetaData { body: body.clone() };
710 
711         match self.data_producer.send(data) {
712             Ok(size) => {}
713             Err(_) => {
714                 return Err(SessionError {
715                     value: SessionErrorValue::SendChannelDataErr,
716                 })
717             }
718         }
719         // let mut metadata_handler = metadata::MetaData::default();
720 
721         // metadata_handler.save(body, values);
722 
723         Ok(())
724     }
725 }
726