1 use {
2     super::{
3         common::Common,
4         define,
5         errors::{SessionError, SessionErrorValue},
6     },
7     crate::{
8         amf0::Amf0ValueType,
9         channels::define::{
10             ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent,
11             ChannelEventProducer,
12         },
13         chunk::{
14             define::{chunk_type, csid_type, CHUNK_SIZE},
15             packetizer::ChunkPacketizer,
16             unpacketizer::{ChunkUnpacketizer, UnpackResult},
17             ChunkInfo,
18         },
19         config,
20         handshake::handshake::{ServerHandshakeState, SimpleHandshakeServer},
21         messages::{
22             define::{msg_type_id, RtmpMessageData},
23             parser::MessageParser,
24         },
25         netconnection::commands::NetConnection,
26         netstream::writer::NetStreamWriter,
27         protocol_control_messages::writer::ProtocolControlMessagesWriter,
28         user_control_messages::writer::EventMessagesWriter,
29     },
30     bytes::BytesMut,
31     networkio::{
32         bytes_writer::{AsyncBytesWriter, BytesWriter},
33         networkio::NetworkIO,
34     },
35     std::{collections::HashMap, sync::Arc},
36     tokio::{
37         net::TcpStream,
38         sync::{mpsc, oneshot, Mutex},
39     },
40 };
41 
42 enum ServerSessionState {
43     Handshake,
44     ReadChunk,
45     // OnConnect,
46     // OnCreateStream,
47     //Publish,
48     Play,
49 }
50 
51 pub struct ServerSession {
52     app_name: String,
53     stream_name: String,
54 
55     io: Arc<Mutex<NetworkIO>>,
56     simple_handshaker: SimpleHandshakeServer,
57     //complex_handshaker: ComplexHandshakeServer,
58     packetizer: ChunkPacketizer,
59     unpacketizer: ChunkUnpacketizer,
60 
61     state: ServerSessionState,
62 
63     common: Common,
64 
65     netio_data: BytesMut,
66     need_process: bool,
67 
68     pub session_id: u64,
69     pub session_type: u8,
70 }
71 
72 impl ServerSession {
73     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer, session_id: u64) -> Self {
74         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream)));
75 
76         Self {
77             app_name: String::from(""),
78             stream_name: String::from(""),
79 
80             io: Arc::clone(&net_io),
81             simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)),
82             //complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)),
83             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
84             unpacketizer: ChunkUnpacketizer::new(),
85 
86             state: ServerSessionState::Handshake,
87 
88             common: Common::new(Arc::clone(&net_io), event_producer),
89 
90             session_id: session_id,
91             netio_data: BytesMut::new(),
92             need_process: false,
93             session_type: 0,
94         }
95     }
96 
97     pub async fn run(&mut self) -> Result<(), SessionError> {
98         loop {
99             match self.state {
100                 ServerSessionState::Handshake => {
101                     self.handshake().await?;
102                 }
103                 ServerSessionState::ReadChunk => {
104                     self.read_parse_chunks().await?;
105                 }
106                 ServerSessionState::Play => {
107                     self.play().await?;
108                 }
109             }
110         }
111 
112         //Ok(())
113     }
114 
115     async fn handshake(&mut self) -> Result<(), SessionError> {
116         self.netio_data = self.io.lock().await.read().await?;
117         self.simple_handshaker.extend_data(&self.netio_data[..]);
118         self.simple_handshaker.handshake().await?;
119 
120         match self.simple_handshaker.state {
121             ServerHandshakeState::Finish => {
122                 self.state = ServerSessionState::ReadChunk;
123 
124                 let left_bytes = self.simple_handshaker.get_remaining_bytes();
125                 if left_bytes.len() > 0 {
126                     self.unpacketizer.extend_data(&left_bytes[..]);
127                     self.need_process = true;
128                 }
129 
130                 return Ok(());
131             }
132             _ => {}
133         }
134 
135         Ok(())
136     }
137 
138     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
139         if !self.need_process {
140             self.netio_data = self.io.lock().await.read().await?;
141             self.unpacketizer.extend_data(&self.netio_data[..]);
142         }
143 
144         self.need_process = false;
145 
146         loop {
147             let result = self.unpacketizer.read_chunks();
148 
149             if let Ok(rv) = result {
150                 match rv {
151                     UnpackResult::Chunks(chunks) => {
152                         for chunk_info in chunks.iter() {
153                             let mut msg = MessageParser::new(chunk_info.clone(), self.session_type)
154                                 .parse()?;
155 
156                             let msg_stream_id = chunk_info.message_header.msg_streamd_id;
157                             let timestamp = chunk_info.message_header.timestamp;
158                             self.process_messages(&mut msg, &msg_stream_id, &timestamp)
159                                 .await?;
160                         }
161                     }
162                     _ => {}
163                 }
164             } else {
165                 break;
166             }
167         }
168         Ok(())
169     }
170 
171     async fn play(&mut self) -> Result<(), SessionError> {
172         match self.common.send_channel_data().await {
173             Ok(_) => {}
174 
175             Err(err) => {
176                 self.common
177                     .unsubscribe_from_channels(
178                         self.app_name.clone(),
179                         self.stream_name.clone(),
180                         self.session_id,
181                     )
182                     .await?;
183                 return Err(err);
184             }
185         }
186 
187         Ok(())
188     }
189 
190     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
191         let mut controlmessage =
192             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
193         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
194 
195         Ok(())
196     }
197 
198     pub async fn process_messages(
199         &mut self,
200         rtmp_msg: &mut RtmpMessageData,
201         msg_stream_id: &u32,
202         timestamp: &u32,
203     ) -> Result<(), SessionError> {
204         match rtmp_msg {
205             RtmpMessageData::Amf0Command {
206                 command_name,
207                 transaction_id,
208                 command_object,
209                 others,
210             } => {
211                 self.on_amf0_command_message(
212                     msg_stream_id,
213                     command_name,
214                     transaction_id,
215                     command_object,
216                     others,
217                 )
218                 .await?
219             }
220             RtmpMessageData::SetChunkSize { chunk_size } => {
221                 self.on_set_chunk_size(chunk_size.clone() as usize)?;
222             }
223             RtmpMessageData::AudioData { data } => {
224                 self.common.on_audio_data(data, timestamp)?;
225             }
226             RtmpMessageData::VideoData { data } => {
227                 self.common.on_video_data(data, timestamp)?;
228             }
229             RtmpMessageData::AmfData { raw_data } => {
230                 self.common.on_amf_data(raw_data)?;
231             }
232 
233             _ => {}
234         }
235         Ok(())
236     }
237 
238     pub async fn on_amf0_command_message(
239         &mut self,
240         stream_id: &u32,
241         command_name: &Amf0ValueType,
242         transaction_id: &Amf0ValueType,
243         command_object: &Amf0ValueType,
244         others: &mut Vec<Amf0ValueType>,
245     ) -> Result<(), SessionError> {
246         let empty_cmd_name = &String::new();
247         let cmd_name = match command_name {
248             Amf0ValueType::UTF8String(str) => str,
249             _ => empty_cmd_name,
250         };
251 
252         let transaction_id = match transaction_id {
253             Amf0ValueType::Number(number) => number,
254             _ => &0.0,
255         };
256 
257         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
258         let obj = match command_object {
259             Amf0ValueType::Object(obj) => obj,
260             _ => &empty_cmd_obj,
261         };
262 
263         match cmd_name.as_str() {
264             "connect" => {
265                 print!("connect .......");
266                 self.on_connect(&transaction_id, &obj).await?;
267             }
268             "createStream" => {
269                 self.on_create_stream(transaction_id).await?;
270             }
271             "deleteStream" => {
272                 print!("deletestream....\n");
273                 if others.len() > 0 {
274                     let stream_id = match others.pop() {
275                         Some(val) => match val {
276                             Amf0ValueType::Number(streamid) => streamid,
277                             _ => 0.0,
278                         },
279                         _ => 0.0,
280                     };
281 
282                     print!("deletestream....{}\n", stream_id);
283 
284                     self.on_delete_stream(transaction_id, &stream_id).await?;
285                 }
286             }
287             "play" => {
288                 self.session_type = config::SERVER_PULL;
289                 self.unpacketizer.session_type = config::SERVER_PULL;
290                 self.on_play(transaction_id, stream_id, others).await?;
291             }
292             "publish" => {
293                 self.session_type = config::SERVER_PUSH;
294                 self.unpacketizer.session_type = config::SERVER_PUSH;
295                 self.on_publish(transaction_id, stream_id, others).await?;
296             }
297             _ => {}
298         }
299 
300         Ok(())
301     }
302 
303     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
304         self.unpacketizer.update_max_chunk_size(chunk_size);
305         Ok(())
306     }
307 
308     async fn on_connect(
309         &mut self,
310         transaction_id: &f64,
311         command_obj: &HashMap<String, Amf0ValueType>,
312     ) -> Result<(), SessionError> {
313         let mut control_message =
314             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
315         control_message
316             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
317             .await?;
318         control_message
319             .write_set_peer_bandwidth(
320                 define::PEER_BANDWIDTH,
321                 define::peer_bandwidth_limit_type::DYNAMIC,
322             )
323             .await?;
324         control_message.write_set_chunk_size(CHUNK_SIZE).await?;
325 
326         let obj_encoding = command_obj.get("objectEncoding");
327         let encoding = match obj_encoding {
328             Some(Amf0ValueType::Number(encoding)) => encoding,
329             _ => &define::OBJENCODING_AMF0,
330         };
331 
332         let app_name = command_obj.get("app");
333         self.app_name = match app_name {
334             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
335             _ => {
336                 return Err(SessionError {
337                     value: SessionErrorValue::NoAppName,
338                 });
339             }
340         };
341 
342         let mut netconnection = NetConnection::new(BytesWriter::new());
343         let data = netconnection.connect_response(
344             &transaction_id,
345             &define::FMSVER.to_string(),
346             &define::CAPABILITIES,
347             &String::from("NetConnection.Connect.Success"),
348             &define::LEVEL.to_string(),
349             &String::from("Connection Succeeded."),
350             encoding,
351         )?;
352 
353         let mut chunk_info = ChunkInfo::new(
354             csid_type::COMMAND_AMF0_AMF3,
355             chunk_type::TYPE_0,
356             0,
357             data.len() as u32,
358             msg_type_id::COMMAND_AMF0,
359             0,
360             data,
361         );
362 
363         self.packetizer.write_chunk(&mut chunk_info).await?;
364 
365         Ok(())
366     }
367 
368     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
369         let mut netconnection = NetConnection::new(BytesWriter::new());
370         let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?;
371 
372         let mut chunk_info = ChunkInfo::new(
373             csid_type::COMMAND_AMF0_AMF3,
374             chunk_type::TYPE_0,
375             0,
376             data.len() as u32,
377             msg_type_id::COMMAND_AMF0,
378             0,
379             data,
380         );
381 
382         self.packetizer.write_chunk(&mut chunk_info).await?;
383 
384         Ok(())
385     }
386 
387     pub async fn on_delete_stream(
388         &mut self,
389         transaction_id: &f64,
390         stream_id: &f64,
391     ) -> Result<(), SessionError> {
392         self.common
393             .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
394             .await?;
395 
396         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
397         netstream
398             .on_status(
399                 transaction_id,
400                 &"status".to_string(),
401                 &"NetStream.DeleteStream.Suceess".to_string(),
402                 &"".to_string(),
403             )
404             .await?;
405 
406         print!("stream id{}", stream_id);
407 
408         //self.unsubscribe_from_channels().await?;
409 
410         Ok(())
411     }
412     pub async fn on_play(
413         &mut self,
414         transaction_id: &f64,
415         stream_id: &u32,
416         other_values: &mut Vec<Amf0ValueType>,
417     ) -> Result<(), SessionError> {
418         let length = other_values.len() as u8;
419         let mut index: u8 = 0;
420 
421         let mut stream_name: Option<String> = None;
422         let mut start: Option<f64> = None;
423         let mut duration: Option<f64> = None;
424         let mut reset: Option<bool> = None;
425 
426         loop {
427             if index >= length {
428                 break;
429             }
430             index = index + 1;
431             stream_name = match other_values.remove(0) {
432                 Amf0ValueType::UTF8String(val) => Some(val),
433                 _ => None,
434             };
435 
436             if index >= length {
437                 break;
438             }
439             index = index + 1;
440             start = match other_values.remove(0) {
441                 Amf0ValueType::Number(val) => Some(val),
442                 _ => None,
443             };
444 
445             if index >= length {
446                 break;
447             }
448             index = index + 1;
449             duration = match other_values.remove(0) {
450                 Amf0ValueType::Number(val) => Some(val),
451                 _ => None,
452             };
453 
454             if index >= length {
455                 break;
456             }
457             //index = index + 1;
458             reset = match other_values.remove(0) {
459                 Amf0ValueType::Boolean(val) => Some(val),
460                 _ => None,
461             };
462             break;
463         }
464         print!("start {}", start.is_some());
465         print!("druation {}", duration.is_some());
466         print!("reset {}", reset.is_some());
467 
468         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
469         event_messages.write_stream_begin(stream_id.clone()).await?;
470 
471         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
472         netstream
473             .on_status(
474                 transaction_id,
475                 &"status".to_string(),
476                 &"NetStream.Play.Reset".to_string(),
477                 &"reset".to_string(),
478             )
479             .await?;
480 
481         netstream
482             .on_status(
483                 transaction_id,
484                 &"status".to_string(),
485                 &"NetStream.Play.Start".to_string(),
486                 &"play start".to_string(),
487             )
488             .await?;
489 
490         netstream
491             .on_status(
492                 transaction_id,
493                 &"status".to_string(),
494                 &"NetStream.Data.Start".to_string(),
495                 &"data start.".to_string(),
496             )
497             .await?;
498 
499         netstream
500             .on_status(
501                 transaction_id,
502                 &"status".to_string(),
503                 &"NetStream.Play.PublishNotify".to_string(),
504                 &"play publish notify.".to_string(),
505             )
506             .await?;
507 
508         event_messages
509             .write_stream_is_record(stream_id.clone())
510             .await?;
511 
512         self.common
513             .subscribe_from_channels(self.app_name.clone(), stream_name.unwrap(), self.session_id)
514             .await?;
515         self.state = ServerSessionState::Play;
516 
517         Ok(())
518     }
519 
520     pub async fn on_publish(
521         &mut self,
522         transaction_id: &f64,
523         stream_id: &u32,
524         other_values: &mut Vec<Amf0ValueType>,
525     ) -> Result<(), SessionError> {
526         let length = other_values.len();
527 
528         if length < 2 {
529             return Err(SessionError {
530                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
531             });
532         }
533 
534         let stream_name = match other_values.remove(0) {
535             Amf0ValueType::UTF8String(val) => val,
536             _ => {
537                 return Err(SessionError {
538                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
539                 });
540             }
541         };
542 
543         self.stream_name = stream_name;
544 
545         let _ = match other_values.remove(0) {
546             Amf0ValueType::UTF8String(val) => val,
547             _ => {
548                 return Err(SessionError {
549                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
550                 });
551             }
552         };
553 
554         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
555         event_messages.write_stream_begin(stream_id.clone()).await?;
556 
557         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
558         netstream
559             .on_status(
560                 transaction_id,
561                 &"status".to_string(),
562                 &"NetStream.Publish.Start".to_string(),
563                 &"".to_string(),
564             )
565             .await?;
566 
567         print!("before publish_to_channels\n");
568         self.common
569             .publish_to_channels(self.app_name.clone(), self.stream_name.clone())
570             .await?;
571         print!("after publish_to_channels\n");
572 
573         Ok(())
574     }
575 }
576