1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::utils::print::print,
9     crate::{
10         amf0::Amf0ValueType,
11         channels::define::ChannelEventProducer,
12         chunk::{
13             define::{chunk_type, csid_type, CHUNK_SIZE},
14             packetizer::ChunkPacketizer,
15             unpacketizer::{ChunkUnpacketizer, UnpackResult},
16             ChunkInfo,
17         },
18         handshake::handshake::{ClientHandshakeState, SimpleHandshakeClient},
19         messages::{
20             define::{msg_type_id, RtmpMessageData},
21             parser::MessageParser,
22         },
23         netconnection::commands::{ConnectProperties, NetConnection},
24         netstream::writer::NetStreamWriter,
25         protocol_control_messages::writer::ProtocolControlMessagesWriter,
26         user_control_messages::writer::EventMessagesWriter,
27     },
28     bytes::BytesMut,
29     networkio::{
30         bytes_writer::{AsyncBytesWriter, BytesWriter},
31         networkio::NetworkIO,
32     },
33     std::{collections::HashMap, sync::Arc},
34     tokio::{net::TcpStream, sync::Mutex},
35 };
36 
37 #[allow(dead_code)]
38 enum ClientSessionState {
39     Handshake,
40     Connect,
41     CreateStream,
42     Play,
43     PublishingContent,
44     StartPublish,
45     WaitStateChange,
46 }
47 
48 #[allow(dead_code)]
49 enum ClientSessionPlayState {
50     Handshake,
51     Connect,
52     CreateStream,
53     Play,
54 }
55 
56 #[allow(dead_code)]
57 enum ClientSessionPublishState {
58     Handshake,
59     Connect,
60     CreateStream,
61     PublishingContent,
62 }
63 #[allow(dead_code)]
64 pub enum ClientType {
65     Play,
66     Publish,
67 }
68 pub struct ClientSession {
69     io: Arc<Mutex<NetworkIO>>,
70     common: Common,
71 
72     handshaker: SimpleHandshakeClient,
73 
74     packetizer: ChunkPacketizer,
75     unpacketizer: ChunkUnpacketizer,
76 
77     app_name: String,
78     stream_name: String,
79     session_type: u8,
80     session_id: u64,
81 
82     state: ClientSessionState,
83     client_type: ClientType,
84     connect_command_object: Option<HashMap<String, Amf0ValueType>>,
85 }
86 
87 impl ClientSession {
88     #[allow(dead_code)]
89     pub fn new(
90         stream: TcpStream,
91         client_type: ClientType,
92         app_name: String,
93         stream_name: String,
94         event_producer: ChannelEventProducer,
95         session_id: u64,
96     ) -> Self {
97         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream)));
98 
99         Self {
100             io: Arc::clone(&net_io),
101             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Client),
102 
103             handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)),
104 
105             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
106             unpacketizer: ChunkUnpacketizer::new(),
107 
108             app_name: app_name,
109             stream_name: stream_name,
110             client_type: client_type,
111 
112             state: ClientSessionState::Handshake,
113             session_type: 0,
114             session_id: session_id,
115             connect_command_object: None,
116         }
117     }
118 
119     pub fn set_connect_command_object(&mut self, object: HashMap<String, Amf0ValueType>) {
120         self.connect_command_object = Some(object);
121     }
122 
123     pub async fn run(&mut self) -> Result<(), SessionError> {
124         loop {
125             match self.state {
126                 ClientSessionState::Handshake => {
127                     println!("handshake");
128                     self.handshake().await?;
129                     continue;
130                 }
131                 ClientSessionState::Connect => {
132                     println!("connect");
133                     self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64))
134                         .await?;
135                     self.state = ClientSessionState::WaitStateChange;
136                 }
137                 ClientSessionState::CreateStream => {
138                     println!("CreateStream");
139                     self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64))
140                         .await?;
141                     self.state = ClientSessionState::WaitStateChange;
142                 }
143                 ClientSessionState::Play => {
144                     self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false)
145                         .await?;
146                     self.state = ClientSessionState::WaitStateChange;
147                 }
148                 ClientSessionState::PublishingContent => {
149                     println!("PublishingContent");
150                     self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string())
151                         .await?;
152                     self.state = ClientSessionState::WaitStateChange;
153                 }
154                 ClientSessionState::StartPublish => {
155                     println!("StartPublish");
156                     self.common.send_channel_data().await?;
157                 }
158                 ClientSessionState::WaitStateChange => {}
159             }
160 
161             let data = self.io.lock().await.read().await?;
162             self.unpacketizer.extend_data(&data[..]);
163             let result = self.unpacketizer.read_chunk()?;
164 
165             match result {
166                 UnpackResult::ChunkInfo(chunk_info) => {
167                     let mut message_parser = MessageParser::new(chunk_info, self.session_type);
168                     let mut msg = message_parser.parse()?;
169 
170                     self.process_messages(&mut msg).await?;
171                 }
172                 _ => {}
173             }
174         }
175 
176         // Ok(())
177     }
178 
179     async fn handshake(&mut self) -> Result<(), SessionError> {
180         loop {
181             self.handshaker.handshake().await?;
182             if self.handshaker.state == ClientHandshakeState::Finish {
183                 println!("handshake finish");
184                 break;
185             }
186 
187             let data = self.io.lock().await.read().await?;
188             print(data.clone());
189             self.handshaker.extend_data(&data[..]);
190         }
191 
192         self.state = ClientSessionState::Connect;
193 
194         Ok(())
195     }
196 
197     pub async fn process_messages(
198         &mut self,
199         msg: &mut RtmpMessageData,
200     ) -> Result<(), SessionError> {
201         match msg {
202             RtmpMessageData::Amf0Command {
203                 command_name,
204                 transaction_id,
205                 command_object,
206                 others,
207             } => {
208                 self.process_amf0_command_message(
209                     command_name,
210                     transaction_id,
211                     command_object,
212                     others,
213                 )
214                 .await?
215             }
216             RtmpMessageData::SetPeerBandwidth { properties } => {
217                 print!("{}", properties.window_size);
218                 self.on_set_peer_bandwidth().await?
219             }
220             RtmpMessageData::SetChunkSize { chunk_size } => self.on_set_chunk_size(chunk_size)?,
221             RtmpMessageData::AudioData { data } => {
222                 let _ = data.len();
223             }
224             RtmpMessageData::VideoData { data } => {
225                 let _ = data.len();
226             }
227 
228             _ => {}
229         }
230         Ok(())
231     }
232 
233     pub async fn process_amf0_command_message(
234         &mut self,
235         command_name: &Amf0ValueType,
236         transaction_id: &Amf0ValueType,
237         command_object: &Amf0ValueType,
238         others: &mut Vec<Amf0ValueType>,
239     ) -> Result<(), SessionError> {
240         let empty_cmd_name = &String::new();
241         let cmd_name = match command_name {
242             Amf0ValueType::UTF8String(str) => str,
243             _ => empty_cmd_name,
244         };
245 
246         let transaction_id = match transaction_id {
247             Amf0ValueType::Number(number) => number.clone() as u8,
248             _ => 0,
249         };
250 
251         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
252         let _ = match command_object {
253             Amf0ValueType::Object(obj) => obj,
254             // Amf0ValueType::Null =>
255             _ => &empty_cmd_obj,
256         };
257 
258         match cmd_name.as_str() {
259             "_result" => match transaction_id {
260                 define::TRANSACTION_ID_CONNECT => {
261                     self.on_result_connect().await?;
262                 }
263                 define::TRANSACTION_ID_CREATE_STREAM => {
264                     self.on_result_create_stream()?;
265                 }
266                 _ => {}
267             },
268             "_error" => {
269                 self.on_error()?;
270             }
271             "onStatus" => {
272                 match others.remove(0) {
273                     Amf0ValueType::Object(obj) => self.on_status(&obj).await?,
274                     _ => {
275                         return Err(SessionError {
276                             value: SessionErrorValue::Amf0ValueCountNotCorrect,
277                         })
278                     }
279                 };
280             }
281 
282             _ => {}
283         }
284 
285         Ok(())
286     }
287 
288     pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
289         self.send_set_chunk_size().await?;
290 
291         // let properties = ConnectProperties::new(self.app_name.clone());
292         let mut netconnection = NetConnection::new(BytesWriter::new());
293         let data = netconnection
294             .connect_with_value(transaction_id, self.connect_command_object.clone().unwrap())?;
295 
296         let mut chunk_info = ChunkInfo::new(
297             csid_type::COMMAND_AMF0_AMF3,
298             chunk_type::TYPE_0,
299             0,
300             data.len() as u32,
301             msg_type_id::COMMAND_AMF0,
302             0,
303             data,
304         );
305 
306         self.packetizer.write_chunk(&mut chunk_info).await?;
307         Ok(())
308     }
309 
310     pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
311         let mut netconnection = NetConnection::new(BytesWriter::new());
312         let data = netconnection.create_stream(transaction_id)?;
313 
314         let mut chunk_info = ChunkInfo::new(
315             csid_type::COMMAND_AMF0_AMF3,
316             chunk_type::TYPE_0,
317             0,
318             data.len() as u32,
319             msg_type_id::COMMAND_AMF0,
320             0,
321             data,
322         );
323 
324         self.packetizer.write_chunk(&mut chunk_info).await?;
325 
326         Ok(())
327     }
328 
329     pub async fn send_delete_stream(
330         &mut self,
331         transaction_id: &f64,
332         stream_id: &f64,
333     ) -> Result<(), SessionError> {
334         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
335         netstream.delete_stream(transaction_id, stream_id).await?;
336 
337         Ok(())
338     }
339 
340     pub async fn send_publish(
341         &mut self,
342         transaction_id: &f64,
343         stream_name: &String,
344         stream_type: &String,
345     ) -> Result<(), SessionError> {
346         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
347         netstream
348             .publish(transaction_id, stream_name, stream_type)
349             .await?;
350 
351         Ok(())
352     }
353 
354     pub async fn send_play(
355         &mut self,
356         transaction_id: &f64,
357         stream_name: &String,
358         start: &f64,
359         duration: &f64,
360         reset: &bool,
361     ) -> Result<(), SessionError> {
362         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
363         netstream
364             .play(transaction_id, stream_name, start, duration, reset)
365             .await?;
366 
367         Ok(())
368     }
369 
370     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
371         let mut controlmessage =
372             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
373         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
374         Ok(())
375     }
376 
377     pub async fn send_window_acknowledgement_size(
378         &mut self,
379         window_size: u32,
380     ) -> Result<(), SessionError> {
381         let mut controlmessage =
382             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
383         controlmessage
384             .write_window_acknowledgement_size(window_size)
385             .await?;
386         Ok(())
387     }
388 
389     pub async fn send_set_buffer_length(
390         &mut self,
391         stream_id: u32,
392         ms: u32,
393     ) -> Result<(), SessionError> {
394         let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
395         eventmessages.write_set_buffer_length(stream_id, ms).await?;
396 
397         Ok(())
398     }
399 
400     pub async fn on_result_connect(&mut self) -> Result<(), SessionError> {
401         let mut controlmessage =
402             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
403         controlmessage.write_acknowledgement(3107).await?;
404 
405         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
406         netstream
407             .release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
408             .await?;
409         netstream
410             .fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
411             .await?;
412 
413         self.state = ClientSessionState::CreateStream;
414 
415         Ok(())
416     }
417 
418     pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> {
419         match self.client_type {
420             ClientType::Play => {
421                 self.state = ClientSessionState::Play;
422             }
423             ClientType::Publish => {
424                 self.state = ClientSessionState::PublishingContent;
425             }
426         }
427         Ok(())
428     }
429 
430     pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> {
431         self.unpacketizer
432             .update_max_chunk_size(chunk_size.clone() as usize);
433         Ok(())
434     }
435 
436     pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> {
437         self.send_window_acknowledgement_size(250000).await?;
438         Ok(())
439     }
440 
441     pub fn on_error(&mut self) -> Result<(), SessionError> {
442         Ok(())
443     }
444 
445     pub async fn on_status(
446         &mut self,
447         obj: &HashMap<String, Amf0ValueType>,
448     ) -> Result<(), SessionError> {
449         println!("on_status===");
450         if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") {
451             match &code_info[..] == "NetStream.Publish.Start" {
452                 true => {
453                     self.state = ClientSessionState::StartPublish;
454                     self.common
455                         .subscribe_from_channels(
456                             self.app_name.clone(),
457                             self.stream_name.clone(),
458                             self.session_id,
459                         )
460                         .await?;
461                 }
462                 false => {}
463             }
464         }
465         println!("{}", obj.len());
466         Ok(())
467     }
468 }
469