1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::{
9         amf0::Amf0ValueType,
10         channels::define::ChannelEventProducer,
11         chunk::{
12             define::{chunk_type, csid_type, CHUNK_SIZE},
13             packetizer::ChunkPacketizer,
14             unpacketizer::{ChunkUnpacketizer, UnpackResult},
15             ChunkInfo,
16         },
17         config,
18         handshake::handshake::{HandshakeServer, ServerHandshakeState},
19         messages::{
20             define::{msg_type_id, RtmpMessageData},
21             parser::MessageParser,
22         },
23         netconnection::commands::NetConnection,
24         netstream::writer::NetStreamWriter,
25         protocol_control_messages::writer::ProtocolControlMessagesWriter,
26         user_control_messages::writer::EventMessagesWriter,
27     },
28     bytes::BytesMut,
29     networkio::{
30         bytes_writer::{AsyncBytesWriter, BytesWriter},
31         networkio::NetworkIO,
32     },
33     std::{collections::HashMap, sync::Arc},
34     tokio::{net::TcpStream, sync::Mutex},
35 };
36 
/// Phase of the per-connection state machine that `ServerSession::run` dispatches on.
///
/// The derives make the state cheap to copy, comparable and printable for diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerSessionState {
    /// The handshake with the peer has not finished yet.
    Handshake,
    /// Handshake finished; incoming bytes are unpacked into chunks and handled as messages.
    ReadChunk,
    /// A `play` command was accepted; channel data is forwarded to the peer.
    Play,
}
45 
/// Server side of a single RTMP connection.
///
/// Bundles the shared network handle, the handshake and chunk codecs, and the
/// bookkeeping that `run` uses to drive the session.
pub struct ServerSession {
    /// Application name, filled from the `app` property of the `connect` command.
    app_name: String,
    /// Stream name, filled from the first argument of `play` or `publish`.
    stream_name: String,

    /// Shared handle to the connection; clones are handed to the handshaker,
    /// the packetizer, `Common` and the writers created per response.
    io: Arc<Mutex<NetworkIO>>,
    /// Drives the handshake while the session is in the `Handshake` state.
    handshaker: HandshakeServer,

    /// Writes outgoing chunks (command responses) to `io`.
    packetizer: ChunkPacketizer,
    /// Buffers incoming bytes and turns them into chunks.
    unpacketizer: ChunkUnpacketizer,

    /// Current phase of the session state machine.
    state: ServerSessionState,

    /// Helper that handles media/metadata messages and channel
    /// publish/subscribe calls for this session.
    common: Common,

    /// Most recent buffer returned by a read on `io`.
    netio_data: BytesMut,
    /// Set when bytes left over from the handshake were already handed to the
    /// unpacketizer, so the next chunk pass skips the network read.
    need_process: bool,

    /// Identifier supplied by the caller of `new`; passed along when
    /// subscribing to or unsubscribing from channels.
    pub session_id: u64,
    /// 0 until a `play` (`config::SERVER_PULL`) or `publish`
    /// (`config::SERVER_PUSH`) command is received.
    pub session_type: u8,

    /// Copy of the command object received with `connect`; `None` before that.
    connect_command_object: Option<HashMap<String, Amf0ValueType>>,
}
68 
69 impl ServerSession {
70     pub fn new(stream: TcpStream, event_producer: ChannelEventProducer, session_id: u64) -> Self {
71         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream)));
72 
73         Self {
74             app_name: String::from(""),
75             stream_name: String::from(""),
76 
77             io: Arc::clone(&net_io),
78             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
79 
80             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
81             unpacketizer: ChunkUnpacketizer::new(),
82 
83             state: ServerSessionState::Handshake,
84 
85             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server),
86 
87             session_id: session_id,
88             netio_data: BytesMut::new(),
89             need_process: false,
90             session_type: 0,
91             connect_command_object: None,
92         }
93     }
94 
95     pub async fn run(&mut self) -> Result<(), SessionError> {
96         loop {
97             match self.state {
98                 ServerSessionState::Handshake => {
99                     self.handshake().await?;
100                 }
101                 ServerSessionState::ReadChunk => {
102                     self.read_parse_chunks().await?;
103                 }
104                 ServerSessionState::Play => {
105                     self.play().await?;
106                 }
107             }
108         }
109 
110         //Ok(())
111     }
112 
113     async fn handshake(&mut self) -> Result<(), SessionError> {
114         self.netio_data = self.io.lock().await.read().await?;
115         self.handshaker.extend_data(&self.netio_data[..]);
116         self.handshaker.handshake().await?;
117 
118         match self.handshaker.state() {
119             ServerHandshakeState::Finish => {
120                 self.state = ServerSessionState::ReadChunk;
121 
122                 let left_bytes = self.handshaker.get_remaining_bytes();
123                 if left_bytes.len() > 0 {
124                     self.unpacketizer.extend_data(&left_bytes[..]);
125                     self.need_process = true;
126                 }
127                 self.send_set_chunk_size().await?;
128                 return Ok(());
129             }
130             _ => {}
131         }
132 
133         Ok(())
134     }
135 
136     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
137         if !self.need_process {
138             self.netio_data = self.io.lock().await.read().await?;
139             self.unpacketizer.extend_data(&self.netio_data[..]);
140         }
141 
142         self.need_process = false;
143 
144         loop {
145             let result = self.unpacketizer.read_chunks();
146 
147             if let Ok(rv) = result {
148                 match rv {
149                     UnpackResult::Chunks(chunks) => {
150                         for chunk_info in chunks.iter() {
151                             let mut msg = MessageParser::new(chunk_info.clone(), self.session_type)
152                                 .parse()?;
153 
154                             let msg_stream_id = chunk_info.message_header.msg_streamd_id;
155                             let timestamp = chunk_info.message_header.timestamp;
156                             self.process_messages(&mut msg, &msg_stream_id, &timestamp)
157                                 .await?;
158                         }
159                     }
160                     _ => {}
161                 }
162             } else {
163                 break;
164             }
165         }
166         Ok(())
167     }
168 
169     async fn play(&mut self) -> Result<(), SessionError> {
170         match self.common.send_channel_data().await {
171             Ok(_) => {}
172 
173             Err(err) => {
174                 self.common
175                     .unsubscribe_from_channels(
176                         self.app_name.clone(),
177                         self.stream_name.clone(),
178                         self.session_id,
179                     )
180                     .await?;
181                 return Err(err);
182             }
183         }
184 
185         Ok(())
186     }
187 
188     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
189         let mut controlmessage =
190             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
191         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
192 
193         Ok(())
194     }
195 
196     pub async fn process_messages(
197         &mut self,
198         rtmp_msg: &mut RtmpMessageData,
199         msg_stream_id: &u32,
200         timestamp: &u32,
201     ) -> Result<(), SessionError> {
202         match rtmp_msg {
203             RtmpMessageData::Amf0Command {
204                 command_name,
205                 transaction_id,
206                 command_object,
207                 others,
208             } => {
209                 self.on_amf0_command_message(
210                     msg_stream_id,
211                     command_name,
212                     transaction_id,
213                     command_object,
214                     others,
215                 )
216                 .await?
217             }
218             RtmpMessageData::SetChunkSize { chunk_size } => {
219                 self.on_set_chunk_size(chunk_size.clone() as usize)?;
220             }
221             RtmpMessageData::AudioData { data } => {
222                 self.common.on_audio_data(data, timestamp)?;
223             }
224             RtmpMessageData::VideoData { data } => {
225                 self.common.on_video_data(data, timestamp)?;
226             }
227             RtmpMessageData::AmfData { raw_data } => {
228                 self.common.on_meta_data(raw_data, timestamp)?;
229             }
230 
231             _ => {}
232         }
233         Ok(())
234     }
235 
236     pub async fn on_amf0_command_message(
237         &mut self,
238         stream_id: &u32,
239         command_name: &Amf0ValueType,
240         transaction_id: &Amf0ValueType,
241         command_object: &Amf0ValueType,
242         others: &mut Vec<Amf0ValueType>,
243     ) -> Result<(), SessionError> {
244         let empty_cmd_name = &String::new();
245         let cmd_name = match command_name {
246             Amf0ValueType::UTF8String(str) => str,
247             _ => empty_cmd_name,
248         };
249 
250         let transaction_id = match transaction_id {
251             Amf0ValueType::Number(number) => number,
252             _ => &0.0,
253         };
254 
255         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
256         let obj = match command_object {
257             Amf0ValueType::Object(obj) => obj,
258             _ => &empty_cmd_obj,
259         };
260 
261         match cmd_name.as_str() {
262             "connect" => {
263                 print!("connect .......");
264                 self.on_connect(&transaction_id, &obj).await?;
265             }
266             "createStream" => {
267                 self.on_create_stream(transaction_id).await?;
268             }
269             "deleteStream" => {
270                 print!("deletestream....\n");
271                 if others.len() > 0 {
272                     let stream_id = match others.pop() {
273                         Some(val) => match val {
274                             Amf0ValueType::Number(streamid) => streamid,
275                             _ => 0.0,
276                         },
277                         _ => 0.0,
278                     };
279                     print!("deletestream....{}\n", stream_id);
280                     self.on_delete_stream(transaction_id, &stream_id).await?;
281                 }
282             }
283             "play" => {
284                 self.session_type = config::SERVER_PULL;
285                 self.unpacketizer.session_type = config::SERVER_PULL;
286                 self.on_play(transaction_id, stream_id, others).await?;
287             }
288             "publish" => {
289                 self.session_type = config::SERVER_PUSH;
290                 self.unpacketizer.session_type = config::SERVER_PUSH;
291                 self.on_publish(transaction_id, stream_id, others).await?;
292             }
293             _ => {}
294         }
295 
296         Ok(())
297     }
298 
299     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
300         self.unpacketizer.update_max_chunk_size(chunk_size);
301         Ok(())
302     }
303 
304     async fn on_connect(
305         &mut self,
306         transaction_id: &f64,
307         command_obj: &HashMap<String, Amf0ValueType>,
308     ) -> Result<(), SessionError> {
309         self.connect_command_object = Some(command_obj.clone());
310         let mut control_message =
311             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
312         control_message
313             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
314             .await?;
315         control_message
316             .write_set_peer_bandwidth(
317                 define::PEER_BANDWIDTH,
318                 define::peer_bandwidth_limit_type::DYNAMIC,
319             )
320             .await?;
321         //control_message.write_set_chunk_size(CHUNK_SIZE).await?;
322 
323         let obj_encoding = command_obj.get("objectEncoding");
324         let encoding = match obj_encoding {
325             Some(Amf0ValueType::Number(encoding)) => encoding,
326             _ => &define::OBJENCODING_AMF0,
327         };
328 
329         let app_name = command_obj.get("app");
330         self.app_name = match app_name {
331             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
332             _ => {
333                 return Err(SessionError {
334                     value: SessionErrorValue::NoAppName,
335                 });
336             }
337         };
338 
339         let mut netconnection = NetConnection::new(BytesWriter::new());
340         let data = netconnection.connect_response(
341             &transaction_id,
342             &define::FMSVER.to_string(),
343             &define::CAPABILITIES,
344             &String::from("NetConnection.Connect.Success"),
345             &define::LEVEL.to_string(),
346             &String::from("Connection Succeeded."),
347             encoding,
348         )?;
349 
350         let mut chunk_info = ChunkInfo::new(
351             csid_type::COMMAND_AMF0_AMF3,
352             chunk_type::TYPE_0,
353             0,
354             data.len() as u32,
355             msg_type_id::COMMAND_AMF0,
356             0,
357             data,
358         );
359 
360         self.packetizer.write_chunk(&mut chunk_info).await?;
361 
362         Ok(())
363     }
364 
365     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
366         let mut netconnection = NetConnection::new(BytesWriter::new());
367         let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?;
368 
369         let mut chunk_info = ChunkInfo::new(
370             csid_type::COMMAND_AMF0_AMF3,
371             chunk_type::TYPE_0,
372             0,
373             data.len() as u32,
374             msg_type_id::COMMAND_AMF0,
375             0,
376             data,
377         );
378 
379         self.packetizer.write_chunk(&mut chunk_info).await?;
380 
381         Ok(())
382     }
383 
384     pub async fn on_delete_stream(
385         &mut self,
386         transaction_id: &f64,
387         stream_id: &f64,
388     ) -> Result<(), SessionError> {
389         self.common
390             .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone())
391             .await?;
392 
393         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
394         netstream
395             .on_status(
396                 transaction_id,
397                 &"status".to_string(),
398                 &"NetStream.DeleteStream.Suceess".to_string(),
399                 &"".to_string(),
400             )
401             .await?;
402 
403         print!("stream id{}", stream_id);
404 
405         //self.unsubscribe_from_channels().await?;
406 
407         Ok(())
408     }
409     pub async fn on_play(
410         &mut self,
411         transaction_id: &f64,
412         stream_id: &u32,
413         other_values: &mut Vec<Amf0ValueType>,
414     ) -> Result<(), SessionError> {
415         let length = other_values.len() as u8;
416         let mut index: u8 = 0;
417 
418         let mut stream_name: Option<String> = None;
419         let mut start: Option<f64> = None;
420         let mut duration: Option<f64> = None;
421         let mut reset: Option<bool> = None;
422 
423         loop {
424             if index >= length {
425                 break;
426             }
427             index = index + 1;
428             stream_name = match other_values.remove(0) {
429                 Amf0ValueType::UTF8String(val) => Some(val),
430                 _ => None,
431             };
432 
433             if index >= length {
434                 break;
435             }
436             index = index + 1;
437             start = match other_values.remove(0) {
438                 Amf0ValueType::Number(val) => Some(val),
439                 _ => None,
440             };
441 
442             if index >= length {
443                 break;
444             }
445             index = index + 1;
446             duration = match other_values.remove(0) {
447                 Amf0ValueType::Number(val) => Some(val),
448                 _ => None,
449             };
450 
451             if index >= length {
452                 break;
453             }
454             //index = index + 1;
455             reset = match other_values.remove(0) {
456                 Amf0ValueType::Boolean(val) => Some(val),
457                 _ => None,
458             };
459             break;
460         }
461         print!("start {}", start.is_some());
462         print!("druation {}", duration.is_some());
463         print!("reset {}", reset.is_some());
464 
465         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
466         event_messages.write_stream_begin(stream_id.clone()).await?;
467 
468         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
469         netstream
470             .on_status(
471                 transaction_id,
472                 &"status".to_string(),
473                 &"NetStream.Play.Reset".to_string(),
474                 &"reset".to_string(),
475             )
476             .await?;
477 
478         netstream
479             .on_status(
480                 transaction_id,
481                 &"status".to_string(),
482                 &"NetStream.Play.Start".to_string(),
483                 &"play start".to_string(),
484             )
485             .await?;
486 
487         netstream
488             .on_status(
489                 transaction_id,
490                 &"status".to_string(),
491                 &"NetStream.Data.Start".to_string(),
492                 &"data start.".to_string(),
493             )
494             .await?;
495 
496         netstream
497             .on_status(
498                 transaction_id,
499                 &"status".to_string(),
500                 &"NetStream.Play.PublishNotify".to_string(),
501                 &"play publish notify.".to_string(),
502             )
503             .await?;
504 
505         event_messages
506             .write_stream_is_record(stream_id.clone())
507             .await?;
508 
509         self.stream_name = stream_name.clone().unwrap();
510         self.common
511             .subscribe_from_channels(self.app_name.clone(), stream_name.unwrap(), self.session_id)
512             .await?;
513 
514         self.state = ServerSessionState::Play;
515 
516         Ok(())
517     }
518 
519     pub async fn on_publish(
520         &mut self,
521         transaction_id: &f64,
522         stream_id: &u32,
523         other_values: &mut Vec<Amf0ValueType>,
524     ) -> Result<(), SessionError> {
525         let length = other_values.len();
526 
527         if length < 2 {
528             return Err(SessionError {
529                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
530             });
531         }
532 
533         let stream_name = match other_values.remove(0) {
534             Amf0ValueType::UTF8String(val) => val,
535             _ => {
536                 return Err(SessionError {
537                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
538                 });
539             }
540         };
541 
542         self.stream_name = stream_name;
543 
544         let _ = match other_values.remove(0) {
545             Amf0ValueType::UTF8String(val) => val,
546             _ => {
547                 return Err(SessionError {
548                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
549                 });
550             }
551         };
552 
553         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
554         event_messages.write_stream_begin(stream_id.clone()).await?;
555 
556         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
557         netstream
558             .on_status(
559                 transaction_id,
560                 &"status".to_string(),
561                 &"NetStream.Publish.Start".to_string(),
562                 &"".to_string(),
563             )
564             .await?;
565 
566         self.common
567             .publish_to_channels(self.app_name.clone(), self.stream_name.clone())
568             .await?;
569 
570         Ok(())
571     }
572 }
573