1 use super::errors::SessionError;
2 
3 use crate::chunk::define::{chunk_type, csid_type, CHUNK_SIZE};
4 use crate::chunk::unpacketizer::ChunkUnpacketizer;
5 use crate::chunk::unpacketizer::UnpackResult;
6 use crate::chunk::{packetizer::ChunkPacketizer, ChunkInfo};
7 
8 use super::errors::SessionErrorValue;
9 use crate::handshake::handshake::SimpleHandshakeClient;
10 
11 use crate::messages::define::msg_type_id;
12 use crate::messages::define::RtmpMessageData;
13 use crate::messages::parser::MessageParser;
14 
15 use crate::amf0::Amf0ValueType;
16 
17 use netio::bytes_writer::AsyncBytesWriter;
18 
19 use netio::bytes_reader::BytesReader;
20 use netio::bytes_writer::BytesWriter;
21 use netio::netio::NetworkIO;
22 
23 use std::time::Duration;
24 
25 use crate::handshake::handshake::ClientHandshakeState;
26 use crate::netconnection::commands::ConnectProperties;
27 use crate::netconnection::commands::NetConnection;
28 use crate::netstream::commands::NetStream;
29 use crate::protocol_control_messages::control_messages::ControlMessages;
30 
31 use crate::user_control_messages::event_messages::EventMessages;
32 
33 use std::collections::HashMap;
34 
35 use super::define;
36 use tokio::net::TcpStream;
37 
38 use bytes::BytesMut;
39 use std::sync::Arc;
40 use tokio::sync::Mutex;
41 
42 use std::cell::{RefCell, RefMut};
43 use std::rc::Rc;
44 
/// Step of the client session that `ClientSession::run` is currently in.
///
/// The derives make the state cheap to copy, comparable and printable for
/// diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientSessionState {
    /// The handshake has not completed yet.
    Handshake,
    /// Handshake done; the `connect` command is due.
    Connect,
    /// `connect` was answered; the `createStream` command is due.
    CreateStream,
    /// Stream created for a playing client; the `play` command is due.
    Play,
    /// Stream created for a publishing client; the `publish` command is due.
    PublishingContent,
}
52 
/// Steps of a play-only session.
///
/// NOTE(review): in the code visible here this is only ever initialised to
/// `Handshake`; it looks like a leftover next to `ClientSessionState` — confirm
/// before relying on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientSessionPlayState {
    Handshake,
    Connect,
    CreateStream,
    Play,
}
59 
/// Steps of a publish-only session.
///
/// NOTE(review): in the code visible here this is only ever initialised to
/// `Handshake`; it looks like a leftover next to `ClientSessionState` — confirm
/// before relying on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientSessionPublishState {
    Handshake,
    Connect,
    CreateStream,
    PublishingContent,
}
66 
/// Role of the client; selects which command follows a successful
/// `createStream` (see `ClientSession::on_result_create_stream`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientType {
    /// The client pulls a stream (`play`).
    Play,
    /// The client pushes a stream (`publish`).
    Publish,
}
/// Client side of one RTMP session running over a single TCP connection.
///
/// The packetizer, the handshaker and the session itself all hold a handle to
/// the same `NetworkIO`.
pub struct ClientSession {
    /// Writes outgoing chunks to the shared connection.
    packetizer: ChunkPacketizer,
    /// Buffers received bytes and yields chunks from them.
    unpacketizer: ChunkUnpacketizer,
    /// Drives the client half of the handshake.
    handshaker: SimpleHandshakeClient,
    /// Shared handle to the connection, used here for reads and control messages.
    io: Arc<Mutex<NetworkIO>>,

    // NOTE(review): `play_state` and `publish_state` are only initialised in
    // `new`; nothing visible in this file reads or advances them.
    play_state: ClientSessionPlayState,
    publish_state: ClientSessionPublishState,
    /// Current step of the session; drives `run`.
    state: ClientSessionState,
    /// Whether this session plays or publishes.
    client_type: ClientType,
    /// Stream name passed to the `play` / `publish` command.
    stream_name: String,
}
83 
84 impl ClientSession {
85     fn new(
86         stream: TcpStream,
87         timeout: Duration,
88         client_type: ClientType,
89         stream_name: String,
90     ) -> Self {
91         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout)));
92 
93         // let reader = BytesReader::new(BytesMut::new());
94 
95         Self {
96             io: Arc::clone(&net_io),
97 
98             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
99             unpacketizer: ChunkUnpacketizer::new(),
100             handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)),
101 
102             play_state: ClientSessionPlayState::Handshake,
103             publish_state: ClientSessionPublishState::Handshake,
104             state: ClientSessionState::Handshake,
105             client_type: client_type,
106             stream_name: stream_name,
107         }
108     }
109 
110     pub async fn run(&mut self) -> Result<(), SessionError> {
111         loop {
112             match self.state {
113                 ClientSessionState::Handshake => {
114                     self.handshake().await?;
115                 }
116                 ClientSessionState::Connect => {
117                     self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64))
118                         .await?;
119                 }
120                 ClientSessionState::CreateStream => {
121                     self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64))
122                         .await?;
123                 }
124                 ClientSessionState::Play => {
125                     self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false)
126                         .await?;
127                 }
128                 ClientSessionState::PublishingContent => {
129                     self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string())
130                         .await?;
131                 }
132             }
133 
134             let data = self.io.lock().await.read().await?;
135             self.unpacketizer.extend_data(&data[..]);
136             let result = self.unpacketizer.read_chunk()?;
137 
138             match result {
139                 UnpackResult::ChunkInfo(chunk_info) => {
140                     let mut message_parser = MessageParser::new(chunk_info);
141                     let mut msg = message_parser.parse()?;
142 
143                     self.process_messages(&mut msg).await?;
144                 }
145                 _ => {}
146             }
147         }
148 
149         // Ok(())
150     }
151 
152     async fn handshake(&mut self) -> Result<(), SessionError> {
153         loop {
154             self.handshaker.handshake().await?;
155             if self.handshaker.state == ClientHandshakeState::Finish {
156                 break;
157             }
158 
159             let data = self.io.lock().await.read().await?;
160             self.handshaker.extend_data(&data[..]);
161         }
162         self.state = ClientSessionState::Connect;
163 
164         Ok(())
165     }
166 
167     pub async fn process_messages(
168         &mut self,
169         msg: &mut RtmpMessageData,
170     ) -> Result<(), SessionError> {
171         match msg {
172             RtmpMessageData::Amf0Command {
173                 command_name,
174                 transaction_id,
175                 command_object,
176                 others,
177             } => self.process_amf0_command_message(
178                 command_name,
179                 transaction_id,
180                 command_object,
181                 others,
182             )?,
183             RtmpMessageData::SetPeerBandwidth { properties } => {
184                 self.on_set_peer_bandwidth().await?
185             }
186             RtmpMessageData::SetChunkSize { chunk_size } => self.on_set_chunk_size(chunk_size)?,
187             RtmpMessageData::AudioData { data } => {}
188             RtmpMessageData::VideoData { data } => {}
189 
190             _ => {}
191         }
192         Ok(())
193     }
194 
195     pub fn process_amf0_command_message(
196         &mut self,
197         command_name: &Amf0ValueType,
198         transaction_id: &Amf0ValueType,
199         command_object: &Amf0ValueType,
200         others: &mut Vec<Amf0ValueType>,
201     ) -> Result<(), SessionError> {
202         let empty_cmd_name = &String::new();
203         let cmd_name = match command_name {
204             Amf0ValueType::UTF8String(str) => str,
205             _ => empty_cmd_name,
206         };
207 
208         let transaction_id = match transaction_id {
209             Amf0ValueType::Number(number) => number.clone() as u8,
210             _ => 0,
211         };
212 
213         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
214         let obj = match command_object {
215             Amf0ValueType::Object(obj) => obj,
216             // Amf0ValueType::Null =>
217             _ => &empty_cmd_obj,
218         };
219 
220         match cmd_name.as_str() {
221             "_reslut" => match transaction_id {
222                 define::TRANSACTION_ID_CONNECT => {
223                     self.on_result_connect()?;
224                 }
225                 define::TRANSACTION_ID_CREATE_STREAM => {
226                     self.on_result_create_stream()?;
227                 }
228                 _ => {}
229             },
230             "_error" => {
231                 self.on_error()?;
232             }
233             "onStatus" => {
234                 match others.remove(0) {
235                     Amf0ValueType::Object(obj) => self.on_status(&obj),
236                     _ => Err(SessionError {
237                         value: SessionErrorValue::Amf0ValueCountNotCorrect,
238                     }),
239                 };
240             }
241 
242             _ => {}
243         }
244 
245         Ok(())
246     }
247 
248     pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
249         let app_name = String::from("app");
250         let properties = ConnectProperties::new(app_name);
251 
252         let mut netconnection = NetConnection::new(BytesWriter::new());
253         let data = netconnection.connect(transaction_id, &properties)?;
254 
255         let mut chunk_info = ChunkInfo::new(
256             csid_type::COMMAND_AMF0_AMF3,
257             chunk_type::TYPE_0,
258             0,
259             data.len() as u32,
260             msg_type_id::COMMAND_AMF0,
261             0,
262             data,
263         );
264 
265         self.packetizer.write_chunk(&mut chunk_info).await?;
266         Ok(())
267     }
268 
269     pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
270         let mut netconnection = NetConnection::new(BytesWriter::new());
271         let data = netconnection.create_stream(transaction_id)?;
272 
273         let mut chunk_info = ChunkInfo::new(
274             csid_type::COMMAND_AMF0_AMF3,
275             chunk_type::TYPE_0,
276             0,
277             data.len() as u32,
278             msg_type_id::COMMAND_AMF0,
279             0,
280             data,
281         );
282 
283         self.packetizer.write_chunk(&mut chunk_info).await?;
284 
285         Ok(())
286     }
287 
288     pub async fn send_delete_stream(
289         &mut self,
290         transaction_id: &f64,
291         stream_id: &f64,
292     ) -> Result<(), SessionError> {
293         let mut netstream = NetStream::new(BytesWriter::new());
294         let data = netstream.delete_stream(transaction_id, stream_id)?;
295 
296         let mut chunk_info = ChunkInfo::new(
297             csid_type::COMMAND_AMF0_AMF3,
298             chunk_type::TYPE_0,
299             0,
300             data.len() as u32,
301             msg_type_id::COMMAND_AMF0,
302             0,
303             data,
304         );
305 
306         self.packetizer.write_chunk(&mut chunk_info).await?;
307         Ok(())
308     }
309 
310     pub async fn send_publish(
311         &mut self,
312         transaction_id: &f64,
313         stream_name: &String,
314         stream_type: &String,
315     ) -> Result<(), SessionError> {
316         let mut netstream = NetStream::new(BytesWriter::new());
317         let data = netstream.publish(transaction_id, stream_name, stream_type)?;
318 
319         let mut chunk_info = ChunkInfo::new(
320             csid_type::COMMAND_AMF0_AMF3,
321             chunk_type::TYPE_0,
322             0,
323             data.len() as u32,
324             msg_type_id::COMMAND_AMF0,
325             0,
326             data,
327         );
328 
329         self.packetizer.write_chunk(&mut chunk_info).await?;
330 
331         Ok(())
332     }
333 
334     pub async fn send_play(
335         &mut self,
336         transaction_id: &f64,
337         stream_name: &String,
338         start: &f64,
339         duration: &f64,
340         reset: &bool,
341     ) -> Result<(), SessionError> {
342         let mut netstream = NetStream::new(BytesWriter::new());
343         let data = netstream.play(transaction_id, stream_name, start, duration, reset)?;
344 
345         let mut chunk_info = ChunkInfo::new(
346             csid_type::COMMAND_AMF0_AMF3,
347             chunk_type::TYPE_0,
348             0,
349             data.len() as u32,
350             msg_type_id::COMMAND_AMF0,
351             0,
352             data,
353         );
354 
355         self.packetizer.write_chunk(&mut chunk_info).await?;
356 
357         Ok(())
358     }
359 
360     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
361         let mut controlmessage = ControlMessages::new(AsyncBytesWriter::new(self.io.clone()));
362         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
363         Ok(())
364     }
365 
366     pub async fn send_window_acknowledgement_size(
367         &mut self,
368         window_size: u32,
369     ) -> Result<(), SessionError> {
370         let mut controlmessage = ControlMessages::new(AsyncBytesWriter::new(self.io.clone()));
371         controlmessage
372             .write_window_acknowledgement_size(window_size)
373             .await?;
374         Ok(())
375     }
376 
377     pub async fn send_set_buffer_length(
378         &mut self,
379         stream_id: u32,
380         ms: u32,
381     ) -> Result<(), SessionError> {
382         let mut eventmessages = EventMessages::new(AsyncBytesWriter::new(self.io.clone()));
383         eventmessages.set_buffer_length(stream_id, ms).await?;
384 
385         Ok(())
386     }
387 
388     pub async fn send_audio(&mut self, data: BytesMut) -> Result<(), SessionError> {
389         let mut chunk_info = ChunkInfo::new(
390             csid_type::AUDIO,
391             chunk_type::TYPE_0,
392             0,
393             data.len() as u32,
394             msg_type_id::AUDIO,
395             0,
396             data,
397         );
398 
399         self.packetizer.write_chunk(&mut chunk_info).await?;
400 
401         Ok(())
402     }
403 
404     pub async fn send_video(&mut self, data: BytesMut) -> Result<(), SessionError> {
405         let mut chunk_info = ChunkInfo::new(
406             csid_type::VIDEO,
407             chunk_type::TYPE_0,
408             0,
409             data.len() as u32,
410             msg_type_id::VIDEO,
411             0,
412             data,
413         );
414 
415         self.packetizer.write_chunk(&mut chunk_info).await?;
416 
417         Ok(())
418     }
419 
    /// Called when the server answers the `connect` command: advances the
    /// session to `CreateStream`. Always returns `Ok(())`.
    pub fn on_result_connect(&mut self) -> Result<(), SessionError> {
        self.state = ClientSessionState::CreateStream;
        Ok(())
    }
424 
425     pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> {
426         match self.client_type {
427             ClientType::Play => {
428                 self.state = ClientSessionState::Play;
429             }
430             ClientType::Publish => {
431                 self.state = ClientSessionState::PublishingContent;
432             }
433         }
434         Ok(())
435     }
436 
437     pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> {
438         self.unpacketizer
439             .update_max_chunk_size(chunk_size.clone() as usize);
440         Ok(())
441     }
442 
443     pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> {
444         self.send_window_acknowledgement_size(250000).await?;
445         Ok(())
446     }
    /// Hook for an `_error` command from the server.
    ///
    /// Currently a stub: it does nothing and always returns `Ok(())`.
    pub fn on_error(&mut self) -> Result<(), SessionError> {
        Ok(())
    }
450 
    /// Hook for an `onStatus` command; `obj` is the object taken from the
    /// command's additional values.
    ///
    /// Currently a stub: `obj` is not inspected and `Ok(())` is always returned.
    pub fn on_status(&mut self, obj: &HashMap<String, Amf0ValueType>) -> Result<(), SessionError> {
        Ok(())
    }
454 }
455