1 use uuid::Uuid;
2 
3 use {
4     super::{
5         common::Common,
6         define,
7         define::SessionType,
8         errors::{SessionError, SessionErrorValue},
9     },
10     crate::utils::print::print,
11     crate::{
12         amf0::Amf0ValueType,
13         channels::define::ChannelEventProducer,
14         chunk::{
15             define::CHUNK_SIZE,
16             unpacketizer::{ChunkUnpacketizer, UnpackResult},
17         },
18         handshake::handshake::{ClientHandshakeState, SimpleHandshakeClient},
19         messages::{define::RtmpMessageData, parser::MessageParser},
20         netconnection::writer::{ConnectProperties, NetConnection},
21         netstream::writer::NetStreamWriter,
22         protocol_control_messages::writer::ProtocolControlMessagesWriter,
23         user_control_messages::writer::EventMessagesWriter,
24     },
25     bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO},
26     std::{collections::HashMap, sync::Arc},
27     tokio::{net::TcpStream, sync::Mutex},
28 };
29 
30 #[allow(dead_code)]
31 enum ClientSessionState {
32     Handshake,
33     Connect,
34     CreateStream,
35     Play,
36     PublishingContent,
37     StartPublish,
38     WaitStateChange,
39 }
40 
41 #[allow(dead_code)]
42 enum ClientSessionPlayState {
43     Handshake,
44     Connect,
45     CreateStream,
46     Play,
47 }
48 
49 #[allow(dead_code)]
50 enum ClientSessionPublishState {
51     Handshake,
52     Connect,
53     CreateStream,
54     PublishingContent,
55 }
56 #[allow(dead_code)]
57 pub enum ClientType {
58     Play,
59     Publish,
60 }
61 pub struct ClientSession {
62     io: Arc<Mutex<BytesIO>>,
63     common: Common,
64 
65     handshaker: SimpleHandshakeClient,
66 
67     unpacketizer: ChunkUnpacketizer,
68 
69     app_name: String,
70     stream_name: String,
71 
72     /* Used to mark the subscriber's the data producer
73     in channels and delete it from map when unsubscribe
74     is called. */
75     subscriber_id: Uuid,
76 
77     state: ClientSessionState,
78     client_type: ClientType,
79 }
80 
81 impl ClientSession {
82     #[allow(dead_code)]
83     pub fn new(
84         stream: TcpStream,
85         client_type: ClientType,
86         app_name: String,
87         stream_name: String,
88         event_producer: ChannelEventProducer,
89     ) -> Self {
90         let net_io = Arc::new(Mutex::new(BytesIO::new(stream)));
91         let subscriber_id = Uuid::new_v4();
92 
93         Self {
94             io: Arc::clone(&net_io),
95             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Client),
96 
97             handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)),
98 
99             unpacketizer: ChunkUnpacketizer::new(),
100 
101             app_name,
102             stream_name,
103             client_type,
104 
105             state: ClientSessionState::Handshake,
106             subscriber_id,
107         }
108     }
109 
110     pub async fn run(&mut self) -> Result<(), SessionError> {
111         loop {
112             match self.state {
113                 ClientSessionState::Handshake => {
114                     log::info!("[C -> S] handshake...");
115                     self.handshake().await?;
116                     continue;
117                 }
118                 ClientSessionState::Connect => {
119                     log::info!("[C -> S] connect...");
120                     self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64))
121                         .await?;
122                     self.state = ClientSessionState::WaitStateChange;
123                 }
124                 ClientSessionState::CreateStream => {
125                     log::info!("[C -> S] CreateStream...");
126                     self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64))
127                         .await?;
128                     self.state = ClientSessionState::WaitStateChange;
129                 }
130                 ClientSessionState::Play => {
131                     log::info!("[C -> S] Play...");
132                     self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false)
133                         .await?;
134                     self.state = ClientSessionState::WaitStateChange;
135                 }
136                 ClientSessionState::PublishingContent => {
137                     log::info!("[C -> S] PublishingContent...");
138                     self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string())
139                         .await?;
140                     self.state = ClientSessionState::WaitStateChange;
141                 }
142                 ClientSessionState::StartPublish => {
143                     log::info!("[C -> S] StartPublish...");
144                     self.common.send_channel_data().await?;
145                 }
146                 ClientSessionState::WaitStateChange => {}
147             }
148 
149             let data = self.io.lock().await.read().await?;
150             self.unpacketizer.extend_data(&data[..]);
151             let result = self.unpacketizer.read_chunk()?;
152 
153             match result {
154                 UnpackResult::ChunkInfo(chunk_info) => {
155                     let mut message_parser = MessageParser::new(chunk_info.clone());
156                     let mut msg = message_parser.parse()?;
157                     let timestamp = chunk_info.message_header.timestamp;
158 
159                     self.process_messages(&mut msg, &timestamp).await?;
160                 }
161                 _ => {}
162             }
163         }
164 
165         // Ok(())
166     }
167 
168     async fn handshake(&mut self) -> Result<(), SessionError> {
169         loop {
170             self.handshaker.handshake().await?;
171             if self.handshaker.state == ClientHandshakeState::Finish {
172                 log::info!("handshake finish");
173                 break;
174             }
175 
176             let data = self.io.lock().await.read().await?;
177             print(data.clone());
178             self.handshaker.extend_data(&data[..]);
179         }
180 
181         self.state = ClientSessionState::Connect;
182 
183         Ok(())
184     }
185 
186     pub async fn process_messages(
187         &mut self,
188         msg: &mut RtmpMessageData,
189         timestamp: &u32,
190     ) -> Result<(), SessionError> {
191         match msg {
192             RtmpMessageData::Amf0Command {
193                 command_name,
194                 transaction_id,
195                 command_object,
196                 others,
197             } => {
198                 self.on_amf0_command_message(command_name, transaction_id, command_object, others)
199                     .await?
200             }
201             RtmpMessageData::SetPeerBandwidth { properties } => {
202                 log::trace!(
203                     "process_messages SetPeerBandwidth windows size: {}",
204                     properties.window_size
205                 );
206                 self.on_set_peer_bandwidth().await?
207             }
208             RtmpMessageData::SetChunkSize { chunk_size } => self.on_set_chunk_size(chunk_size)?,
209 
210             RtmpMessageData::StreamBegin { stream_id } => self.on_stream_begin(stream_id)?,
211 
212             RtmpMessageData::StreamIsRecorded { stream_id } => {
213                 self.on_stream_is_recorded(stream_id)?
214             }
215 
216             RtmpMessageData::AudioData { data } => self.common.on_audio_data(data, timestamp)?,
217 
218             RtmpMessageData::VideoData { data } => self.common.on_video_data(data, timestamp)?,
219 
220             _ => {}
221         }
222         Ok(())
223     }
224 
225     pub async fn on_amf0_command_message(
226         &mut self,
227         command_name: &Amf0ValueType,
228         transaction_id: &Amf0ValueType,
229         command_object: &Amf0ValueType,
230         others: &mut Vec<Amf0ValueType>,
231     ) -> Result<(), SessionError> {
232         let empty_cmd_name = &String::new();
233         let cmd_name = match command_name {
234             Amf0ValueType::UTF8String(str) => str,
235             _ => empty_cmd_name,
236         };
237 
238         let transaction_id = match transaction_id {
239             Amf0ValueType::Number(number) => number.clone() as u8,
240             _ => 0,
241         };
242 
243         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
244         let _ = match command_object {
245             Amf0ValueType::Object(obj) => obj,
246             // Amf0ValueType::Null =>
247             _ => &empty_cmd_obj,
248         };
249 
250         match cmd_name.as_str() {
251             "_result" => match transaction_id {
252                 define::TRANSACTION_ID_CONNECT => {
253                     self.on_result_connect().await?;
254                 }
255                 define::TRANSACTION_ID_CREATE_STREAM => {
256                     self.on_result_create_stream()?;
257                 }
258                 _ => {}
259             },
260             "_error" => {
261                 self.on_error()?;
262             }
263             "onStatus" => {
264                 match others.remove(0) {
265                     Amf0ValueType::Object(obj) => self.on_status(&obj).await?,
266                     _ => {
267                         return Err(SessionError {
268                             value: SessionErrorValue::Amf0ValueCountNotCorrect,
269                         })
270                     }
271                 };
272             }
273 
274             _ => {}
275         }
276 
277         Ok(())
278     }
279 
280     pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
281         self.send_set_chunk_size().await?;
282 
283         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
284 
285         let mut properties = ConnectProperties::new_none();
286 
287         let url = format!("rtmp://localhost:1935/{app_name}", app_name = self.app_name);
288         properties.app = Some(self.app_name.clone());
289         properties.tc_url = Some(url.clone());
290 
291         match self.client_type {
292             ClientType::Play => {
293                 properties.flash_ver = Some("flashVerFMLE/3.0 (compatible; FMSc/1.0)".to_string());
294                 properties.swf_url = Some(url.clone());
295             }
296             ClientType::Publish => {
297                 properties.fpad = Some(false);
298                 properties.capabilities = Some(15_f64);
299                 properties.audio_codecs = Some(3191_f64);
300                 properties.video_codecs = Some(252_f64);
301                 properties.video_function = Some(1_f64);
302             }
303         }
304 
305         netconnection.write_connect(transaction_id, &properties).await?;
306 
307         // let mut chunk_info = ChunkInfo::new(
308         //     csid_type::COMMAND_AMF0_AMF3,
309         //     chunk_type::TYPE_0,
310         //     0,
311         //     data.len() as u32,
312         //     msg_type_id::COMMAND_AMF0,
313         //     0,
314         //     data,
315         // );
316 
317         // self.packetizer.write_chunk(&mut chunk_info).await?;
318         Ok(())
319     }
320 
321     pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
322         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
323         netconnection.write_create_stream(transaction_id).await?;
324 
325         Ok(())
326     }
327 
328     pub async fn send_delete_stream(
329         &mut self,
330         transaction_id: &f64,
331         stream_id: &f64,
332     ) -> Result<(), SessionError> {
333         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
334         netstream
335             .write_delete_stream(transaction_id, stream_id)
336             .await?;
337 
338         Ok(())
339     }
340 
341     pub async fn send_publish(
342         &mut self,
343         transaction_id: &f64,
344         stream_name: &String,
345         stream_type: &String,
346     ) -> Result<(), SessionError> {
347         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
348         netstream
349             .write_publish(transaction_id, stream_name, stream_type)
350             .await?;
351 
352         Ok(())
353     }
354 
355     pub async fn send_play(
356         &mut self,
357         transaction_id: &f64,
358         stream_name: &String,
359         start: &f64,
360         duration: &f64,
361         reset: &bool,
362     ) -> Result<(), SessionError> {
363         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
364         netstream
365             .write_play(transaction_id, stream_name, start, duration, reset)
366             .await?;
367 
368         Ok(())
369     }
370 
371     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
372         let mut controlmessage =
373             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
374         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
375         Ok(())
376     }
377 
378     pub async fn send_window_acknowledgement_size(
379         &mut self,
380         window_size: u32,
381     ) -> Result<(), SessionError> {
382         let mut controlmessage =
383             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
384         controlmessage
385             .write_window_acknowledgement_size(window_size)
386             .await?;
387         Ok(())
388     }
389 
390     pub async fn send_set_buffer_length(
391         &mut self,
392         stream_id: u32,
393         ms: u32,
394     ) -> Result<(), SessionError> {
395         let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
396         eventmessages.write_set_buffer_length(stream_id, ms).await?;
397 
398         Ok(())
399     }
400 
401     pub async fn on_result_connect(&mut self) -> Result<(), SessionError> {
402         let mut controlmessage =
403             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
404         controlmessage.write_acknowledgement(3107).await?;
405 
406         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
407         netstream
408             .write_release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
409             .await?;
410         netstream
411             .write_fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
412             .await?;
413 
414         self.state = ClientSessionState::CreateStream;
415 
416         Ok(())
417     }
418 
419     pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> {
420         match self.client_type {
421             ClientType::Play => {
422                 self.state = ClientSessionState::Play;
423             }
424             ClientType::Publish => {
425                 self.state = ClientSessionState::PublishingContent;
426             }
427         }
428         Ok(())
429     }
430 
431     pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> {
432         self.unpacketizer
433             .update_max_chunk_size(chunk_size.clone() as usize);
434         Ok(())
435     }
436 
437     pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
438         log::trace!("stream is recorded stream_id is {}", stream_id);
439         Ok(())
440     }
441 
442     pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
443         log::trace!("stream is begin stream_id is {}", stream_id);
444         Ok(())
445     }
446 
447     pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> {
448         self.send_window_acknowledgement_size(250000).await?;
449         Ok(())
450     }
451 
452     pub fn on_error(&mut self) -> Result<(), SessionError> {
453         Ok(())
454     }
455 
456     pub async fn on_status(
457         &mut self,
458         obj: &HashMap<String, Amf0ValueType>,
459     ) -> Result<(), SessionError> {
460         if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") {
461             match &code_info[..] {
462                 "NetStream.Publish.Start" => {
463                     self.state = ClientSessionState::StartPublish;
464                     self.common
465                         .subscribe_from_channels(
466                             self.app_name.clone(),
467                             self.stream_name.clone(),
468                             self.subscriber_id,
469                         )
470                         .await?;
471                 }
472                 "NetStream.Publish.Reset" => {}
473 
474                 "NetStream.Play.Start" => {
475                     self.common
476                         .publish_to_channels(self.app_name.clone(), self.stream_name.clone())
477                         .await?
478                 }
479                 _ => {}
480             }
481         }
482         log::trace!("{}", obj.len());
483         Ok(())
484     }
485 }
486