1 use super::define;
2 use super::errors::SessionError;
3 use super::errors::SessionErrorValue;
4 use crate::chunk::packetizer::ChunkPacketizer;
5 use crate::chunk::{unpacketizer::ChunkUnpacketizer, ChunkInfo};
6 use crate::handshake::handshake::{ComplexHandshakeServer, SimpleHandshakeServer};
7 use crate::{amf0::Amf0ValueType, chunk::unpacketizer::UnpackResult};
8 use crate::{
9     chunk::define::CHUNK_SIZE,
10     chunk::define::{chunk_type, csid_type},
11 };
12 
13 use crate::messages::define::msg_type_id;
14 use crate::messages::define::RtmpMessageData;
15 
16 use crate::messages::parser::MessageParser;
17 use bytes::BytesMut;
18 
19 use netio::bytes_writer::AsyncBytesWriter;
20 use netio::bytes_writer::BytesWriter;
21 use netio::netio::NetworkIO;
22 use std::{ time::Duration};
23 
24 use crate::channels::define::ChannelDataConsumer;
25 use crate::channels::define::ChannelDataPublisher;
26 use crate::channels::define::ChannelEvent;
27 use crate::channels::define::ChannelEventPublisher;
28 // use crate::channels::define::PlayerConsumer;
29 use crate::netconnection::commands::NetConnection;
30 use crate::netstream::writer::NetStreamWriter;
31 use crate::protocol_control_messages::writer::ProtocolControlMessagesWriter;
32 
33 use crate::user_control_messages::writer::EventMessagesWriter;
34 
35 use std::collections::HashMap;
36 
37 use tokio::net::TcpStream;
38 use tokio::sync::broadcast;
39 
40 use std::sync::Arc;
41 use tokio::sync::oneshot;
42 use tokio::sync::Mutex;
43 
44 use crate::channels::define::ChannelData;
45 
46 use crate::handshake::handshake::ServerHandshakeState;
47 
48 enum ServerSessionState {
49     Handshake,
50     ReadChunk,
51     // OnConnect,
52     // OnCreateStream,
53     //Publish,
54     Play,
55 }
56 
57 pub struct ServerSession {
58     app_name: String,
59 
60     io: Arc<Mutex<NetworkIO>>,
61     simple_handshaker: SimpleHandshakeServer,
62     complex_handshaker: ComplexHandshakeServer,
63 
64     packetizer: ChunkPacketizer,
65     unpacketizer: ChunkUnpacketizer,
66 
67     state: ServerSessionState,
68 
69     event_producer: ChannelEventPublisher,
70 
71     //send video, audio or metadata from publish server session to player server sessions
72     data_producer: ChannelDataPublisher,
73     //receive video, audio or metadata from publish server session and send out to player
74     data_consumer: ChannelDataConsumer,
75 
76     session_id: u8,
77 }
78 
79 impl ServerSession {
80     pub fn new(
81         stream: TcpStream,
82         event_producer: ChannelEventPublisher,
83         timeout: Duration,
84         session_id: u8,
85     ) -> Self {
86         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout)));
87         //only used for init,since I don't found a better way to deal with this.
88         let (init_producer, init_consumer) = broadcast::channel(1);
89 
90         // let reader = BytesReader::new(BytesMut::new());
91 
92         Self {
93             app_name: String::from(""),
94 
95             io: Arc::clone(&net_io),
96             simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)),
97             complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)),
98             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
99             unpacketizer: ChunkUnpacketizer::new(),
100 
101             state: ServerSessionState::Handshake,
102 
103             event_producer,
104             data_producer: init_producer,
105             data_consumer: init_consumer,
106             session_id: session_id,
107         }
108     }
109 
110     pub async fn run(&mut self) -> Result<(), SessionError> {
111         let duration = Duration::new(10, 10);
112 
113         let mut remaining_bytes: BytesMut = BytesMut::new();
114 
115         loop {
116             let mut net_io_data: BytesMut = BytesMut::new();
117             if remaining_bytes.len() <= 0 {
118                 net_io_data = self.io.lock().await.read().await?;
119 
120                 //utils::print::print(net_io_data.clone());
121             }
122 
123             match self.state {
124                 ServerSessionState::Handshake => {
125                     if self.session_id > 0 {
126                         // utils::print::printu8(net_io_data.clone());
127                     }
128 
129                     self.simple_handshaker.extend_data(&net_io_data[..]);
130                     self.simple_handshaker.handshake().await?;
131 
132                     match self.simple_handshaker.state {
133                         ServerHandshakeState::Finish => {
134                             self.state = ServerSessionState::ReadChunk;
135                             remaining_bytes = self.simple_handshaker.get_remaining_bytes();
136                         }
137                         _ => continue,
138                     }
139                 }
140                 ServerSessionState::ReadChunk => {
141                     if self.session_id > 0 {
142                         // utils::print::printu8(net_io_data.clone());
143                     }
144 
145                     if remaining_bytes.len() > 0 {
146                         let bytes = std::mem::replace(&mut remaining_bytes, BytesMut::new());
147                         self.unpacketizer.extend_data(&bytes[..]);
148                     } else {
149                         self.unpacketizer.extend_data(&net_io_data[..]);
150                     }
151 
152                     loop {
153                         let result = self.unpacketizer.read_chunks();
154                         let rv = match result {
155                             Ok(val) => val,
156                             Err(_) => {
157                                 break;
158                             }
159                         };
160 
161                         match rv {
162                             UnpackResult::Chunks(chunks) => {
163                                 for chunk_info in chunks.iter() {
164                                     let msg_stream_id = chunk_info.message_header.msg_streamd_id;
165                                     let timestamp = chunk_info.message_header.timestamp;
166 
167                                     let mut message_parser = MessageParser::new(chunk_info.clone());
168                                     let mut msg = message_parser.parse()?;
169 
170                                     self.process_messages(&mut msg, &msg_stream_id, &timestamp)
171                                         .await?;
172                                 }
173                             }
174                             _ => {}
175                         }
176                     }
177                 }
178 
179                 //when in play state, only transfer publisher's video/audio/metadta to player.
180                 ServerSessionState::Play => loop {
181                     let data = self.data_consumer.recv().await;
182 
183                     match data {
184                         Ok(val) => match val {
185                             ChannelData::Audio { timestamp, data } => {
186                                 print!("send audio data\n");
187                                 self.send_audio(data, timestamp).await?;
188                             }
189                             ChannelData::Video { timestamp, data } => {
190                                 print!("send video data\n");
191                                 self.send_video(data, timestamp).await?;
192                             }
193                             ChannelData::MetaData { body } => {
194                                 print!("send meta data\n");
195                                 self.send_metadata(body).await?;
196                             }
197                         },
198                         Err(err) => {}
199                     }
200                 },
201             }
202         }
203 
204         //Ok(())
205     }
206 
207     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
208         let mut controlmessage =
209             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
210         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
211 
212         Ok(())
213     }
214     pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
215         let mut chunk_info = ChunkInfo::new(
216             csid_type::AUDIO,
217             chunk_type::TYPE_0,
218             timestamp,
219             data.len() as u32,
220             msg_type_id::AUDIO,
221             0,
222             data,
223         );
224 
225         self.packetizer.write_chunk(&mut chunk_info).await?;
226 
227         Ok(())
228     }
229 
230     pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
231         let mut chunk_info = ChunkInfo::new(
232             csid_type::VIDEO,
233             chunk_type::TYPE_0,
234             timestamp,
235             data.len() as u32,
236             msg_type_id::VIDEO,
237             0,
238             data,
239         );
240 
241         self.packetizer.write_chunk(&mut chunk_info).await?;
242 
243         Ok(())
244     }
245 
246     async fn send_metadata(&mut self, data: BytesMut) -> Result<(), SessionError> {
247         let mut chunk_info = ChunkInfo::new(
248             csid_type::DATA_AMF0_AMF3,
249             chunk_type::TYPE_0,
250             0,
251             data.len() as u32,
252             msg_type_id::DATA_AMF0,
253             0,
254             data,
255         );
256 
257         self.packetizer.write_chunk(&mut chunk_info).await?;
258         Ok(())
259     }
260     pub async fn process_messages(
261         &mut self,
262         rtmp_msg: &mut RtmpMessageData,
263         msg_stream_id: &u32,
264         timestamp: &u32,
265     ) -> Result<(), SessionError> {
266         match rtmp_msg {
267             RtmpMessageData::Amf0Command {
268                 command_name,
269                 transaction_id,
270                 command_object,
271                 others,
272             } => {
273                 self.process_amf0_command_message(
274                     msg_stream_id,
275                     command_name,
276                     transaction_id,
277                     command_object,
278                     others,
279                 )
280                 .await?
281             }
282             RtmpMessageData::SetChunkSize { chunk_size } => {
283                 self.on_set_chunk_size(chunk_size.clone() as usize)?;
284             }
285             RtmpMessageData::AudioData { data } => {
286                 self.on_audio_data(data, timestamp)?;
287             }
288             RtmpMessageData::VideoData { data } => {
289                 self.on_video_data(data, timestamp)?;
290             }
291             RtmpMessageData::AmfData { raw_data } => {
292                 self.on_amf_data(raw_data)?;
293             }
294 
295             _ => {}
296         }
297         Ok(())
298     }
299 
300     pub async fn process_amf0_command_message(
301         &mut self,
302         stream_id: &u32,
303         command_name: &Amf0ValueType,
304         transaction_id: &Amf0ValueType,
305         command_object: &Amf0ValueType,
306         others: &mut Vec<Amf0ValueType>,
307     ) -> Result<(), SessionError> {
308         let empty_cmd_name = &String::new();
309         let cmd_name = match command_name {
310             Amf0ValueType::UTF8String(str) => str,
311             _ => empty_cmd_name,
312         };
313 
314         let transaction_id = match transaction_id {
315             Amf0ValueType::Number(number) => number,
316             _ => &0.0,
317         };
318 
319         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
320         let obj = match command_object {
321             Amf0ValueType::Object(obj) => obj,
322             _ => &empty_cmd_obj,
323         };
324 
325         match cmd_name.as_str() {
326             "connect" => {
327                 print!("connect .......");
328                 self.on_connect(&transaction_id, &obj).await?;
329             }
330             "createStream" => {
331                 self.on_create_stream(transaction_id).await?;
332             }
333             "deleteStream" => {
334                 if others.len() > 1 {
335                     let stream_id = match others.pop() {
336                         Some(val) => match val {
337                             Amf0ValueType::Number(streamid) => streamid,
338                             _ => 0.0,
339                         },
340                         _ => 0.0,
341                     };
342 
343                     self.on_delete_stream(transaction_id, &stream_id).await?;
344                 }
345             }
346             "play" => {
347                 self.on_play(transaction_id, stream_id, others).await?;
348             }
349             "publish" => {
350                 self.on_publish(transaction_id, stream_id, others).await?;
351             }
352             _ => {}
353         }
354 
355         Ok(())
356     }
357 
358     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
359         self.unpacketizer.update_max_chunk_size(chunk_size);
360         Ok(())
361     }
362 
363     async fn on_connect(
364         &mut self,
365         transaction_id: &f64,
366         command_obj: &HashMap<String, Amf0ValueType>,
367     ) -> Result<(), SessionError> {
368         let mut control_message =
369             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
370         control_message
371             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
372             .await?;
373         control_message
374             .write_set_peer_bandwidth(
375                 define::PEER_BANDWIDTH,
376                 define::PeerBandWidthLimitType::DYNAMIC,
377             )
378             .await?;
379         control_message.write_set_chunk_size(CHUNK_SIZE).await?;
380 
381         let obj_encoding = command_obj.get("objectEncoding");
382         let encoding = match obj_encoding {
383             Some(Amf0ValueType::Number(encoding)) => encoding,
384             _ => &define::OBJENCODING_AMF0,
385         };
386 
387         let app_name = command_obj.get("app");
388         self.app_name = match app_name {
389             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
390             _ => {
391                 return Err(SessionError {
392                     value: SessionErrorValue::NoAppName,
393                 });
394             }
395         };
396 
397         let mut netconnection = NetConnection::new(BytesWriter::new());
398         let data = netconnection.connect_response(
399             &transaction_id,
400             &define::FMSVER.to_string(),
401             &define::CAPABILITIES,
402             &String::from("NetConnection.Connect.Success"),
403             &define::LEVEL.to_string(),
404             &String::from("Connection Succeeded."),
405             encoding,
406         )?;
407 
408         let mut chunk_info = ChunkInfo::new(
409             csid_type::COMMAND_AMF0_AMF3,
410             chunk_type::TYPE_0,
411             0,
412             data.len() as u32,
413             msg_type_id::COMMAND_AMF0,
414             0,
415             data,
416         );
417 
418         self.packetizer.write_chunk(&mut chunk_info).await?;
419 
420         Ok(())
421     }
422 
423     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
424         let mut netconnection = NetConnection::new(BytesWriter::new());
425         let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?;
426 
427         let mut chunk_info = ChunkInfo::new(
428             csid_type::COMMAND_AMF0_AMF3,
429             chunk_type::TYPE_0,
430             0,
431             data.len() as u32,
432             msg_type_id::COMMAND_AMF0,
433             0,
434             data,
435         );
436 
437         self.packetizer.write_chunk(&mut chunk_info).await?;
438 
439         Ok(())
440     }
441 
442     pub async fn on_delete_stream(
443         &mut self,
444         transaction_id: &f64,
445         stream_id: &f64,
446     ) -> Result<(), SessionError> {
447         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
448         netstream
449             .on_status(
450                 transaction_id,
451                 &"status".to_string(),
452                 &"NetStream.DeleteStream.Suceess".to_string(),
453                 &"".to_string(),
454             )
455             .await?;
456 
457         Ok(())
458     }
459     pub async fn on_play(
460         &mut self,
461         transaction_id: &f64,
462         stream_id: &u32,
463         other_values: &mut Vec<Amf0ValueType>,
464     ) -> Result<(), SessionError> {
465         let length = other_values.len() as u8;
466         let mut index: u8 = 0;
467 
468         let mut stream_name: Option<String> = None;
469         let mut start: Option<f64> = None;
470         let mut duration: Option<f64> = None;
471         let mut reset: Option<bool> = None;
472 
473         loop {
474             if index >= length {
475                 break;
476             }
477             index = index + 1;
478             stream_name = match other_values.remove(0) {
479                 Amf0ValueType::UTF8String(val) => Some(val),
480                 _ => None,
481             };
482 
483             if index >= length {
484                 break;
485             }
486             index = index + 1;
487             start = match other_values.remove(0) {
488                 Amf0ValueType::Number(val) => Some(val),
489                 _ => None,
490             };
491 
492             if index >= length {
493                 break;
494             }
495             index = index + 1;
496             duration = match other_values.remove(0) {
497                 Amf0ValueType::Number(val) => Some(val),
498                 _ => None,
499             };
500 
501             if index >= length {
502                 break;
503             }
504             index = index + 1;
505             reset = match other_values.remove(0) {
506                 Amf0ValueType::Boolean(val) => Some(val),
507                 _ => None,
508             };
509             break;
510         }
511 
512         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
513         event_messages.write_stream_begin(stream_id.clone()).await?;
514 
515         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
516         netstream
517             .on_status(
518                 transaction_id,
519                 &"status".to_string(),
520                 &"NetStream.Play.Reset".to_string(),
521                 &"reset".to_string(),
522             )
523             .await?;
524 
525         netstream
526             .on_status(
527                 transaction_id,
528                 &"status".to_string(),
529                 &"NetStream.Play.Start".to_string(),
530                 &"play start".to_string(),
531             )
532             .await?;
533 
534         netstream
535             .on_status(
536                 transaction_id,
537                 &"status".to_string(),
538                 &"NetStream.Data.Start".to_string(),
539                 &"data start.".to_string(),
540             )
541             .await?;
542 
543         netstream
544             .on_status(
545                 transaction_id,
546                 &"status".to_string(),
547                 &"NetStream.Play.PublishNotify".to_string(),
548                 &"play publish notify.".to_string(),
549             )
550             .await?;
551 
552         event_messages
553             .write_stream_is_record(stream_id.clone())
554             .await?;
555 
556         self.subscribe_from_channels(stream_name.unwrap()).await?;
557         self.state = ServerSessionState::Play;
558 
559         Ok(())
560     }
561 
562     async fn subscribe_from_channels(&mut self, stream_name: String) -> Result<(), SessionError> {
563         let (sender, receiver) = oneshot::channel();
564         let subscribe_event = ChannelEvent::Subscribe {
565             app_name: self.app_name.clone(),
566             stream_name,
567             responder: sender,
568         };
569 
570         let rv = self.event_producer.send(subscribe_event);
571         match rv {
572             Err(_) => {
573                 return Err(SessionError {
574                     value: SessionErrorValue::ChannelEventSendErr,
575                 })
576             }
577             _ => {}
578         }
579 
580         match receiver.await {
581             Ok(consumer) => {
582                 self.data_consumer = consumer;
583             }
584             Err(_) => {}
585         }
586         Ok(())
587     }
588 
589     pub async fn on_publish(
590         &mut self,
591         transaction_id: &f64,
592         stream_id: &u32,
593         other_values: &mut Vec<Amf0ValueType>,
594     ) -> Result<(), SessionError> {
595         let length = other_values.len();
596 
597         if length < 2 {
598             return Err(SessionError {
599                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
600             });
601         }
602 
603         let stream_name = match other_values.remove(0) {
604             Amf0ValueType::UTF8String(val) => val,
605             _ => {
606                 return Err(SessionError {
607                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
608                 });
609             }
610         };
611 
612         let stream_type = match other_values.remove(0) {
613             Amf0ValueType::UTF8String(val) => val,
614             _ => {
615                 return Err(SessionError {
616                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
617                 });
618             }
619         };
620 
621         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
622         event_messages.write_stream_begin(stream_id.clone()).await?;
623 
624         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
625         netstream
626             .on_status(
627                 transaction_id,
628                 &"status".to_string(),
629                 &"NetStream.Publish.Start".to_string(),
630                 &"".to_string(),
631             )
632             .await?;
633 
634         print!("before publish_to_channels\n");
635         self.publish_to_channels(stream_name).await?;
636         print!("after publish_to_channels\n");
637 
638         Ok(())
639     }
640 
641     async fn publish_to_channels(&mut self, stream_name: String) -> Result<(), SessionError> {
642         let (sender, receiver) = oneshot::channel();
643         let publish_event = ChannelEvent::Publish {
644             app_name: self.app_name.clone(),
645             stream_name,
646             responder: sender,
647         };
648 
649         let rv = self.event_producer.send(publish_event);
650         match rv {
651             Err(_) => {
652                 return Err(SessionError {
653                     value: SessionErrorValue::ChannelEventSendErr,
654                 })
655             }
656             _ => {}
657         }
658 
659         match receiver.await {
660             Ok(producer) => {
661                 print!("set producer before\n");
662                 self.data_producer = producer;
663                 print!("set producer after\n");
664             }
665             Err(_) => {}
666         }
667         Ok(())
668     }
669 
670     pub fn on_video_data(
671         &mut self,
672         data: &mut BytesMut,
673         timestamp: &u32,
674     ) -> Result<(), SessionError> {
675         let data = ChannelData::Video {
676             timestamp: timestamp.clone(),
677             data: data.clone(),
678         };
679 
680         print!("receive video data\n");
681         match self.data_producer.send(data) {
682             Ok(size) => {}
683             Err(err) => {
684                 format!("receive video err {}", err);
685                 return Err(SessionError {
686                     value: SessionErrorValue::SendChannelDataErr,
687                 });
688             }
689         }
690 
691         Ok(())
692     }
693 
694     pub fn on_audio_data(
695         &mut self,
696         data: &mut BytesMut,
697         timestamp: &u32,
698     ) -> Result<(), SessionError> {
699         let data = ChannelData::Audio {
700             timestamp: timestamp.clone(),
701             data: data.clone(),
702         };
703 
704         match self.data_producer.send(data) {
705             Ok(size) => {}
706             Err(_) => {
707                 return Err(SessionError {
708                     value: SessionErrorValue::SendChannelDataErr,
709                 })
710             }
711         }
712 
713         Ok(())
714     }
715 
716     pub fn on_amf_data(&mut self, body: &mut BytesMut) -> Result<(), SessionError> {
717         let data = ChannelData::MetaData { body: body.clone() };
718 
719         match self.data_producer.send(data) {
720             Ok(size) => {}
721             Err(_) => {
722                 return Err(SessionError {
723                     value: SessionErrorValue::SendChannelDataErr,
724                 })
725             }
726         }
727         // let mut metadata_handler = metadata::MetaData::default();
728 
729         // metadata_handler.save(body, values);
730 
731         Ok(())
732     }
733 }
734