1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     crate::utils::print::print,
9     crate::{
10         amf0::Amf0ValueType,
11         channels::define::ChannelEventProducer,
12         chunk::{
13             define::{chunk_type, csid_type, CHUNK_SIZE},
14             packetizer::ChunkPacketizer,
15             unpacketizer::{ChunkUnpacketizer, UnpackResult},
16             ChunkInfo,
17         },
18         handshake::handshake::{ClientHandshakeState, SimpleHandshakeClient},
19         messages::{
20             define::{msg_type_id, RtmpMessageData},
21             parser::MessageParser,
22         },
23         netconnection::commands::{ConnectProperties, NetConnection},
24         netstream::writer::NetStreamWriter,
25         protocol_control_messages::writer::ProtocolControlMessagesWriter,
26         user_control_messages::writer::EventMessagesWriter,
27     },
28     bytes::BytesMut,
29     networkio::{
30         bytes_writer::{AsyncBytesWriter, BytesWriter},
31         networkio::NetworkIO,
32     },
33     std::{collections::HashMap, sync::Arc},
34     tokio::{net::TcpStream, sync::Mutex},
35 };
36 
/// Step of the client-side session state machine driven by `ClientSession::run`.
// Some variants are only reachable for one client type, hence the allow.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientSessionState {
    /// Initial state: the handshake has to be performed.
    Handshake,
    /// The handshake is done; the `connect` command is sent next.
    Connect,
    /// The `connect` result arrived; the create-stream command is sent next.
    CreateStream,
    /// The stream was created for a `ClientType::Play` session; `play` is sent next.
    Play,
    /// The stream was created for a `ClientType::Publish` session; `publish` is sent next.
    PublishingContent,
    /// The server reported `NetStream.Publish.Start`; channel data is forwarded.
    StartPublish,
    /// A request was sent; nothing more is sent until a server message changes the state.
    WaitStateChange,
}
47 
/// States of a play-only session.
///
/// NOTE(review): not referenced anywhere in the visible part of this file;
/// presumably a play-specific subset of `ClientSessionState` — confirm whether
/// it is still needed.
#[allow(dead_code)]
enum ClientSessionPlayState {
    /// Handshake step.
    Handshake,
    /// Connect step.
    Connect,
    /// Create-stream step.
    CreateStream,
    /// Play step.
    Play,
}
55 
/// States of a publish-only session.
///
/// NOTE(review): not referenced anywhere in the visible part of this file;
/// presumably a publish-specific subset of `ClientSessionState` — confirm
/// whether it is still needed.
#[allow(dead_code)]
enum ClientSessionPublishState {
    /// Handshake step.
    Handshake,
    /// Connect step.
    Connect,
    /// Create-stream step.
    CreateStream,
    /// Publish step.
    PublishingContent,
}
/// Role a `ClientSession` takes once its stream has been created.
// Callers may only ever construct one of the variants, hence the allow.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientType {
    /// The session requests playback: `play` is sent after the stream is created.
    Play,
    /// The session publishes: `publish` is sent after the stream is created.
    Publish,
}
68 pub struct ClientSession {
69     io: Arc<Mutex<NetworkIO>>,
70     common: Common,
71 
72     handshaker: SimpleHandshakeClient,
73 
74     packetizer: ChunkPacketizer,
75     unpacketizer: ChunkUnpacketizer,
76 
77     app_name: String,
78     stream_name: String,
79     session_type: u8,
80     session_id: u64,
81 
82     state: ClientSessionState,
83     client_type: ClientType,
84     connect_command_object: Option<HashMap<String, Amf0ValueType>>,
85 }
86 
87 impl ClientSession {
88     #[allow(dead_code)]
89     pub fn new(
90         stream: TcpStream,
91         client_type: ClientType,
92         app_name: String,
93         stream_name: String,
94         event_producer: ChannelEventProducer,
95         session_id: u64,
96     ) -> Self {
97         let net_io = Arc::new(Mutex::new(NetworkIO::new(stream)));
98 
99         Self {
100             io: Arc::clone(&net_io),
101             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Client),
102 
103             handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)),
104 
105             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
106             unpacketizer: ChunkUnpacketizer::new(),
107 
108             app_name: app_name,
109             stream_name: stream_name,
110             client_type: client_type,
111 
112             state: ClientSessionState::Handshake,
113             session_type: 0,
114             session_id: session_id,
115             connect_command_object: None,
116         }
117     }
118 
119     pub fn set_connect_command_object(&mut self, object: HashMap<String, Amf0ValueType>) {
120         self.connect_command_object = Some(object);
121     }
122 
123     pub async fn run(&mut self) -> Result<(), SessionError> {
124         loop {
125             match self.state {
126                 ClientSessionState::Handshake => {
127                     println!("handshake");
128                     self.handshake().await?;
129                     continue;
130                 }
131                 ClientSessionState::Connect => {
132                     println!("connect");
133                     self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64))
134                         .await?;
135                     self.state = ClientSessionState::WaitStateChange;
136                 }
137                 ClientSessionState::CreateStream => {
138                     println!("CreateStream");
139                     self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64))
140                         .await?;
141                     self.state = ClientSessionState::WaitStateChange;
142                 }
143                 ClientSessionState::Play => {
144                     self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false)
145                         .await?;
146                     self.state = ClientSessionState::WaitStateChange;
147                 }
148                 ClientSessionState::PublishingContent => {
149                     println!("PublishingContent");
150                     self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string())
151                         .await?;
152                     self.state = ClientSessionState::WaitStateChange;
153                 }
154                 ClientSessionState::StartPublish => {
155                     println!("StartPublish");
156                     self.common.send_channel_data().await?;
157                 }
158                 ClientSessionState::WaitStateChange => {}
159             }
160 
161             let data = self.io.lock().await.read().await?;
162             self.unpacketizer.extend_data(&data[..]);
163             let result = self.unpacketizer.read_chunk()?;
164 
165             match result {
166                 UnpackResult::ChunkInfo(chunk_info) => {
167                     let mut message_parser =
168                         MessageParser::new(chunk_info.clone(), self.session_type);
169                     let mut msg = message_parser.parse()?;
170                     let timestamp = chunk_info.message_header.timestamp;
171 
172                     self.process_messages(&mut msg, &timestamp).await?;
173                 }
174                 _ => {}
175             }
176         }
177 
178         // Ok(())
179     }
180 
181     async fn handshake(&mut self) -> Result<(), SessionError> {
182         loop {
183             self.handshaker.handshake().await?;
184             if self.handshaker.state == ClientHandshakeState::Finish {
185                 println!("handshake finish");
186                 break;
187             }
188 
189             let data = self.io.lock().await.read().await?;
190             print(data.clone());
191             self.handshaker.extend_data(&data[..]);
192         }
193 
194         self.state = ClientSessionState::Connect;
195 
196         Ok(())
197     }
198 
199     pub async fn process_messages(
200         &mut self,
201         msg: &mut RtmpMessageData,
202         timestamp: &u32,
203     ) -> Result<(), SessionError> {
204         match msg {
205             RtmpMessageData::Amf0Command {
206                 command_name,
207                 transaction_id,
208                 command_object,
209                 others,
210             } => {
211                 self.on_amf0_command_message(command_name, transaction_id, command_object, others)
212                     .await?
213             }
214             RtmpMessageData::SetPeerBandwidth { properties } => {
215                 print!("{}", properties.window_size);
216                 self.on_set_peer_bandwidth().await?
217             }
218             RtmpMessageData::SetChunkSize { chunk_size } => self.on_set_chunk_size(chunk_size)?,
219 
220             RtmpMessageData::StreamBegin { stream_id } => self.on_stream_begin(stream_id)?,
221 
222             RtmpMessageData::StreamIsRecorded { stream_id } => {
223                 self.on_stream_is_recorded(stream_id)?
224             }
225 
226             RtmpMessageData::AudioData { data } => self.common.on_audio_data(data, timestamp)?,
227 
228             RtmpMessageData::VideoData { data } => self.common.on_video_data(data, timestamp)?,
229 
230             _ => {}
231         }
232         Ok(())
233     }
234 
235     pub async fn on_amf0_command_message(
236         &mut self,
237         command_name: &Amf0ValueType,
238         transaction_id: &Amf0ValueType,
239         command_object: &Amf0ValueType,
240         others: &mut Vec<Amf0ValueType>,
241     ) -> Result<(), SessionError> {
242         let empty_cmd_name = &String::new();
243         let cmd_name = match command_name {
244             Amf0ValueType::UTF8String(str) => str,
245             _ => empty_cmd_name,
246         };
247 
248         let transaction_id = match transaction_id {
249             Amf0ValueType::Number(number) => number.clone() as u8,
250             _ => 0,
251         };
252 
253         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
254         let _ = match command_object {
255             Amf0ValueType::Object(obj) => obj,
256             // Amf0ValueType::Null =>
257             _ => &empty_cmd_obj,
258         };
259 
260         match cmd_name.as_str() {
261             "_result" => match transaction_id {
262                 define::TRANSACTION_ID_CONNECT => {
263                     self.on_result_connect().await?;
264                 }
265                 define::TRANSACTION_ID_CREATE_STREAM => {
266                     self.on_result_create_stream()?;
267                 }
268                 _ => {}
269             },
270             "_error" => {
271                 self.on_error()?;
272             }
273             "onStatus" => {
274                 match others.remove(0) {
275                     Amf0ValueType::Object(obj) => self.on_status(&obj).await?,
276                     _ => {
277                         return Err(SessionError {
278                             value: SessionErrorValue::Amf0ValueCountNotCorrect,
279                         })
280                     }
281                 };
282             }
283 
284             _ => {}
285         }
286 
287         Ok(())
288     }
289 
290     pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
291         self.send_set_chunk_size().await?;
292 
293         // let properties = ConnectProperties::new(self.app_name.clone());
294         let mut netconnection = NetConnection::new(BytesWriter::new());
295         let data = netconnection
296             .connect_with_value(transaction_id, self.connect_command_object.clone().unwrap())?;
297 
298         let mut chunk_info = ChunkInfo::new(
299             csid_type::COMMAND_AMF0_AMF3,
300             chunk_type::TYPE_0,
301             0,
302             data.len() as u32,
303             msg_type_id::COMMAND_AMF0,
304             0,
305             data,
306         );
307 
308         self.packetizer.write_chunk(&mut chunk_info).await?;
309         Ok(())
310     }
311 
312     pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
313         let mut netconnection = NetConnection::new(BytesWriter::new());
314         let data = netconnection.create_stream(transaction_id)?;
315 
316         let mut chunk_info = ChunkInfo::new(
317             csid_type::COMMAND_AMF0_AMF3,
318             chunk_type::TYPE_0,
319             0,
320             data.len() as u32,
321             msg_type_id::COMMAND_AMF0,
322             0,
323             data,
324         );
325 
326         self.packetizer.write_chunk(&mut chunk_info).await?;
327 
328         Ok(())
329     }
330 
331     pub async fn send_delete_stream(
332         &mut self,
333         transaction_id: &f64,
334         stream_id: &f64,
335     ) -> Result<(), SessionError> {
336         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
337         netstream.delete_stream(transaction_id, stream_id).await?;
338 
339         Ok(())
340     }
341 
342     pub async fn send_publish(
343         &mut self,
344         transaction_id: &f64,
345         stream_name: &String,
346         stream_type: &String,
347     ) -> Result<(), SessionError> {
348         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
349         netstream
350             .publish(transaction_id, stream_name, stream_type)
351             .await?;
352 
353         Ok(())
354     }
355 
356     pub async fn send_play(
357         &mut self,
358         transaction_id: &f64,
359         stream_name: &String,
360         start: &f64,
361         duration: &f64,
362         reset: &bool,
363     ) -> Result<(), SessionError> {
364         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
365         netstream
366             .play(transaction_id, stream_name, start, duration, reset)
367             .await?;
368 
369         Ok(())
370     }
371 
372     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
373         let mut controlmessage =
374             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
375         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
376         Ok(())
377     }
378 
379     pub async fn send_window_acknowledgement_size(
380         &mut self,
381         window_size: u32,
382     ) -> Result<(), SessionError> {
383         let mut controlmessage =
384             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
385         controlmessage
386             .write_window_acknowledgement_size(window_size)
387             .await?;
388         Ok(())
389     }
390 
391     pub async fn send_set_buffer_length(
392         &mut self,
393         stream_id: u32,
394         ms: u32,
395     ) -> Result<(), SessionError> {
396         let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
397         eventmessages.write_set_buffer_length(stream_id, ms).await?;
398 
399         Ok(())
400     }
401 
402     pub async fn on_result_connect(&mut self) -> Result<(), SessionError> {
403         let mut controlmessage =
404             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
405         controlmessage.write_acknowledgement(3107).await?;
406 
407         let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io));
408         netstream
409             .release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
410             .await?;
411         netstream
412             .fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
413             .await?;
414 
415         self.state = ClientSessionState::CreateStream;
416 
417         Ok(())
418     }
419 
420     pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> {
421         match self.client_type {
422             ClientType::Play => {
423                 self.state = ClientSessionState::Play;
424             }
425             ClientType::Publish => {
426                 self.state = ClientSessionState::PublishingContent;
427             }
428         }
429         Ok(())
430     }
431 
432     pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> {
433         self.unpacketizer
434             .update_max_chunk_size(chunk_size.clone() as usize);
435         Ok(())
436     }
437 
438     pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
439         Ok(())
440     }
441 
442     pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
443         Ok(())
444     }
445 
446     pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> {
447         self.send_window_acknowledgement_size(250000).await?;
448         Ok(())
449     }
450 
451     pub fn on_error(&mut self) -> Result<(), SessionError> {
452         Ok(())
453     }
454 
455     pub async fn on_status(
456         &mut self,
457         obj: &HashMap<String, Amf0ValueType>,
458     ) -> Result<(), SessionError> {
459         println!("on_status===");
460         if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") {
461             match &code_info[..] {
462                 "NetStream.Publish.Start" => {
463                     self.state = ClientSessionState::StartPublish;
464                     self.common
465                         .subscribe_from_channels(
466                             self.app_name.clone(),
467                             self.stream_name.clone(),
468                             self.session_id,
469                         )
470                         .await?;
471                 }
472                 "NetStream.Publish.Reset" => {}
473 
474                 // "NetStream.Play.Start" => self.common.publish_to_channels(
475                 //     self.app_name.clone(),
476                 //     self.stream_name.clone(),
477                 //     connect_command_object,
478                 // ),
479 
480                 _ => {}
481             }
482         }
483         println!("{}", obj.len());
484         Ok(())
485     }
486 }
487