1 use super::define;
2 use super::errors::SessionError;
3 use super::errors::SessionErrorValue;
4 use crate::handshake::handshake::{SimpleHandshakeServer,ComplexHandshakeServer};
5 use crate::{amf0::Amf0ValueType, chunk::unpacketizer::UnpackResult};
6 use crate::{
7     application,
8     chunk::{unpacketizer::ChunkUnpacketizer, ChunkInfo},
9 };
10 use crate::{channels, chunk::packetizer::ChunkPacketizer};
11 use crate::{
12     chunk::define::CHUNK_SIZE,
13     chunk::define::{chunk_type, csid_type},
14 };
15 
16 use crate::messages::define::msg_type_id;
17 use crate::messages::define::RtmpMessageData;
18 
19 use crate::messages::parser::MessageParser;
20 use bytes::BytesMut;
21 
22 use netio::bytes_reader::BytesReader;
23 use netio::bytes_writer::AsyncBytesWriter;
24 use netio::bytes_writer::BytesWriter;
25 use netio::netio::NetworkIO;
26 use std::{borrow::BorrowMut, time::Duration};
27 
28 use crate::channels::define::ChannelEvent;
29 use crate::channels::define::MultiConsumerForData;
30 use crate::channels::define::MultiProducerForEvent;
31 use crate::channels::define::SingleProducerForData;
32 use crate::netconnection::commands::NetConnection;
33 use crate::netstream::commands::NetStream;
34 use crate::protocol_control_messages::control_messages::ControlMessages;
35 
36 use crate::user_control_messages::event_messages::EventMessages;
37 
38 use std::collections::HashMap;
39 
40 use tokio::sync::broadcast;
41 use tokio::{io::ReadHalf, net::TcpStream};
42 
43 use std::cell::{RefCell, RefMut};
44 use std::rc::Rc;
45 use std::sync::Arc;
46 use tokio::sync::oneshot;
47 use tokio::sync::Mutex;
48 
49 use crate::channels::define::ChannelData;
50 use crate::channels::errors::ChannelError;
51 use crate::channels::errors::ChannelErrorValue;
52 
53 use crate::handshake::handshake::ServerHandshakeState;
54 
55 use crate::utils;
56 use log::{debug, log, log_enabled, Level};
57 use std::fmt;
58 
/// Stages a `ServerSession` moves through while serving one connection.
enum ServerSessionState {
    /// Incoming bytes are fed to the handshaker until it reports `Finish`.
    Handshake,
    /// Handshake is done; incoming bytes are unpacked into chunks, parsed
    /// into messages and dispatched.
    ReadChunk,
    // OnConnect,
    // OnCreateStream,
    //Publish,
    /// Audio/video received from the data consumer is forwarded to the peer.
    Play,
}
67 
/// Server-side state for a single client connection: handshake, chunk
/// (un)packetizing, command handling and media forwarding.
pub struct ServerSession {
    /// Application name taken from the `app` property of the connect command.
    app_name: String,

    /// Shared network I/O handle; clones are handed to the handshakers, the
    /// packetizer and the control/event message writers.
    io: Arc<Mutex<NetworkIO>>,
    /// Handshaker driven by `run` while the session is in the handshake state.
    simple_handshaker: SimpleHandshakeServer,
    /// Constructed in `new`; not referenced by any other method visible here.
    complex_handshaker: ComplexHandshakeServer,

    /// Writes outgoing chunks to the peer.
    packetizer: ChunkPacketizer,
    /// Buffers incoming bytes and splits them into chunks.
    unpacketizer: ChunkUnpacketizer,

    /// Current stage of the session; selects the branch taken in `run`.
    state: ServerSessionState,

    /// Sends `ChannelEvent`s (subscribe / publish requests) to the channel layer.
    event_producer: MultiProducerForEvent,

    // Sends video, audio or metadata from a publishing server session to the
    // player server sessions.
    data_producer: SingleProducerForData,
    // Receives video, audio or metadata from the publishing server session so
    // it can be sent out to the player.
    data_consumer: MultiConsumerForData,

    /// Identifier supplied by whoever creates the session.
    session_id: u8,
}
89 
90 impl ServerSession {
91     pub fn new(
92         stream: TcpStream,
93         event_producer: MultiProducerForEvent,
94         timeout: Duration,
95         session_id: u8,
96     ) -> Self {
97         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout)));
98         //only used for init,since I don't found a better way to deal with this.
99         let (init_producer, init_consumer) = broadcast::channel(1);
100         // let reader = BytesReader::new(BytesMut::new());
101 
102         Self {
103             app_name: String::from(""),
104 
105             io: Arc::clone(&net_io),
106             simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)),
107             complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)),
108             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
109             unpacketizer: ChunkUnpacketizer::new(),
110 
111             state: ServerSessionState::Handshake,
112 
113             event_producer,
114             data_producer: init_producer,
115             data_consumer: init_consumer,
116             session_id: session_id,
117         }
118     }
119 
120     pub async fn run(&mut self) -> Result<(), SessionError> {
121         let duration = Duration::new(10, 10);
122 
123         let mut remaining_bytes: BytesMut = BytesMut::new();
124 
125         loop {
126             let mut net_io_data: BytesMut = BytesMut::new();
127             if remaining_bytes.len() <= 0 {
128                 net_io_data = self.io.lock().await.read().await?;
129 
130                 utils::print::print(net_io_data.clone());
131             }
132 
133             match self.state {
134                 ServerSessionState::Handshake => {
135                     if self.session_id > 0 {
136                         // utils::print::printu8(net_io_data.clone());
137                     }
138 
139                     self.simple_handshaker.extend_data(&net_io_data[..]);
140                     self.simple_handshaker.handshake().await?;
141 
142                     match self.simple_handshaker.state {
143                         ServerHandshakeState::Finish => {
144                             self.state = ServerSessionState::ReadChunk;
145                             remaining_bytes = self.simple_handshaker.get_remaining_bytes();
146                         }
147                         _ => continue,
148                     }
149                 }
150                 ServerSessionState::ReadChunk => {
151                     if self.session_id > 0 {
152                         // utils::print::printu8(net_io_data.clone());
153                     }
154 
155                     if remaining_bytes.len() > 0 {
156                         let bytes = std::mem::replace(&mut remaining_bytes, BytesMut::new());
157                         self.unpacketizer.extend_data(&bytes[..]);
158                     } else {
159                         self.unpacketizer.extend_data(&net_io_data[..]);
160                     }
161 
162                     let result = self.unpacketizer.read_chunks();
163 
164                     let rv = match result {
165                         Ok(val) => val,
166                         Err(err) => {
167                             // return Err(SessionError {
168                             //     value: SessionErrorValue::UnPackError(err),
169                             // })
170                             continue;
171                         }
172                     };
173 
174                     match rv {
175                         UnpackResult::Chunks(chunks) => {
176                             for chunk_info in chunks.iter() {
177                                 let msg_stream_id = chunk_info.message_header.msg_streamd_id;
178                                 let timestamp = chunk_info.message_header.timestamp;
179 
180                                 let mut message_parser = MessageParser::new(chunk_info.clone());
181                                 let mut msg = message_parser.parse()?;
182 
183                                 self.process_messages(&mut msg, &msg_stream_id, &timestamp)
184                                     .await?;
185                             }
186                         }
187                         _ => {}
188                     }
189                 }
190 
191                 //when in play state, only transfer publisher's video/audio/metadta to player.
192                 ServerSessionState::Play => loop {
193                     let data = self.data_consumer.recv().await;
194 
195                     match data {
196                         Ok(val) => match val {
197                             ChannelData::Audio { timestamp, data } => {
198                                 self.send_audio(data, timestamp).await?;
199                             }
200                             ChannelData::Video { timestamp, data } => {
201                                 self.send_video(data, timestamp).await?;
202                             }
203                             ChannelData::MetaData {} => {}
204                         },
205                         Err(err) => {}
206                     }
207                 },
208             }
209         }
210 
211         //Ok(())
212     }
213 
214     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
215         let mut controlmessage = ControlMessages::new(AsyncBytesWriter::new(self.io.clone()));
216         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
217 
218         Ok(())
219     }
220     pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
221         let mut chunk_info = ChunkInfo::new(
222             csid_type::AUDIO,
223             chunk_type::TYPE_0,
224             timestamp,
225             data.len() as u32,
226             msg_type_id::AUDIO,
227             0,
228             data,
229         );
230 
231         self.packetizer.write_chunk(&mut chunk_info).await?;
232 
233         Ok(())
234     }
235 
236     pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
237         let mut chunk_info = ChunkInfo::new(
238             csid_type::VIDEO,
239             chunk_type::TYPE_0,
240             timestamp,
241             data.len() as u32,
242             msg_type_id::VIDEO,
243             0,
244             data,
245         );
246 
247         self.packetizer.write_chunk(&mut chunk_info).await?;
248 
249         Ok(())
250     }
251     pub async fn process_messages(
252         &mut self,
253         rtmp_msg: &mut RtmpMessageData,
254         msg_stream_id: &u32,
255         timestamp: &u32,
256     ) -> Result<(), SessionError> {
257         match rtmp_msg {
258             RtmpMessageData::Amf0Command {
259                 command_name,
260                 transaction_id,
261                 command_object,
262                 others,
263             } => {
264                 self.process_amf0_command_message(
265                     msg_stream_id,
266                     command_name,
267                     transaction_id,
268                     command_object,
269                     others,
270                 )
271                 .await?
272             }
273             RtmpMessageData::AudioData { data } => {
274                 self.on_audio_data(data, timestamp)?;
275             }
276             RtmpMessageData::VideoData { data } => {
277                 self.on_video_data(data, timestamp)?;
278             }
279 
280             _ => {}
281         }
282         Ok(())
283     }
284 
285     pub async fn process_amf0_command_message(
286         &mut self,
287         stream_id: &u32,
288         command_name: &Amf0ValueType,
289         transaction_id: &Amf0ValueType,
290         command_object: &Amf0ValueType,
291         others: &mut Vec<Amf0ValueType>,
292     ) -> Result<(), SessionError> {
293         let empty_cmd_name = &String::new();
294         let cmd_name = match command_name {
295             Amf0ValueType::UTF8String(str) => str,
296             _ => empty_cmd_name,
297         };
298 
299         let transaction_id = match transaction_id {
300             Amf0ValueType::Number(number) => number,
301             _ => &0.0,
302         };
303 
304         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
305         let obj = match command_object {
306             Amf0ValueType::Object(obj) => obj,
307             _ => &empty_cmd_obj,
308         };
309 
310         match cmd_name.as_str() {
311             "connect" => {
312                 print!("connect .......");
313                 self.on_connect(&transaction_id, &obj).await?;
314             }
315             "createStream" => {
316                 self.on_create_stream(transaction_id).await?;
317             }
318             "deleteStream" => {
319                 if others.len() > 1 {
320                     let stream_id = match others.pop() {
321                         Some(val) => match val {
322                             Amf0ValueType::Number(streamid) => streamid,
323                             _ => 0.0,
324                         },
325                         _ => 0.0,
326                     };
327 
328                     self.on_delete_stream(transaction_id, &stream_id).await?;
329                 }
330             }
331             "play" => {
332                 self.on_play(transaction_id, stream_id, others).await?;
333             }
334             "publish" => {
335                 self.on_publish(transaction_id, stream_id, others).await?;
336             }
337             _ => {}
338         }
339 
340         Ok(())
341     }
342 
343     async fn on_connect(
344         &mut self,
345         transaction_id: &f64,
346         command_obj: &HashMap<String, Amf0ValueType>,
347     ) -> Result<(), SessionError> {
348         let mut control_message = ControlMessages::new(AsyncBytesWriter::new(self.io.clone()));
349         control_message
350             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
351             .await?;
352         control_message
353             .write_set_peer_bandwidth(
354                 define::PEER_BANDWIDTH,
355                 define::PeerBandWidthLimitType::DYNAMIC,
356             )
357             .await?;
358         control_message.write_set_chunk_size(CHUNK_SIZE).await?;
359 
360         let obj_encoding = command_obj.get("objectEncoding");
361         let encoding = match obj_encoding {
362             Some(Amf0ValueType::Number(encoding)) => encoding,
363             _ => &define::OBJENCODING_AMF0,
364         };
365 
366         let app_name = command_obj.get("app");
367         self.app_name = match app_name {
368             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
369             _ => {
370                 return Err(SessionError {
371                     value: SessionErrorValue::NoAppName,
372                 });
373             }
374         };
375 
376         let mut netconnection = NetConnection::new(BytesWriter::new());
377         let data = netconnection.connect_response(
378             &transaction_id,
379             &define::FMSVER.to_string(),
380             &define::CAPABILITIES,
381             &String::from("NetConnection.Connect.Success"),
382             &define::LEVEL.to_string(),
383             &String::from("Connection Succeeded."),
384             encoding,
385         )?;
386 
387         let mut chunk_info = ChunkInfo::new(
388             csid_type::COMMAND_AMF0_AMF3,
389             chunk_type::TYPE_0,
390             0,
391             data.len() as u32,
392             msg_type_id::COMMAND_AMF0,
393             0,
394             data,
395         );
396 
397         self.packetizer.write_chunk(&mut chunk_info).await?;
398 
399         Ok(())
400     }
401 
402     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
403         let mut netconnection = NetConnection::new(BytesWriter::new());
404         let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?;
405 
406         let mut chunk_info = ChunkInfo::new(
407             csid_type::COMMAND_AMF0_AMF3,
408             chunk_type::TYPE_0,
409             0,
410             data.len() as u32,
411             msg_type_id::COMMAND_AMF0,
412             0,
413             data,
414         );
415 
416         self.packetizer.write_chunk(&mut chunk_info).await?;
417 
418         Ok(())
419     }
420 
421     pub async fn on_delete_stream(
422         &mut self,
423         transaction_id: &f64,
424         stream_id: &f64,
425     ) -> Result<(), SessionError> {
426         let mut netstream = NetStream::new(BytesWriter::new());
427         let data = netstream.on_status(
428             transaction_id,
429             &"status".to_string(),
430             &"NetStream.DeleteStream.Suceess".to_string(),
431             &"".to_string(),
432         )?;
433 
434         let mut chunk_info = ChunkInfo::new(
435             csid_type::COMMAND_AMF0_AMF3,
436             chunk_type::TYPE_0,
437             0,
438             data.len() as u32,
439             msg_type_id::COMMAND_AMF0,
440             0,
441             data,
442         );
443 
444         self.packetizer.write_chunk(&mut chunk_info).await?;
445         Ok(())
446     }
447     pub async fn on_play(
448         &mut self,
449         transaction_id: &f64,
450         stream_id: &u32,
451         other_values: &mut Vec<Amf0ValueType>,
452     ) -> Result<(), SessionError> {
453         let length = other_values.len() as u8;
454         let mut index: u8 = 0;
455 
456         let mut stream_name: Option<String> = None;
457         let mut start: Option<f64> = None;
458         let mut duration: Option<f64> = None;
459         let mut reset: Option<bool> = None;
460 
461         loop {
462             if index >= length {
463                 break;
464             }
465             index = index + 1;
466             stream_name = match other_values.remove(0) {
467                 Amf0ValueType::UTF8String(val) => Some(val),
468                 _ => None,
469             };
470 
471             if index >= length {
472                 break;
473             }
474             index = index + 1;
475             start = match other_values.remove(0) {
476                 Amf0ValueType::Number(val) => Some(val),
477                 _ => None,
478             };
479 
480             if index >= length {
481                 break;
482             }
483             index = index + 1;
484             duration = match other_values.remove(0) {
485                 Amf0ValueType::Number(val) => Some(val),
486                 _ => None,
487             };
488 
489             if index >= length {
490                 break;
491             }
492             index = index + 1;
493             reset = match other_values.remove(0) {
494                 Amf0ValueType::Boolean(val) => Some(val),
495                 _ => None,
496             };
497             break;
498         }
499 
500         let mut event_messages = EventMessages::new(AsyncBytesWriter::new(self.io.clone()));
501         event_messages.stream_begin(stream_id.clone()).await?;
502 
503         let mut netstream = NetStream::new(BytesWriter::new());
504         match reset {
505             Some(val) => {
506                 if val {
507                     netstream.on_status(
508                         transaction_id,
509                         &"status".to_string(),
510                         &"NetStream.Play.Reset".to_string(),
511                         &"".to_string(),
512                     )?;
513                 }
514             }
515             _ => {}
516         }
517 
518         netstream.on_status(
519             transaction_id,
520             &"status".to_string(),
521             &"NetStream.Play.Start".to_string(),
522             &"".to_string(),
523         )?;
524 
525         event_messages.stream_is_record(stream_id.clone()).await?;
526 
527         self.subscribe_from_channels(stream_name.unwrap()).await?;
528         self.state = ServerSessionState::Play;
529 
530         Ok(())
531     }
532 
533     async fn subscribe_from_channels(&mut self, stream_name: String) -> Result<(), SessionError> {
534         let (sender, receiver) = oneshot::channel();
535         let subscribe_event = ChannelEvent::Subscribe {
536             app_name: self.app_name.clone(),
537             stream_name,
538             responder: sender,
539         };
540 
541         let rv = self.event_producer.send(subscribe_event);
542         match rv {
543             Err(_) => {
544                 return Err(SessionError {
545                     value: SessionErrorValue::ChannelEventSendErr,
546                 })
547             }
548             _ => {}
549         }
550 
551         match receiver.await {
552             Ok(consumer) => {
553                 self.data_consumer = consumer;
554             }
555             Err(_) => {}
556         }
557         Ok(())
558     }
559 
560     pub async fn on_publish(
561         &mut self,
562         transaction_id: &f64,
563         stream_id: &u32,
564         other_values: &mut Vec<Amf0ValueType>,
565     ) -> Result<(), SessionError> {
566         let length = other_values.len();
567 
568         if length < 2 {
569             return Err(SessionError {
570                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
571             });
572         }
573 
574         let stream_name = match other_values.remove(0) {
575             Amf0ValueType::UTF8String(val) => val,
576             _ => {
577                 return Err(SessionError {
578                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
579                 });
580             }
581         };
582 
583         let stream_type = match other_values.remove(0) {
584             Amf0ValueType::UTF8String(val) => val,
585             _ => {
586                 return Err(SessionError {
587                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
588                 });
589             }
590         };
591 
592         let mut event_messages = EventMessages::new(AsyncBytesWriter::new(self.io.clone()));
593         event_messages.stream_begin(stream_id.clone()).await?;
594 
595         let mut netstream = NetStream::new(BytesWriter::new());
596         let data = netstream.on_status(
597             transaction_id,
598             &"status".to_string(),
599             &"NetStream.Publish.Start".to_string(),
600             &"".to_string(),
601         )?;
602 
603         let mut chunk_info = ChunkInfo::new(
604             csid_type::COMMAND_AMF0_AMF3,
605             chunk_type::TYPE_0,
606             0,
607             data.len() as u32,
608             msg_type_id::COMMAND_AMF0,
609             0,
610             data,
611         );
612 
613         self.packetizer.write_chunk(&mut chunk_info).await?;
614 
615         //self.publish_to_channels(stream_name).await?;
616 
617         Ok(())
618     }
619 
620     async fn publish_to_channels(&mut self, stream_name: String) -> Result<(), SessionError> {
621         let (sender, receiver) = oneshot::channel();
622         let publish_event = ChannelEvent::Publish {
623             app_name: self.app_name.clone(),
624             stream_name,
625             responder: sender,
626         };
627 
628         let rv = self.event_producer.send(publish_event);
629         match rv {
630             Err(_) => {
631                 return Err(SessionError {
632                     value: SessionErrorValue::ChannelEventSendErr,
633                 })
634             }
635             _ => {}
636         }
637 
638         match receiver.await {
639             Ok(producer) => {
640                 self.data_producer = producer;
641             }
642             Err(_) => {}
643         }
644         Ok(())
645     }
646 
    /// Called for every received video message.
    ///
    /// Currently a no-op that always returns `Ok(())`: `data` and `timestamp`
    /// are ignored because the code that forwards the payload through
    /// `data_producer` is commented out below.
    pub fn on_video_data(
        &mut self,
        data: &mut BytesMut,
        timestamp: &u32,
    ) -> Result<(), SessionError> {
        // Disabled forwarding of the video payload to the data producer:
        // let data = ChannelData::Video {
        //     timestamp: timestamp.clone(),
        //     data: data.clone(),
        // };

        // match self.data_producer.send(data) {
        //     Ok(size) => {}
        //     Err(_) => {
        //         return Err(SessionError {
        //             value: SessionErrorValue::SendChannelDataErr,
        //         })
        //     }
        // }

        Ok(())
    }
668 
    /// Called for every received audio message.
    ///
    /// Currently a no-op that always returns `Ok(())`: `data` and `timestamp`
    /// are ignored because the code that forwards the payload through
    /// `data_producer` is commented out below.
    pub fn on_audio_data(
        &mut self,
        data: &mut BytesMut,
        timestamp: &u32,
    ) -> Result<(), SessionError> {
        // Disabled forwarding of the audio payload to the data producer:
        // let data = ChannelData::Audio {
        //     timestamp: timestamp.clone(),
        //     data: data.clone(),
        // };

        // match self.data_producer.send(data) {
        //     Ok(size) => {}
        //     Err(_) => {
        //         return Err(SessionError {
        //             value: SessionErrorValue::SendChannelDataErr,
        //         })
        //     }
        // }

        Ok(())
    }
690 }
691