1 use super::define;
2 use super::errors::SessionError;
3 use super::errors::SessionErrorValue;
4 use crate::handshake::handshake::SimpleHandshakeServer;
5 use crate::{amf0::Amf0ValueType, chunk::unpacketizer::UnpackResult};
6 use crate::{
7     application,
8     chunk::{unpacketizer::ChunkUnpacketizer, ChunkInfo},
9 };
10 use crate::{channels, chunk::packetizer::ChunkPacketizer};
11 use crate::{
12     chunk::define::CHUNK_SIZE,
13     chunk::define::{chunk_type, csid_type},
14 };
15 
16 use crate::messages::define::msg_type_id;
17 use crate::messages::define::RtmpMessageData;
18 
19 use crate::messages::parser::MessageParser;
20 use bytes::BytesMut;
21 
22 use netio::bytes_reader::BytesReader;
23 use netio::bytes_writer::AsyncBytesWriter;
24 use netio::bytes_writer::BytesWriter;
25 use netio::netio::NetworkIO;
26 use std::{borrow::BorrowMut, time::Duration};
27 
28 use crate::channels::define::ChannelEvent;
29 use crate::channels::define::MultiConsumerForData;
30 use crate::channels::define::MultiProducerForEvent;
31 use crate::channels::define::SingleProducerForData;
32 use crate::netconnection::commands::NetConnection;
33 use crate::netstream::commands::NetStream;
34 use crate::protocol_control_messages::control_messages::ControlMessages;
35 
36 use crate::user_control_messages::event_messages::EventMessages;
37 
38 use std::collections::HashMap;
39 
40 use tokio::{prelude::*, sync::broadcast};
41 
42 use std::cell::{RefCell, RefMut};
43 use std::rc::Rc;
44 use std::sync::Arc;
45 use tokio::sync::oneshot;
46 use tokio::sync::Mutex;
47 
48 use crate::channels::define::ChannelData;
49 use crate::channels::errors::ChannelError;
50 use crate::channels::errors::ChannelErrorValue;
51 
52 use crate::handshake::handshake::ServerHandshakeState;
53 
54 use crate::utils;
55 use log::{debug, log, log_enabled, Level};
56 use std::fmt;
57 
58 enum ServerSessionState {
59     Handshake,
60     ReadChunk,
61     // OnConnect,
62     // OnCreateStream,
63     //Publish,
64     Play,
65 }
66 
67 pub struct ServerSession<S>
68 where
69     S: AsyncRead + AsyncWrite + Unpin + Send + Sync,
70 {
71     app_name: String,
72 
73     io: Arc<Mutex<NetworkIO<S>>>,
74     handshaker: SimpleHandshakeServer<S>,
75 
76     packetizer: ChunkPacketizer<S>,
77     unpacketizer: ChunkUnpacketizer,
78 
79     state: ServerSessionState,
80 
81     event_producer: MultiProducerForEvent,
82 
83     //send video, audio or metadata from publish server session to player server sessions
84     data_producer: SingleProducerForData,
85     //receive video, audio or metadata from publish server session and send out to player
86     data_consumer: MultiConsumerForData,
87 }
88 
89 impl<S> ServerSession<S>
90 where
91     S: AsyncRead + AsyncWrite + Unpin + Send + Sync,
92 {
93     pub fn new(stream: S, event_producer: MultiProducerForEvent, timeout: Duration) -> Self {
94         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout)));
95         //only used for init,since I don't found a better way to deal with this.
96         let (init_producer, init_consumer) = broadcast::channel(1);
97         // let reader = BytesReader::new(BytesMut::new());
98 
99         Self {
100             app_name: String::from(""),
101 
102             io: Arc::clone(&net_io),
103             handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)),
104             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
105             unpacketizer: ChunkUnpacketizer::new(),
106 
107             state: ServerSessionState::Handshake,
108 
109             event_producer,
110             data_producer: init_producer,
111             data_consumer: init_consumer,
112         }
113     }
114 
115     pub async fn run(&mut self) -> Result<(), SessionError> {
116         let duration = Duration::new(10, 10);
117 
118         let mut remaining_bytes: BytesMut = BytesMut::new();
119         let mut net_io_data: BytesMut = BytesMut::new();
120 
121         loop {
122             if remaining_bytes.len() <= 0 {
123                 net_io_data = self.io.lock().await.read().await?;
124             }
125 
126             match self.state {
127                 ServerSessionState::Handshake => {
128                     utils::print::printu8(net_io_data.clone());
129                     self.handshaker.extend_data(&net_io_data[..]);
130                     self.handshaker.handshake().await?;
131 
132                     match self.handshaker.state {
133                         ServerHandshakeState::Finish => {
134                             self.state = ServerSessionState::ReadChunk;
135                             remaining_bytes = self.handshaker.get_remaining_bytes();
136                         }
137                         _ => continue,
138                     }
139                 }
140                 ServerSessionState::ReadChunk => {
141                     utils::print::printu8(net_io_data.clone());
142 
143                     if remaining_bytes.len() > 0 {
144                         let bytes = std::mem::replace(&mut remaining_bytes, BytesMut::new());
145                         self.unpacketizer.extend_data(&bytes[..]);
146                     } else {
147                         self.unpacketizer.extend_data(&net_io_data[..]);
148                     }
149 
150                     let result = self.unpacketizer.read_chunks();
151 
152                     let rv = match result {
153                         Ok(val) => val,
154                         Err(err) => {
155                             // return Err(SessionError {
156                             //     value: SessionErrorValue::UnPackError(err),
157                             // })
158                             continue;
159                         }
160                     };
161 
162                     match rv {
163                         UnpackResult::Chunks(chunks) => {
164                             for chunk_info in chunks.iter() {
165                                 let msg_stream_id = chunk_info.message_header.msg_streamd_id;
166                                 let timestamp = chunk_info.message_header.timestamp;
167 
168                                 let mut message_parser = MessageParser::new(chunk_info.clone());
169                                 let mut msg = message_parser.parse()?;
170 
171                                 self.process_messages(&mut msg, &msg_stream_id, &timestamp)
172                                     .await?;
173                             }
174                         }
175                         _ => {}
176                     }
177                 }
178 
179                 //when in play state, only transfer publisher's video/audio/metadta to player.
180                 ServerSessionState::Play => loop {
181                     let data = self.data_consumer.recv().await;
182 
183                     match data {
184                         Ok(val) => match val {
185                             ChannelData::Audio { timestamp, data } => {
186                                 self.send_audio(data, timestamp).await?;
187                             }
188                             ChannelData::Video { timestamp, data } => {
189                                 self.send_video(data, timestamp).await?;
190                             }
191                             ChannelData::MetaData {} => {}
192                         },
193                         Err(err) => {}
194                     }
195                 },
196             }
197         }
198 
199         //Ok(())
200     }
201 
202     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
203         let mut controlmessage = ControlMessages::new(AsyncBytesWriter::new(self.io.clone()));
204         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
205 
206         Ok(())
207     }
208     pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
209         let mut chunk_info = ChunkInfo::new(
210             csid_type::AUDIO,
211             chunk_type::TYPE_0,
212             timestamp,
213             data.len() as u32,
214             msg_type_id::AUDIO,
215             0,
216             data,
217         );
218 
219         self.packetizer.write_chunk(&mut chunk_info).await?;
220 
221         Ok(())
222     }
223 
224     pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
225         let mut chunk_info = ChunkInfo::new(
226             csid_type::VIDEO,
227             chunk_type::TYPE_0,
228             timestamp,
229             data.len() as u32,
230             msg_type_id::VIDEO,
231             0,
232             data,
233         );
234 
235         self.packetizer.write_chunk(&mut chunk_info).await?;
236 
237         Ok(())
238     }
239     pub async fn process_messages(
240         &mut self,
241         rtmp_msg: &mut RtmpMessageData,
242         msg_stream_id: &u32,
243         timestamp: &u32,
244     ) -> Result<(), SessionError> {
245         match rtmp_msg {
246             RtmpMessageData::Amf0Command {
247                 command_name,
248                 transaction_id,
249                 command_object,
250                 others,
251             } => {
252                 self.process_amf0_command_message(
253                     msg_stream_id,
254                     command_name,
255                     transaction_id,
256                     command_object,
257                     others,
258                 )
259                 .await?
260             }
261             RtmpMessageData::AudioData { data } => {
262                 self.on_audio_data(data, timestamp)?;
263             }
264             RtmpMessageData::VideoData { data } => {
265                 self.on_video_data(data, timestamp)?;
266             }
267 
268             _ => {}
269         }
270         Ok(())
271     }
272 
273     pub async fn process_amf0_command_message(
274         &mut self,
275         stream_id: &u32,
276         command_name: &Amf0ValueType,
277         transaction_id: &Amf0ValueType,
278         command_object: &Amf0ValueType,
279         others: &mut Vec<Amf0ValueType>,
280     ) -> Result<(), SessionError> {
281         let empty_cmd_name = &String::new();
282         let cmd_name = match command_name {
283             Amf0ValueType::UTF8String(str) => str,
284             _ => empty_cmd_name,
285         };
286 
287         let transaction_id = match transaction_id {
288             Amf0ValueType::Number(number) => number,
289             _ => &0.0,
290         };
291 
292         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
293         let obj = match command_object {
294             Amf0ValueType::Object(obj) => obj,
295             _ => &empty_cmd_obj,
296         };
297 
298         match cmd_name.as_str() {
299             "connect" => {
300                 self.on_connect(&transaction_id, &obj).await?;
301             }
302             "createStream" => {
303                 self.on_create_stream(transaction_id)?;
304             }
305             "deleteStream" => {
306                 if others.len() > 1 {
307                     let stream_id = match others.pop() {
308                         Some(val) => match val {
309                             Amf0ValueType::Number(streamid) => streamid,
310                             _ => 0.0,
311                         },
312                         _ => 0.0,
313                     };
314 
315                     self.on_delete_stream(transaction_id, &stream_id).await?;
316                 }
317             }
318             "play" => {
319                 self.on_play(transaction_id, stream_id, others).await?;
320             }
321             "publish" => {
322                 self.on_publish(transaction_id, stream_id, others).await?;
323             }
324             _ => {}
325         }
326 
327         Ok(())
328     }
329 
330     async fn on_connect(
331         &mut self,
332         transaction_id: &f64,
333         command_obj: &HashMap<String, Amf0ValueType>,
334     ) -> Result<(), SessionError> {
335         let mut control_message = ControlMessages::new(AsyncBytesWriter::new(self.io.clone()));
336         control_message
337             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
338             .await?;
339         control_message
340             .write_set_peer_bandwidth(
341                 define::PEER_BANDWIDTH,
342                 define::PeerBandWidthLimitType::DYNAMIC,
343             )
344             .await?;
345         control_message.write_set_chunk_size(CHUNK_SIZE).await?;
346 
347         let obj_encoding = command_obj.get("objectEncoding");
348         let encoding = match obj_encoding {
349             Some(Amf0ValueType::Number(encoding)) => encoding,
350             _ => &define::OBJENCODING_AMF0,
351         };
352 
353         let app_name = command_obj.get("app");
354         self.app_name = match app_name {
355             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
356             _ => {
357                 return Err(SessionError {
358                     value: SessionErrorValue::NoAppName,
359                 });
360             }
361         };
362 
363         let mut netconnection = NetConnection::new(BytesWriter::new());
364         let data = netconnection.connect_response(
365             &transaction_id,
366             &define::FMSVER.to_string(),
367             &define::CAPABILITIES,
368             &String::from("NetConnection.Connect.Success"),
369             &define::LEVEL.to_string(),
370             &String::from("Connection Succeeded."),
371             encoding,
372         )?;
373 
374         let mut chunk_info = ChunkInfo::new(
375             csid_type::COMMAND_AMF0_AMF3,
376             chunk_type::TYPE_0,
377             0,
378             data.len() as u32,
379             msg_type_id::COMMAND_AMF0,
380             0,
381             data,
382         );
383 
384         self.packetizer.write_chunk(&mut chunk_info).await?;
385 
386         Ok(())
387     }
388 
389     pub fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
390         let mut netconnection = NetConnection::new(BytesWriter::new());
391         netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?;
392 
393         Ok(())
394     }
395 
396     pub async fn on_delete_stream(
397         &mut self,
398         transaction_id: &f64,
399         stream_id: &f64,
400     ) -> Result<(), SessionError> {
401         let mut netstream = NetStream::new(BytesWriter::new());
402         let data = netstream.on_status(
403             transaction_id,
404             &"status".to_string(),
405             &"NetStream.DeleteStream.Suceess".to_string(),
406             &"".to_string(),
407         )?;
408 
409         let mut chunk_info = ChunkInfo::new(
410             csid_type::COMMAND_AMF0_AMF3,
411             chunk_type::TYPE_0,
412             0,
413             data.len() as u32,
414             msg_type_id::COMMAND_AMF0,
415             0,
416             data,
417         );
418 
419         self.packetizer.write_chunk(&mut chunk_info).await?;
420         Ok(())
421     }
422     pub async fn on_play(
423         &mut self,
424         transaction_id: &f64,
425         stream_id: &u32,
426         other_values: &mut Vec<Amf0ValueType>,
427     ) -> Result<(), SessionError> {
428         let length = other_values.len() as u8;
429         let mut index: u8 = 0;
430 
431         let mut stream_name: Option<String> = None;
432         let mut start: Option<f64> = None;
433         let mut duration: Option<f64> = None;
434         let mut reset: Option<bool> = None;
435 
436         loop {
437             if index >= length {
438                 break;
439             }
440             index = index + 1;
441             stream_name = match other_values.remove(0) {
442                 Amf0ValueType::UTF8String(val) => Some(val),
443                 _ => None,
444             };
445 
446             if index >= length {
447                 break;
448             }
449             index = index + 1;
450             start = match other_values.remove(0) {
451                 Amf0ValueType::Number(val) => Some(val),
452                 _ => None,
453             };
454 
455             if index >= length {
456                 break;
457             }
458             index = index + 1;
459             duration = match other_values.remove(0) {
460                 Amf0ValueType::Number(val) => Some(val),
461                 _ => None,
462             };
463 
464             if index >= length {
465                 break;
466             }
467             index = index + 1;
468             reset = match other_values.remove(0) {
469                 Amf0ValueType::Boolean(val) => Some(val),
470                 _ => None,
471             };
472             break;
473         }
474 
475         let mut event_messages = EventMessages::new(AsyncBytesWriter::new(self.io.clone()));
476         event_messages.stream_begin(stream_id.clone()).await?;
477 
478         let mut netstream = NetStream::new(BytesWriter::new());
479         match reset {
480             Some(val) => {
481                 if val {
482                     netstream.on_status(
483                         transaction_id,
484                         &"status".to_string(),
485                         &"NetStream.Play.Reset".to_string(),
486                         &"".to_string(),
487                     )?;
488                 }
489             }
490             _ => {}
491         }
492 
493         netstream.on_status(
494             transaction_id,
495             &"status".to_string(),
496             &"NetStream.Play.Start".to_string(),
497             &"".to_string(),
498         )?;
499 
500         event_messages.stream_is_record(stream_id.clone()).await?;
501 
502         self.subscribe_from_channels(stream_name.unwrap()).await?;
503         self.state = ServerSessionState::Play;
504 
505         Ok(())
506     }
507 
508     async fn subscribe_from_channels(&mut self, stream_name: String) -> Result<(), SessionError> {
509         let (sender, receiver) = oneshot::channel();
510         let subscribe_event = ChannelEvent::Subscribe {
511             app_name: self.app_name.clone(),
512             stream_name,
513             responder: sender,
514         };
515 
516         let rv = self.event_producer.send(subscribe_event);
517         match rv {
518             Err(_) => {
519                 return Err(SessionError {
520                     value: SessionErrorValue::ChannelEventSendErr,
521                 })
522             }
523             _ => {}
524         }
525 
526         match receiver.await {
527             Ok(consumer) => {
528                 self.data_consumer = consumer;
529             }
530             Err(_) => {}
531         }
532         Ok(())
533     }
534 
535     pub async fn on_publish(
536         &mut self,
537         transaction_id: &f64,
538         stream_id: &u32,
539         other_values: &mut Vec<Amf0ValueType>,
540     ) -> Result<(), SessionError> {
541         let length = other_values.len();
542 
543         if length < 2 {
544             return Err(SessionError {
545                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
546             });
547         }
548 
549         let stream_name = match other_values.remove(0) {
550             Amf0ValueType::UTF8String(val) => val,
551             _ => {
552                 return Err(SessionError {
553                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
554                 });
555             }
556         };
557 
558         let stream_type = match other_values.remove(0) {
559             Amf0ValueType::UTF8String(val) => val,
560             _ => {
561                 return Err(SessionError {
562                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
563                 });
564             }
565         };
566 
567         let mut event_messages = EventMessages::new(AsyncBytesWriter::new(self.io.clone()));
568         event_messages.stream_begin(stream_id.clone()).await?;
569 
570         let mut netstream = NetStream::new(BytesWriter::new());
571         let data = netstream.on_status(
572             transaction_id,
573             &"status".to_string(),
574             &"NetStream.Publish.Start".to_string(),
575             &"".to_string(),
576         )?;
577 
578         let mut chunk_info = ChunkInfo::new(
579             csid_type::COMMAND_AMF0_AMF3,
580             chunk_type::TYPE_0,
581             0,
582             data.len() as u32,
583             msg_type_id::COMMAND_AMF0,
584             0,
585             data,
586         );
587 
588         self.packetizer.write_chunk(&mut chunk_info).await?;
589 
590         self.publish_to_channels(stream_name).await?;
591 
592         Ok(())
593     }
594 
595     async fn publish_to_channels(&mut self, stream_name: String) -> Result<(), SessionError> {
596         let (sender, receiver) = oneshot::channel();
597         let publish_event = ChannelEvent::Publish {
598             app_name: self.app_name.clone(),
599             stream_name,
600             responder: sender,
601         };
602 
603         let rv = self.event_producer.send(publish_event);
604         match rv {
605             Err(_) => {
606                 return Err(SessionError {
607                     value: SessionErrorValue::ChannelEventSendErr,
608                 })
609             }
610             _ => {}
611         }
612 
613         match receiver.await {
614             Ok(producer) => {
615                 self.data_producer = producer;
616             }
617             Err(_) => {}
618         }
619         Ok(())
620     }
621 
622     pub fn on_video_data(
623         &mut self,
624         data: &mut BytesMut,
625         timestamp: &u32,
626     ) -> Result<(), SessionError> {
627         let data = ChannelData::Video {
628             timestamp: timestamp.clone(),
629             data: data.clone(),
630         };
631 
632         match self.data_producer.send(data) {
633             Ok(size) => {}
634             Err(_) => {
635                 return Err(SessionError {
636                     value: SessionErrorValue::SendChannelDataErr,
637                 })
638             }
639         }
640 
641         Ok(())
642     }
643 
644     pub fn on_audio_data(
645         &mut self,
646         data: &mut BytesMut,
647         timestamp: &u32,
648     ) -> Result<(), SessionError> {
649         let data = ChannelData::Audio {
650             timestamp: timestamp.clone(),
651             data: data.clone(),
652         };
653 
654         match self.data_producer.send(data) {
655             Ok(size) => {}
656             Err(_) => {
657                 return Err(SessionError {
658                     value: SessionErrorValue::SendChannelDataErr,
659                 })
660             }
661         }
662 
663         Ok(())
664     }
665 }
666