1 use super::errors::SessionError;
2 
3 use crate::chunk::define::{chunk_type, csid_type, CHUNK_SIZE};
4 use crate::chunk::unpacketizer::ChunkUnpacketizer;
5 use crate::chunk::unpacketizer::UnpackResult;
6 use crate::chunk::{packetizer::ChunkPacketizer, ChunkInfo};
7 
8 use super::errors::SessionErrorValue;
9 use crate::handshake::handshake::SimpleHandshakeClient;
10 
11 use crate::messages::define::msg_type_id;
12 use crate::messages::define::RtmpMessageData;
13 use crate::messages::parser::MessageParser;
14 
15 use crate::amf0::Amf0ValueType;
16 
17 use netio::bytes_writer::AsyncBytesWriter;
18 
19 use netio::bytes_writer::BytesWriter;
20 use netio::netio::NetworkIO;
21 
22 use std::time::Duration;
23 
24 use crate::handshake::handshake::ClientHandshakeState;
25 use crate::netconnection::commands::ConnectProperties;
26 use crate::netconnection::commands::NetConnection;
27 use crate::netstream::commands::NetStream;
28 use crate::protocol_control_messages::control_messages::ControlMessages;
29 
30 use crate::user_control_messages::event_messages::EventMessages;
31 
32 use std::collections::HashMap;
33 
34 use super::define;
35 use tokio::prelude::*;
36 
37 use bytes::BytesMut;
38 use std::sync::Arc;
39 use tokio::sync::Mutex;
40 
/// Phase of the client session; selects which request `ClientSession::run`
/// issues.
///
/// The derives make the state cheap to copy, comparable and printable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientSessionState {
    /// Initial state: the handshake has not completed yet.
    Handshake,
    /// Handshake finished; the connect command is to be sent.
    Connect,
    /// The connect result was received; the createStream command is to be sent.
    CreateStream,
    /// The createStream result was received by a playing client.
    Play,
    /// The createStream result was received by a publishing client.
    PublishingContent,
}
48 
/// Phases of a playing client.
///
/// NOTE(review): in this file the `play_state` field holding this value is
/// only initialised to `Handshake` and never advanced; confirm whether it is
/// still needed alongside `ClientSessionState`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientSessionPlayState {
    Handshake,
    Connect,
    CreateStream,
    Play,
}
55 
/// Phases of a publishing client.
///
/// NOTE(review): in this file the `publish_state` field holding this value is
/// only initialised to `Handshake` and never advanced; confirm whether it is
/// still needed alongside `ClientSessionState`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientSessionPublishState {
    Handshake,
    Connect,
    CreateStream,
    PublishingContent,
}
62 
/// Role of the client; decides which state follows a successful createStream
/// result.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientType {
    /// The session moves to `ClientSessionState::Play`.
    Play,
    /// The session moves to `ClientSessionState::PublishingContent`.
    Publish,
}
/// Client side of a session running over the byte stream `S`.
///
/// The packetizer, the handshaker and the session itself all operate on the
/// same `NetworkIO`, which is shared through an `Arc<Mutex<_>>`.
pub struct ClientSession<S>
where
    S: AsyncRead + AsyncWrite + Unpin + Send + Sync,
{
    // Writes outgoing chunks; built in `new` over a writer that shares `io`.
    packetizer: ChunkPacketizer<S>,
    // Accumulates received bytes and yields parsed chunks.
    unpacketizer: ChunkUnpacketizer,
    // Performs the client handshake; shares `io`.
    handshaker: SimpleHandshakeClient<S>,
    // Shared network I/O handle used for reads in `run` and `handshake`.
    io: Arc<Mutex<NetworkIO<S>>>,

    // NOTE(review): `play_state` and `publish_state` are only initialised in
    // `new` within this file; `state` is what `run` actually matches on.
    play_state: ClientSessionPlayState,
    publish_state: ClientSessionPublishState,
    // Current phase of the session; selects the request issued by `run`.
    state: ClientSessionState,
    // Chooses between the play and publish paths after createStream succeeds.
    client_type: ClientType,
    // Stream name passed to the play / publish commands.
    stream_name: String,
}
82 
83 impl<S> ClientSession<S>
84 where
85     S: AsyncRead + AsyncWrite + Unpin + Send + Sync,
86 {
87     fn new(stream: S, timeout: Duration, client_type: ClientType, stream_name: String) -> Self {
88         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout)));
89         let bytes_writer = AsyncBytesWriter::new(Arc::clone(&net_io));
90 
91         Self {
92             io: Arc::clone(&net_io),
93 
94             packetizer: ChunkPacketizer::new(bytes_writer),
95             unpacketizer: ChunkUnpacketizer::new(),
96             handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)),
97 
98             play_state: ClientSessionPlayState::Handshake,
99             publish_state: ClientSessionPublishState::Handshake,
100             state: ClientSessionState::Handshake,
101             client_type: client_type,
102             stream_name: stream_name,
103         }
104     }
105 
106     pub async fn run(&mut self) -> Result<(), SessionError> {
107         loop {
108             match self.state {
109                 ClientSessionState::Handshake => {
110                     self.handshake().await?;
111                 }
112                 ClientSessionState::Connect => {
113                     self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64))
114                         .await?;
115                 }
116                 ClientSessionState::CreateStream => {
117                     self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64))
118                         .await?;
119                 }
120                 ClientSessionState::Play => {
121                     self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false)
122                         .await?;
123                 }
124                 ClientSessionState::PublishingContent => {
125                     self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string())
126                         .await?;
127                 }
128             }
129 
130             let data = self.io.lock().await.read().await?;
131             self.unpacketizer.extend_data(&data[..]);
132             let result = self.unpacketizer.read_chunk()?;
133 
134             match result {
135                 UnpackResult::ChunkInfo(chunk_info) => {
136                     let mut message_parser = MessageParser::new(chunk_info);
137                     let mut msg = message_parser.parse()?;
138 
139                     self.process_messages(&mut msg)?;
140                 }
141                 _ => {}
142             }
143         }
144 
145         // Ok(())
146     }
147 
148     async fn handshake(&mut self) -> Result<(), SessionError> {
149         loop {
150             self.handshaker.handshake().await?;
151             if self.handshaker.state == ClientHandshakeState::Finish {
152                 break;
153             }
154 
155             let data = self.io.lock().await.read().await?;
156             self.handshaker.extend_data(&data[..]);
157         }
158         self.state = ClientSessionState::Connect;
159 
160         Ok(())
161     }
162 
163     pub fn process_messages(&mut self, msg: &mut RtmpMessageData) -> Result<(), SessionError> {
164         match msg {
165             RtmpMessageData::Amf0Command {
166                 command_name,
167                 transaction_id,
168                 command_object,
169                 others,
170             } => self.process_amf0_command_message(
171                 command_name,
172                 transaction_id,
173                 command_object,
174                 others,
175             )?,
176             RtmpMessageData::SetPeerBandwidth { properties } => self.on_set_peer_bandwidth()?,
177             RtmpMessageData::SetChunkSize { chunk_size } => self.on_set_chunk_size(chunk_size)?,
178             RtmpMessageData::AudioData { data } => {}
179             RtmpMessageData::VideoData { data } => {}
180 
181             _ => {}
182         }
183         Ok(())
184     }
185 
186     pub fn process_amf0_command_message(
187         &mut self,
188         command_name: &Amf0ValueType,
189         transaction_id: &Amf0ValueType,
190         command_object: &Amf0ValueType,
191         others: &mut Vec<Amf0ValueType>,
192     ) -> Result<(), SessionError> {
193         let empty_cmd_name = &String::new();
194         let cmd_name = match command_name {
195             Amf0ValueType::UTF8String(str) => str,
196             _ => empty_cmd_name,
197         };
198 
199         let transaction_id = match transaction_id {
200             Amf0ValueType::Number(number) => number.clone() as u8,
201             _ => 0,
202         };
203 
204         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
205         let obj = match command_object {
206             Amf0ValueType::Object(obj) => obj,
207             // Amf0ValueType::Null =>
208             _ => &empty_cmd_obj,
209         };
210 
211         match cmd_name.as_str() {
212             "_reslut" => match transaction_id {
213                 define::TRANSACTION_ID_CONNECT => {
214                     self.on_result_connect()?;
215                 }
216                 define::TRANSACTION_ID_CREATE_STREAM => {
217                     self.on_result_create_stream()?;
218                 }
219                 _ => {}
220             },
221             "_error" => {
222                 self.on_error()?;
223             }
224             "onStatus" => {
225                 match others.remove(0) {
226                     Amf0ValueType::Object(obj) => self.on_status(&obj),
227                     _ => Err(SessionError {
228                         value: SessionErrorValue::Amf0ValueCountNotCorrect,
229                     }),
230                 };
231             }
232 
233             _ => {}
234         }
235 
236         Ok(())
237     }
238 
239     pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
240         let app_name = String::from("app");
241         let properties = ConnectProperties::new(app_name);
242 
243         let mut netconnection = NetConnection::new(BytesWriter::new());
244         let data = netconnection.connect(transaction_id, &properties)?;
245 
246         let mut chunk_info = ChunkInfo::new(
247             csid_type::COMMAND_AMF0_AMF3,
248             chunk_type::TYPE_0,
249             0,
250             data.len() as u32,
251             msg_type_id::COMMAND_AMF0,
252             0,
253             data,
254         );
255 
256         self.packetizer.write_chunk(&mut chunk_info).await?;
257         Ok(())
258     }
259 
260     pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
261         let mut netconnection = NetConnection::new(BytesWriter::new());
262         let data = netconnection.create_stream(transaction_id)?;
263 
264         let mut chunk_info = ChunkInfo::new(
265             csid_type::COMMAND_AMF0_AMF3,
266             chunk_type::TYPE_0,
267             0,
268             data.len() as u32,
269             msg_type_id::COMMAND_AMF0,
270             0,
271             data,
272         );
273 
274         self.packetizer.write_chunk(&mut chunk_info).await?;
275 
276         Ok(())
277     }
278 
279     pub async fn send_delete_stream(
280         &mut self,
281         transaction_id: &f64,
282         stream_id: &f64,
283     ) -> Result<(), SessionError> {
284         let mut netstream = NetStream::new(BytesWriter::new());
285         let data = netstream.delete_stream(transaction_id, stream_id)?;
286 
287         let mut chunk_info = ChunkInfo::new(
288             csid_type::COMMAND_AMF0_AMF3,
289             chunk_type::TYPE_0,
290             0,
291             data.len() as u32,
292             msg_type_id::COMMAND_AMF0,
293             0,
294             data,
295         );
296 
297         self.packetizer.write_chunk(&mut chunk_info).await?;
298         Ok(())
299     }
300 
301     pub async fn send_publish(
302         &mut self,
303         transaction_id: &f64,
304         stream_name: &String,
305         stream_type: &String,
306     ) -> Result<(), SessionError> {
307         let mut netstream = NetStream::new(BytesWriter::new());
308         let data = netstream.publish(transaction_id, stream_name, stream_type)?;
309 
310         let mut chunk_info = ChunkInfo::new(
311             csid_type::COMMAND_AMF0_AMF3,
312             chunk_type::TYPE_0,
313             0,
314             data.len() as u32,
315             msg_type_id::COMMAND_AMF0,
316             0,
317             data,
318         );
319 
320         self.packetizer.write_chunk(&mut chunk_info).await?;
321 
322         Ok(())
323     }
324 
325     pub async fn send_play(
326         &mut self,
327         transaction_id: &f64,
328         stream_name: &String,
329         start: &f64,
330         duration: &f64,
331         reset: &bool,
332     ) -> Result<(), SessionError> {
333         let mut netstream = NetStream::new(BytesWriter::new());
334         let data = netstream.play(transaction_id, stream_name, start, duration, reset)?;
335 
336         let mut chunk_info = ChunkInfo::new(
337             csid_type::COMMAND_AMF0_AMF3,
338             chunk_type::TYPE_0,
339             0,
340             data.len() as u32,
341             msg_type_id::COMMAND_AMF0,
342             0,
343             data,
344         );
345 
346         self.packetizer.write_chunk(&mut chunk_info).await?;
347 
348         Ok(())
349     }
350 
351     pub fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
352         let mut controlmessage = ControlMessages::new(AsyncBytesWriter::new(self.io.clone()));
353         controlmessage.write_set_chunk_size(CHUNK_SIZE)?;
354         Ok(())
355     }
356 
357     pub fn send_window_acknowledgement_size(
358         &mut self,
359         window_size: u32,
360     ) -> Result<(), SessionError> {
361         let mut controlmessage = ControlMessages::new(AsyncBytesWriter::new(self.io.clone()));
362         controlmessage.write_window_acknowledgement_size(window_size)?;
363         Ok(())
364     }
365 
366     pub async fn send_set_buffer_length(&mut self, stream_id: u32, ms: u32) -> Result<(), SessionError> {
367         let mut eventmessages = EventMessages::new(AsyncBytesWriter::new(self.io.clone()));
368         eventmessages.set_buffer_length(stream_id, ms).await?;
369 
370         Ok(())
371     }
372 
373     pub async fn send_audio(&mut self, data: BytesMut) -> Result<(), SessionError> {
374         let mut chunk_info = ChunkInfo::new(
375             csid_type::AUDIO,
376             chunk_type::TYPE_0,
377             0,
378             data.len() as u32,
379             msg_type_id::AUDIO,
380             0,
381             data,
382         );
383 
384         self.packetizer.write_chunk(&mut chunk_info).await?;
385 
386         Ok(())
387     }
388 
389     pub async fn send_video(&mut self, data: BytesMut) -> Result<(), SessionError> {
390         let mut chunk_info = ChunkInfo::new(
391             csid_type::VIDEO,
392             chunk_type::TYPE_0,
393             0,
394             data.len() as u32,
395             msg_type_id::VIDEO,
396             0,
397             data,
398         );
399 
400         self.packetizer.write_chunk(&mut chunk_info).await?;
401 
402         Ok(())
403     }
404 
    /// Handles the result of the connect command by moving the session to
    /// the `CreateStream` state. Always returns `Ok(())`.
    pub fn on_result_connect(&mut self) -> Result<(), SessionError> {
        self.state = ClientSessionState::CreateStream;
        Ok(())
    }
409 
410     pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> {
411         match self.client_type {
412             ClientType::Play => {
413                 self.state = ClientSessionState::Play;
414             }
415             ClientType::Publish => {
416                 self.state = ClientSessionState::PublishingContent;
417             }
418         }
419         Ok(())
420     }
421 
422     pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> {
423         self.unpacketizer
424             .update_max_chunk_size(chunk_size.clone() as usize);
425         Ok(())
426     }
427 
428     pub fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> {
429         self.send_window_acknowledgement_size(250000)?;
430         Ok(())
431     }
    /// Called for an `"_error"` command. Currently a stub: it takes no
    /// action and returns `Ok(())`.
    pub fn on_error(&mut self) -> Result<(), SessionError> {
        Ok(())
    }
435 
    /// Called for an `"onStatus"` command with the object taken from the
    /// command's additional values. Currently a stub: `obj` is not inspected
    /// and the function returns `Ok(())`.
    pub fn on_status(&mut self, obj: &HashMap<String, Amf0ValueType>) -> Result<(), SessionError> {
        Ok(())
    }
439 }
440