1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::utils::print::print,
9     crate::{
10         amf0::Amf0ValueType,
11         channels::define::ChannelEventProducer,
12         chunk::{
13             define::{chunk_type, csid_type, CHUNK_SIZE},
14             packetizer::ChunkPacketizer,
15             unpacketizer::{ChunkUnpacketizer, UnpackResult},
16             ChunkInfo,
17         },
18         handshake::handshake::{ClientHandshakeState, SimpleHandshakeClient},
19         messages::{
20             define::{msg_type_id, RtmpMessageData},
21             parser::MessageParser,
22         },
23         netconnection::commands::{ConnectProperties, NetConnection},
24         netstream::writer::NetStreamWriter,
25         protocol_control_messages::writer::ProtocolControlMessagesWriter,
26         user_control_messages::writer::EventMessagesWriter,
27     },
28     networkio::{
29         bytes_writer::{AsyncBytesWriter, BytesWriter},
30         networkio::NetworkIO,
31     },
32     std::{collections::HashMap, sync::Arc},
33     tokio::{net::TcpStream, sync::Mutex},
34 };
35 
/// Step of the client state machine driven by `ClientSession::run`.
///
/// `run` performs the action attached to the current state on every loop
/// iteration and then waits for incoming data; handlers for server replies
/// move the session to the next state.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientSessionState {
    /// Initial state: the handshake is performed, then the state becomes `Connect`.
    Handshake,
    /// A `connect` command is sent, then the state becomes `WaitStateChange`.
    Connect,
    /// Entered from `on_result_connect`; a `createStream` command is sent.
    CreateStream,
    /// Entered from `on_result_create_stream` for `ClientType::Play`; a play command is sent.
    Play,
    /// Entered from `on_result_create_stream` for `ClientType::Publish`; a publish command is sent.
    PublishingContent,
    /// Entered from `on_status` on `NetStream.Publish.Start`; channel data is forwarded.
    StartPublish,
    /// No action is pending; the session only processes incoming messages.
    WaitStateChange,
}
46 
/// States of a play-only client flow.
///
/// NOTE(review): not referenced anywhere in the visible part of this file;
/// `ClientSession` tracks its progress with `ClientSessionState` instead.
#[allow(dead_code)]
enum ClientSessionPlayState {
    Handshake,
    Connect,
    CreateStream,
    Play,
}
54 
/// States of a publish-only client flow.
///
/// NOTE(review): not referenced anywhere in the visible part of this file;
/// `ClientSession` tracks its progress with `ClientSessionState` instead.
#[allow(dead_code)]
enum ClientSessionPublishState {
    Handshake,
    Connect,
    CreateStream,
    PublishingContent,
}
/// Role a `ClientSession` takes towards the remote server.
///
/// The role selects the `connect` properties that are sent and decides
/// whether a play or a publish command follows the `createStream` result.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientType {
    /// The session issues a play command after the stream is created.
    Play,
    /// The session issues a publish command after the stream is created.
    Publish,
}
/// Client-side session that plays or publishes a single stream.
///
/// The session is advanced by `run`, which alternates between sending the
/// command required by the current `state` and processing incoming messages.
pub struct ClientSession {
    /// Shared network handle; clones of this `Arc` are also handed to the
    /// handshaker, the packetizer, `common` and the per-call writers.
    io: Arc<Mutex<NetworkIO>>,
    /// Shared session logic; channel subscription/publication and
    /// audio/video handling are delegated to it.
    common: Common,

    /// Client side of the handshake, driven by `handshake()`.
    handshaker: SimpleHandshakeClient,

    /// Writes outgoing chunks (used for the connect and create-stream commands).
    packetizer: ChunkPacketizer,
    /// Buffers received bytes and yields parsed chunks.
    unpacketizer: ChunkUnpacketizer,

    /// Application name; used for the connect properties and channel calls.
    app_name: String,
    /// Stream name; used for play/publish commands and channel calls.
    stream_name: String,
    /// Passed to `MessageParser::new`; set to 0 by `new` and not changed in this file.
    session_type: u8,
    /// Identifier passed to `Common::subscribe_from_channels`.
    session_id: u64,

    /// Current step of the client state machine.
    state: ClientSessionState,
    /// Whether this session plays or publishes.
    client_type: ClientType,
}
84 
85 impl ClientSession {
86     #[allow(dead_code)]
87     pub fn new(
88         stream: TcpStream,
89         client_type: ClientType,
90         app_name: String,
91         stream_name: String,
92         event_producer: ChannelEventProducer,
93         session_id: u64,
94     ) -> Self {
95         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream)));
96 
97         Self {
98             io: Arc::clone(&net_io),
99             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Client),
100 
101             handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)),
102 
103             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
104             unpacketizer: ChunkUnpacketizer::new(),
105 
106             app_name: app_name,
107             stream_name: stream_name,
108             client_type: client_type,
109 
110             state: ClientSessionState::Handshake,
111             session_type: 0,
112             session_id: session_id,
113         }
114     }
115 
116     pub async fn run(&mut self) -> Result<(), SessionError> {
117         loop {
118             match self.state {
119                 ClientSessionState::Handshake => {
120                     println!("handshake");
121                     self.handshake().await?;
122                     continue;
123                 }
124                 ClientSessionState::Connect => {
125                     println!("connect");
126                     self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64))
127                         .await?;
128                     self.state = ClientSessionState::WaitStateChange;
129                 }
130                 ClientSessionState::CreateStream => {
131                     println!("CreateStream");
132                     self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64))
133                         .await?;
134                     self.state = ClientSessionState::WaitStateChange;
135                 }
136                 ClientSessionState::Play => {
137                     self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false)
138                         .await?;
139                     self.state = ClientSessionState::WaitStateChange;
140                 }
141                 ClientSessionState::PublishingContent => {
142                     println!("PublishingContent");
143                     self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string())
144                         .await?;
145                     self.state = ClientSessionState::WaitStateChange;
146                 }
147                 ClientSessionState::StartPublish => {
148                     println!("StartPublish");
149                     self.common.send_channel_data().await?;
150                 }
151                 ClientSessionState::WaitStateChange => {}
152             }
153 
154             let data = self.io.lock().await.read().await?;
155             self.unpacketizer.extend_data(&data[..]);
156             let result = self.unpacketizer.read_chunk()?;
157 
158             match result {
159                 UnpackResult::ChunkInfo(chunk_info) => {
160                     let mut message_parser =
161                         MessageParser::new(chunk_info.clone(), self.session_type);
162                     let mut msg = message_parser.parse()?;
163                     let timestamp = chunk_info.message_header.timestamp;
164 
165                     self.process_messages(&mut msg, &timestamp).await?;
166                 }
167                 _ => {}
168             }
169         }
170 
171         // Ok(())
172     }
173 
174     async fn handshake(&mut self) -> Result<(), SessionError> {
175         loop {
176             self.handshaker.handshake().await?;
177             if self.handshaker.state == ClientHandshakeState::Finish {
178                 println!("handshake finish");
179                 break;
180             }
181 
182             let data = self.io.lock().await.read().await?;
183             print(data.clone());
184             self.handshaker.extend_data(&data[..]);
185         }
186 
187         self.state = ClientSessionState::Connect;
188 
189         Ok(())
190     }
191 
192     pub async fn process_messages(
193         &mut self,
194         msg: &mut RtmpMessageData,
195         timestamp: &u32,
196     ) -> Result<(), SessionError> {
197         match msg {
198             RtmpMessageData::Amf0Command {
199                 command_name,
200                 transaction_id,
201                 command_object,
202                 others,
203             } => {
204                 self.on_amf0_command_message(command_name, transaction_id, command_object, others)
205                     .await?
206             }
207             RtmpMessageData::SetPeerBandwidth { properties } => {
208                 print!("{}", properties.window_size);
209                 self.on_set_peer_bandwidth().await?
210             }
211             RtmpMessageData::SetChunkSize { chunk_size } => self.on_set_chunk_size(chunk_size)?,
212 
213             RtmpMessageData::StreamBegin { stream_id } => self.on_stream_begin(stream_id)?,
214 
215             RtmpMessageData::StreamIsRecorded { stream_id } => {
216                 self.on_stream_is_recorded(stream_id)?
217             }
218 
219             RtmpMessageData::AudioData { data } => self.common.on_audio_data(data, timestamp)?,
220 
221             RtmpMessageData::VideoData { data } => self.common.on_video_data(data, timestamp)?,
222 
223             _ => {}
224         }
225         Ok(())
226     }
227 
228     pub async fn on_amf0_command_message(
229         &mut self,
230         command_name: &Amf0ValueType,
231         transaction_id: &Amf0ValueType,
232         command_object: &Amf0ValueType,
233         others: &mut Vec<Amf0ValueType>,
234     ) -> Result<(), SessionError> {
235         let empty_cmd_name = &String::new();
236         let cmd_name = match command_name {
237             Amf0ValueType::UTF8String(str) => str,
238             _ => empty_cmd_name,
239         };
240 
241         let transaction_id = match transaction_id {
242             Amf0ValueType::Number(number) => number.clone() as u8,
243             _ => 0,
244         };
245 
246         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
247         let _ = match command_object {
248             Amf0ValueType::Object(obj) => obj,
249             // Amf0ValueType::Null =>
250             _ => &empty_cmd_obj,
251         };
252 
253         match cmd_name.as_str() {
254             "_result" => match transaction_id {
255                 define::TRANSACTION_ID_CONNECT => {
256                     self.on_result_connect().await?;
257                 }
258                 define::TRANSACTION_ID_CREATE_STREAM => {
259                     self.on_result_create_stream()?;
260                 }
261                 _ => {}
262             },
263             "_error" => {
264                 self.on_error()?;
265             }
266             "onStatus" => {
267                 match others.remove(0) {
268                     Amf0ValueType::Object(obj) => self.on_status(&obj).await?,
269                     _ => {
270                         return Err(SessionError {
271                             value: SessionErrorValue::Amf0ValueCountNotCorrect,
272                         })
273                     }
274                 };
275             }
276 
277             _ => {}
278         }
279 
280         Ok(())
281     }
282 
283     pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
284         self.send_set_chunk_size().await?;
285 
286         let mut netconnection = NetConnection::new(BytesWriter::new());
287 
288         let mut properties = ConnectProperties::new_none();
289 
290         let url = format!("rtmp://localhost:1935/{app_name}", app_name = self.app_name);
291         properties.app = Some(self.app_name.clone());
292         properties.tc_url = Some(url.clone());
293 
294         match self.client_type {
295             ClientType::Play => {
296                 properties.flash_ver = Some("flashVerFMLE/3.0 (compatible; FMSc/1.0)".to_string());
297                 properties.swf_url = Some(url.clone());
298             }
299             ClientType::Publish => {
300                 properties.fpad = Some(false);
301                 properties.capabilities = Some(15_f64);
302                 properties.audio_codecs = Some(3191_f64);
303                 properties.video_codecs = Some(252_f64);
304                 properties.video_function = Some(1_f64);
305             }
306         }
307 
308         let data = netconnection.connect(transaction_id, &properties)?;
309 
310         let mut chunk_info = ChunkInfo::new(
311             csid_type::COMMAND_AMF0_AMF3,
312             chunk_type::TYPE_0,
313             0,
314             data.len() as u32,
315             msg_type_id::COMMAND_AMF0,
316             0,
317             data,
318         );
319 
320         self.packetizer.write_chunk(&mut chunk_info).await?;
321         Ok(())
322     }
323 
324     pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
325         let mut netconnection = NetConnection::new(BytesWriter::new());
326         let data = netconnection.create_stream(transaction_id)?;
327 
328         let mut chunk_info = ChunkInfo::new(
329             csid_type::COMMAND_AMF0_AMF3,
330             chunk_type::TYPE_0,
331             0,
332             data.len() as u32,
333             msg_type_id::COMMAND_AMF0,
334             0,
335             data,
336         );
337 
338         self.packetizer.write_chunk(&mut chunk_info).await?;
339 
340         Ok(())
341     }
342 
343     pub async fn send_delete_stream(
344         &mut self,
345         transaction_id: &f64,
346         stream_id: &f64,
347     ) -> Result<(), SessionError> {
348         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
349         netstream.delete_stream(transaction_id, stream_id).await?;
350 
351         Ok(())
352     }
353 
354     pub async fn send_publish(
355         &mut self,
356         transaction_id: &f64,
357         stream_name: &String,
358         stream_type: &String,
359     ) -> Result<(), SessionError> {
360         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
361         netstream
362             .publish(transaction_id, stream_name, stream_type)
363             .await?;
364 
365         Ok(())
366     }
367 
368     pub async fn send_play(
369         &mut self,
370         transaction_id: &f64,
371         stream_name: &String,
372         start: &f64,
373         duration: &f64,
374         reset: &bool,
375     ) -> Result<(), SessionError> {
376         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
377         netstream
378             .play(transaction_id, stream_name, start, duration, reset)
379             .await?;
380 
381         Ok(())
382     }
383 
384     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
385         let mut controlmessage =
386             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
387         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
388         Ok(())
389     }
390 
391     pub async fn send_window_acknowledgement_size(
392         &mut self,
393         window_size: u32,
394     ) -> Result<(), SessionError> {
395         let mut controlmessage =
396             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
397         controlmessage
398             .write_window_acknowledgement_size(window_size)
399             .await?;
400         Ok(())
401     }
402 
403     pub async fn send_set_buffer_length(
404         &mut self,
405         stream_id: u32,
406         ms: u32,
407     ) -> Result<(), SessionError> {
408         let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
409         eventmessages.write_set_buffer_length(stream_id, ms).await?;
410 
411         Ok(())
412     }
413 
414     pub async fn on_result_connect(&mut self) -> Result<(), SessionError> {
415         let mut controlmessage =
416             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
417         controlmessage.write_acknowledgement(3107).await?;
418 
419         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
420         netstream
421             .release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
422             .await?;
423         netstream
424             .fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
425             .await?;
426 
427         self.state = ClientSessionState::CreateStream;
428 
429         Ok(())
430     }
431 
432     pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> {
433         match self.client_type {
434             ClientType::Play => {
435                 self.state = ClientSessionState::Play;
436             }
437             ClientType::Publish => {
438                 self.state = ClientSessionState::PublishingContent;
439             }
440         }
441         Ok(())
442     }
443 
444     pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> {
445         self.unpacketizer
446             .update_max_chunk_size(chunk_size.clone() as usize);
447         Ok(())
448     }
449 
450     pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
451         println!("stream is recorded stream_id is {}", stream_id);
452         Ok(())
453     }
454 
455     pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
456         println!("stream is begin stream_id is {}", stream_id);
457         Ok(())
458     }
459 
460     pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> {
461         self.send_window_acknowledgement_size(250000).await?;
462         Ok(())
463     }
464 
    /// Handler invoked for an `_error` command from the server.
    ///
    /// Currently a no-op: the error is ignored and `Ok(())` is returned.
    pub fn on_error(&mut self) -> Result<(), SessionError> {
        Ok(())
    }
468 
469     pub async fn on_status(
470         &mut self,
471         obj: &HashMap<String, Amf0ValueType>,
472     ) -> Result<(), SessionError> {
473         println!("on_status===");
474         if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") {
475             match &code_info[..] {
476                 "NetStream.Publish.Start" => {
477                     self.state = ClientSessionState::StartPublish;
478                     self.common
479                         .subscribe_from_channels(
480                             self.app_name.clone(),
481                             self.stream_name.clone(),
482                             self.session_id,
483                         )
484                         .await?;
485                 }
486                 "NetStream.Publish.Reset" => {}
487 
488                 "NetStream.Play.Start" => {
489                     self.common
490                         .publish_to_channels(self.app_name.clone(), self.stream_name.clone())
491                         .await?
492                 }
493                 _ => {}
494             }
495         }
496         println!("{}", obj.len());
497         Ok(())
498     }
499 }
500