1 use super::define;
2 use super::errors::SessionError;
3 use super::errors::SessionErrorValue;
4 use crate::chunk::{unpacketizer::ChunkUnpacketizer, ChunkInfo};
5 use crate::handshake::handshake::{ComplexHandshakeServer, SimpleHandshakeServer};
6 use crate::{amf0::Amf0ValueType, chunk::unpacketizer::UnpackResult};
7 use crate::{channels, chunk::packetizer::ChunkPacketizer};
8 use crate::{
9     chunk::define::CHUNK_SIZE,
10     chunk::define::{chunk_type, csid_type},
11 };
12 
13 use crate::messages::define::msg_type_id;
14 use crate::messages::define::RtmpMessageData;
15 
16 use crate::messages::parser::MessageParser;
17 use bytes::BytesMut;
18 
19 use netio::bytes_writer::AsyncBytesWriter;
20 use netio::bytes_writer::BytesWriter;
21 use netio::netio::NetworkIO;
22 use std::{borrow::BorrowMut, time::Duration};
23 
24 use crate::channels::define::ChannelEvent;
25 use crate::channels::define::MultiConsumerForData;
26 use crate::channels::define::MultiProducerForEvent;
27 use crate::channels::define::SingleProducerForData;
28 use crate::netconnection::commands::NetConnection;
29 use crate::netstream::writer::NetStreamWriter;
30 use crate::protocol_control_messages::writer::ProtocolControlMessagesWriter;
31 
32 use crate::user_control_messages::writer::EventMessagesWriter;
33 
34 use std::collections::HashMap;
35 
36 use tokio::net::TcpStream;
37 use tokio::sync::broadcast;
38 
39 use std::sync::Arc;
40 use tokio::sync::oneshot;
41 use tokio::sync::Mutex;
42 
43 use crate::channels::define::ChannelData;
44 
45 use crate::handshake::handshake::ServerHandshakeState;
46 
47 use crate::cache::metadata;
48 
49 use crate::utils;
50 use log::{debug, log, log_enabled, Level};
51 use std::fmt;
52 
/// Phase of the connection that `ServerSession::run` dispatches on.
enum ServerSessionState {
    /// Initial state: incoming bytes are handed to the handshaker.
    Handshake,
    /// Incoming bytes are unpacked into chunks and handled as RTMP messages.
    ReadChunk,
    // OnConnect,
    // OnCreateStream,
    // Publish,
    /// Entered after a `play` command: data received from the data channel
    /// is forwarded to the connected peer.
    Play,
}
61 
/// Server-side state of one accepted RTMP connection.
pub struct ServerSession {
    /// Application name taken from the `app` property of the `connect` command.
    app_name: String,

    /// Network connection, shared with the handshakers, the packetizer and
    /// the message writers created on demand.
    io: Arc<Mutex<NetworkIO>>,
    simple_handshaker: SimpleHandshakeServer,
    complex_handshaker: ComplexHandshakeServer,

    /// Writes outgoing chunks to `io`.
    packetizer: ChunkPacketizer,
    /// Buffers incoming bytes and splits them into chunks.
    unpacketizer: ChunkUnpacketizer,

    /// Current phase of the session; see `ServerSessionState`.
    state: ServerSessionState,

    /// Sends publish/subscribe events to the channel manager.
    event_producer: MultiProducerForEvent,

    // Sends video, audio or metadata from a publishing session to the
    // player sessions.
    data_producer: SingleProducerForData,
    // Receives video, audio or metadata from a publishing session so it can
    // be sent out to the player.
    data_consumer: MultiConsumerForData,

    /// Identifier supplied by the creator of the session.
    session_id: u8,
}
83 
84 impl ServerSession {
85     pub fn new(
86         stream: TcpStream,
87         event_producer: MultiProducerForEvent,
88         timeout: Duration,
89         session_id: u8,
90     ) -> Self {
91         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout)));
92         //only used for init,since I don't found a better way to deal with this.
93         let (init_producer, init_consumer) = broadcast::channel(1);
94         // let reader = BytesReader::new(BytesMut::new());
95 
96         Self {
97             app_name: String::from(""),
98 
99             io: Arc::clone(&net_io),
100             simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)),
101             complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)),
102             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
103             unpacketizer: ChunkUnpacketizer::new(),
104 
105             state: ServerSessionState::Handshake,
106 
107             event_producer,
108             data_producer: init_producer,
109             data_consumer: init_consumer,
110             session_id: session_id,
111         }
112     }
113 
114     pub async fn run(&mut self) -> Result<(), SessionError> {
115         let duration = Duration::new(10, 10);
116 
117         let mut remaining_bytes: BytesMut = BytesMut::new();
118 
119         loop {
120             let mut net_io_data: BytesMut = BytesMut::new();
121             if remaining_bytes.len() <= 0 {
122                 net_io_data = self.io.lock().await.read().await?;
123 
124                 //utils::print::print(net_io_data.clone());
125             }
126 
127             match self.state {
128                 ServerSessionState::Handshake => {
129                     if self.session_id > 0 {
130                         // utils::print::printu8(net_io_data.clone());
131                     }
132 
133                     self.simple_handshaker.extend_data(&net_io_data[..]);
134                     self.simple_handshaker.handshake().await?;
135 
136                     match self.simple_handshaker.state {
137                         ServerHandshakeState::Finish => {
138                             self.state = ServerSessionState::ReadChunk;
139                             remaining_bytes = self.simple_handshaker.get_remaining_bytes();
140                         }
141                         _ => continue,
142                     }
143                 }
144                 ServerSessionState::ReadChunk => {
145                     if self.session_id > 0 {
146                         // utils::print::printu8(net_io_data.clone());
147                     }
148 
149                     if remaining_bytes.len() > 0 {
150                         let bytes = std::mem::replace(&mut remaining_bytes, BytesMut::new());
151                         self.unpacketizer.extend_data(&bytes[..]);
152                     } else {
153                         self.unpacketizer.extend_data(&net_io_data[..]);
154                     }
155 
156                     loop {
157                         let result = self.unpacketizer.read_chunks();
158                         let rv = match result {
159                             Ok(val) => val,
160                             Err(_) => {
161                                 break;
162                             }
163                         };
164 
165                         match rv {
166                             UnpackResult::Chunks(chunks) => {
167                                 for chunk_info in chunks.iter() {
168                                     let msg_stream_id = chunk_info.message_header.msg_streamd_id;
169                                     let timestamp = chunk_info.message_header.timestamp;
170 
171                                     let mut message_parser = MessageParser::new(chunk_info.clone());
172                                     let mut msg = message_parser.parse()?;
173 
174                                     self.process_messages(&mut msg, &msg_stream_id, &timestamp)
175                                         .await?;
176                                 }
177                             }
178                             _ => {}
179                         }
180                     }
181                 }
182 
183                 //when in play state, only transfer publisher's video/audio/metadta to player.
184                 ServerSessionState::Play => loop {
185                     let data = self.data_consumer.recv().await;
186 
187                     match data {
188                         Ok(val) => match val {
189                             ChannelData::Audio { timestamp, data } => {
190                                 print!("send audio data\n");
191                                 self.send_audio(data, timestamp).await?;
192                             }
193                             ChannelData::Video { timestamp, data } => {
194                                 print!("send video data\n");
195                                 self.send_video(data, timestamp).await?;
196                             }
197                             ChannelData::MetaData {} => {}
198                         },
199                         Err(err) => {}
200                     }
201                 },
202             }
203         }
204 
205         //Ok(())
206     }
207 
208     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
209         let mut controlmessage =
210             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
211         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
212 
213         Ok(())
214     }
215     pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
216         let mut chunk_info = ChunkInfo::new(
217             csid_type::AUDIO,
218             chunk_type::TYPE_0,
219             timestamp,
220             data.len() as u32,
221             msg_type_id::AUDIO,
222             0,
223             data,
224         );
225 
226         self.packetizer.write_chunk(&mut chunk_info).await?;
227 
228         Ok(())
229     }
230 
231     pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
232         let mut chunk_info = ChunkInfo::new(
233             csid_type::VIDEO,
234             chunk_type::TYPE_0,
235             timestamp,
236             data.len() as u32,
237             msg_type_id::VIDEO,
238             0,
239             data,
240         );
241 
242         self.packetizer.write_chunk(&mut chunk_info).await?;
243 
244         Ok(())
245     }
246     pub async fn process_messages(
247         &mut self,
248         rtmp_msg: &mut RtmpMessageData,
249         msg_stream_id: &u32,
250         timestamp: &u32,
251     ) -> Result<(), SessionError> {
252         match rtmp_msg {
253             RtmpMessageData::Amf0Command {
254                 command_name,
255                 transaction_id,
256                 command_object,
257                 others,
258             } => {
259                 self.process_amf0_command_message(
260                     msg_stream_id,
261                     command_name,
262                     transaction_id,
263                     command_object,
264                     others,
265                 )
266                 .await?
267             }
268             RtmpMessageData::SetChunkSize { chunk_size } => {
269                 self.on_set_chunk_size(chunk_size.clone() as usize)?;
270             }
271             RtmpMessageData::AudioData { data } => {
272                 self.on_audio_data(data, timestamp)?;
273             }
274             RtmpMessageData::VideoData { data } => {
275                 self.on_video_data(data, timestamp)?;
276             }
277             RtmpMessageData::AmfData { raw_data, values } => {
278                 self.on_amf_data(raw_data, values)?;
279             }
280 
281             _ => {}
282         }
283         Ok(())
284     }
285 
286     pub async fn process_amf0_command_message(
287         &mut self,
288         stream_id: &u32,
289         command_name: &Amf0ValueType,
290         transaction_id: &Amf0ValueType,
291         command_object: &Amf0ValueType,
292         others: &mut Vec<Amf0ValueType>,
293     ) -> Result<(), SessionError> {
294         let empty_cmd_name = &String::new();
295         let cmd_name = match command_name {
296             Amf0ValueType::UTF8String(str) => str,
297             _ => empty_cmd_name,
298         };
299 
300         let transaction_id = match transaction_id {
301             Amf0ValueType::Number(number) => number,
302             _ => &0.0,
303         };
304 
305         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
306         let obj = match command_object {
307             Amf0ValueType::Object(obj) => obj,
308             _ => &empty_cmd_obj,
309         };
310 
311         match cmd_name.as_str() {
312             "connect" => {
313                 print!("connect .......");
314                 self.on_connect(&transaction_id, &obj).await?;
315             }
316             "createStream" => {
317                 self.on_create_stream(transaction_id).await?;
318             }
319             "deleteStream" => {
320                 if others.len() > 1 {
321                     let stream_id = match others.pop() {
322                         Some(val) => match val {
323                             Amf0ValueType::Number(streamid) => streamid,
324                             _ => 0.0,
325                         },
326                         _ => 0.0,
327                     };
328 
329                     self.on_delete_stream(transaction_id, &stream_id).await?;
330                 }
331             }
332             "play" => {
333                 self.on_play(transaction_id, stream_id, others).await?;
334             }
335             "publish" => {
336                 self.on_publish(transaction_id, stream_id, others).await?;
337             }
338             _ => {}
339         }
340 
341         Ok(())
342     }
343 
344     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
345         self.unpacketizer.update_max_chunk_size(chunk_size);
346         Ok(())
347     }
348 
349     async fn on_connect(
350         &mut self,
351         transaction_id: &f64,
352         command_obj: &HashMap<String, Amf0ValueType>,
353     ) -> Result<(), SessionError> {
354         let mut control_message =
355             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
356         control_message
357             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
358             .await?;
359         control_message
360             .write_set_peer_bandwidth(
361                 define::PEER_BANDWIDTH,
362                 define::PeerBandWidthLimitType::DYNAMIC,
363             )
364             .await?;
365         control_message.write_set_chunk_size(CHUNK_SIZE).await?;
366 
367         let obj_encoding = command_obj.get("objectEncoding");
368         let encoding = match obj_encoding {
369             Some(Amf0ValueType::Number(encoding)) => encoding,
370             _ => &define::OBJENCODING_AMF0,
371         };
372 
373         let app_name = command_obj.get("app");
374         self.app_name = match app_name {
375             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
376             _ => {
377                 return Err(SessionError {
378                     value: SessionErrorValue::NoAppName,
379                 });
380             }
381         };
382 
383         let mut netconnection = NetConnection::new(BytesWriter::new());
384         let data = netconnection.connect_response(
385             &transaction_id,
386             &define::FMSVER.to_string(),
387             &define::CAPABILITIES,
388             &String::from("NetConnection.Connect.Success"),
389             &define::LEVEL.to_string(),
390             &String::from("Connection Succeeded."),
391             encoding,
392         )?;
393 
394         let mut chunk_info = ChunkInfo::new(
395             csid_type::COMMAND_AMF0_AMF3,
396             chunk_type::TYPE_0,
397             0,
398             data.len() as u32,
399             msg_type_id::COMMAND_AMF0,
400             0,
401             data,
402         );
403 
404         self.packetizer.write_chunk(&mut chunk_info).await?;
405 
406         Ok(())
407     }
408 
409     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
410         let mut netconnection = NetConnection::new(BytesWriter::new());
411         let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?;
412 
413         let mut chunk_info = ChunkInfo::new(
414             csid_type::COMMAND_AMF0_AMF3,
415             chunk_type::TYPE_0,
416             0,
417             data.len() as u32,
418             msg_type_id::COMMAND_AMF0,
419             0,
420             data,
421         );
422 
423         self.packetizer.write_chunk(&mut chunk_info).await?;
424 
425         Ok(())
426     }
427 
428     pub async fn on_delete_stream(
429         &mut self,
430         transaction_id: &f64,
431         stream_id: &f64,
432     ) -> Result<(), SessionError> {
433         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
434         netstream
435             .on_status(
436                 transaction_id,
437                 &"status".to_string(),
438                 &"NetStream.DeleteStream.Suceess".to_string(),
439                 &"".to_string(),
440             )
441             .await?;
442 
443         Ok(())
444     }
445     pub async fn on_play(
446         &mut self,
447         transaction_id: &f64,
448         stream_id: &u32,
449         other_values: &mut Vec<Amf0ValueType>,
450     ) -> Result<(), SessionError> {
451         let length = other_values.len() as u8;
452         let mut index: u8 = 0;
453 
454         let mut stream_name: Option<String> = None;
455         let mut start: Option<f64> = None;
456         let mut duration: Option<f64> = None;
457         let mut reset: Option<bool> = None;
458 
459         loop {
460             if index >= length {
461                 break;
462             }
463             index = index + 1;
464             stream_name = match other_values.remove(0) {
465                 Amf0ValueType::UTF8String(val) => Some(val),
466                 _ => None,
467             };
468 
469             if index >= length {
470                 break;
471             }
472             index = index + 1;
473             start = match other_values.remove(0) {
474                 Amf0ValueType::Number(val) => Some(val),
475                 _ => None,
476             };
477 
478             if index >= length {
479                 break;
480             }
481             index = index + 1;
482             duration = match other_values.remove(0) {
483                 Amf0ValueType::Number(val) => Some(val),
484                 _ => None,
485             };
486 
487             if index >= length {
488                 break;
489             }
490             index = index + 1;
491             reset = match other_values.remove(0) {
492                 Amf0ValueType::Boolean(val) => Some(val),
493                 _ => None,
494             };
495             break;
496         }
497 
498         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
499         event_messages.write_stream_begin(stream_id.clone()).await?;
500 
501         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
502         netstream
503             .on_status(
504                 transaction_id,
505                 &"status".to_string(),
506                 &"NetStream.Play.Reset".to_string(),
507                 &"reset".to_string(),
508             )
509             .await?;
510 
511         netstream
512             .on_status(
513                 transaction_id,
514                 &"status".to_string(),
515                 &"NetStream.Play.Start".to_string(),
516                 &"play start".to_string(),
517             )
518             .await?;
519 
520         netstream
521             .on_status(
522                 transaction_id,
523                 &"status".to_string(),
524                 &"NetStream.Data.Start".to_string(),
525                 &"data start.".to_string(),
526             )
527             .await?;
528 
529         netstream
530             .on_status(
531                 transaction_id,
532                 &"status".to_string(),
533                 &"NetStream.Play.PublishNotify".to_string(),
534                 &"play publish notify.".to_string(),
535             )
536             .await?;
537 
538         event_messages
539             .write_stream_is_record(stream_id.clone())
540             .await?;
541 
542         self.subscribe_from_channels(stream_name.unwrap()).await?;
543         self.state = ServerSessionState::Play;
544 
545         Ok(())
546     }
547 
548     async fn subscribe_from_channels(&mut self, stream_name: String) -> Result<(), SessionError> {
549         let (sender, receiver) = oneshot::channel();
550         let subscribe_event = ChannelEvent::Subscribe {
551             app_name: self.app_name.clone(),
552             stream_name,
553             responder: sender,
554         };
555 
556         let rv = self.event_producer.send(subscribe_event);
557         match rv {
558             Err(_) => {
559                 return Err(SessionError {
560                     value: SessionErrorValue::ChannelEventSendErr,
561                 })
562             }
563             _ => {}
564         }
565 
566         match receiver.await {
567             Ok(consumer) => {
568                 self.data_consumer = consumer;
569             }
570             Err(_) => {}
571         }
572         Ok(())
573     }
574 
575     pub async fn on_publish(
576         &mut self,
577         transaction_id: &f64,
578         stream_id: &u32,
579         other_values: &mut Vec<Amf0ValueType>,
580     ) -> Result<(), SessionError> {
581         let length = other_values.len();
582 
583         if length < 2 {
584             return Err(SessionError {
585                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
586             });
587         }
588 
589         let stream_name = match other_values.remove(0) {
590             Amf0ValueType::UTF8String(val) => val,
591             _ => {
592                 return Err(SessionError {
593                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
594                 });
595             }
596         };
597 
598         let stream_type = match other_values.remove(0) {
599             Amf0ValueType::UTF8String(val) => val,
600             _ => {
601                 return Err(SessionError {
602                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
603                 });
604             }
605         };
606 
607         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
608         event_messages.write_stream_begin(stream_id.clone()).await?;
609 
610         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
611         netstream
612             .on_status(
613                 transaction_id,
614                 &"status".to_string(),
615                 &"NetStream.Publish.Start".to_string(),
616                 &"".to_string(),
617             )
618             .await?;
619 
620         print!("before publish_to_channels\n");
621         self.publish_to_channels(stream_name).await?;
622         print!("after publish_to_channels\n");
623 
624         Ok(())
625     }
626 
627     async fn publish_to_channels(&mut self, stream_name: String) -> Result<(), SessionError> {
628         let (sender, receiver) = oneshot::channel();
629         let publish_event = ChannelEvent::Publish {
630             app_name: self.app_name.clone(),
631             stream_name,
632             responder: sender,
633         };
634 
635         let rv = self.event_producer.send(publish_event);
636         match rv {
637             Err(_) => {
638                 return Err(SessionError {
639                     value: SessionErrorValue::ChannelEventSendErr,
640                 })
641             }
642             _ => {}
643         }
644 
645         match receiver.await {
646             Ok(producer) => {
647                 print!("set producer before\n");
648                 self.data_producer = producer;
649                 print!("set producer after\n");
650             }
651             Err(_) => {}
652         }
653         Ok(())
654     }
655 
656     pub fn on_video_data(
657         &mut self,
658         data: &mut BytesMut,
659         timestamp: &u32,
660     ) -> Result<(), SessionError> {
661         let data = ChannelData::Video {
662             timestamp: timestamp.clone(),
663             data: data.clone(),
664         };
665 
666         print!("receive video data\n");
667         match self.data_producer.send(data) {
668             Ok(size) => {}
669             Err(err) => {
670                 format!("receive video err {}", err);
671                 return Err(SessionError {
672                     value: SessionErrorValue::SendChannelDataErr,
673                 });
674             }
675         }
676 
677         Ok(())
678     }
679 
680     pub fn on_audio_data(
681         &mut self,
682         data: &mut BytesMut,
683         timestamp: &u32,
684     ) -> Result<(), SessionError> {
685         let data = ChannelData::Audio {
686             timestamp: timestamp.clone(),
687             data: data.clone(),
688         };
689 
690         match self.data_producer.send(data) {
691             Ok(size) => {}
692             Err(_) => {
693                 return Err(SessionError {
694                     value: SessionErrorValue::SendChannelDataErr,
695                 })
696             }
697         }
698 
699         Ok(())
700     }
701 
702     pub fn on_amf_data(
703         &mut self,
704         body: &mut BytesMut,
705         values: &mut Vec<Amf0ValueType>,
706     ) -> Result<(), SessionError> {
707         let mut metadata_handler = metadata::MetaData::default();
708 
709         metadata_handler.save(body, values);
710 
711         Ok(())
712     }
713 }
714