1 use uuid::Uuid;
2 
3 use {
4     super::{
5         common::Common,
6         define,
7         define::SessionType,
8         errors::{SessionError, SessionErrorValue},
9     },
10     crate::utils::print::print,
11     crate::{
12         amf0::Amf0ValueType,
13         channels::define::ChannelEventProducer,
14         chunk::{
15             define::{chunk_type, csid_type, CHUNK_SIZE},
16             packetizer::ChunkPacketizer,
17             unpacketizer::{ChunkUnpacketizer, UnpackResult},
18             ChunkInfo,
19         },
20         handshake::handshake::{ClientHandshakeState, SimpleHandshakeClient},
21         messages::{
22             define::{msg_type_id, RtmpMessageData},
23             parser::MessageParser,
24         },
25         netconnection::commands::{ConnectProperties, NetConnection},
26         netstream::writer::NetStreamWriter,
27         protocol_control_messages::writer::ProtocolControlMessagesWriter,
28         user_control_messages::writer::EventMessagesWriter,
29     },
30     bytesio::{
31         bytes_writer::{AsyncBytesWriter, BytesWriter},
32         bytesio::BytesIO,
33     },
34     std::{collections::HashMap, sync::Arc},
35     tokio::{net::TcpStream, sync::Mutex},
36 };
37 
38 #[allow(dead_code)]
39 enum ClientSessionState {
40     Handshake,
41     Connect,
42     CreateStream,
43     Play,
44     PublishingContent,
45     StartPublish,
46     WaitStateChange,
47 }
48 
49 #[allow(dead_code)]
50 enum ClientSessionPlayState {
51     Handshake,
52     Connect,
53     CreateStream,
54     Play,
55 }
56 
57 #[allow(dead_code)]
58 enum ClientSessionPublishState {
59     Handshake,
60     Connect,
61     CreateStream,
62     PublishingContent,
63 }
64 #[allow(dead_code)]
65 pub enum ClientType {
66     Play,
67     Publish,
68 }
69 pub struct ClientSession {
70     io: Arc<Mutex<BytesIO>>,
71     common: Common,
72 
73     handshaker: SimpleHandshakeClient,
74 
75     packetizer: ChunkPacketizer,
76     unpacketizer: ChunkUnpacketizer,
77 
78     app_name: String,
79     stream_name: String,
80 
81     /* Used to mark the subscriber's the data producer
82     in channels and delete it from map when unsubscribe
83     is called. */
84     subscriber_id: Uuid,
85 
86     state: ClientSessionState,
87     client_type: ClientType,
88 }
89 
90 impl ClientSession {
91     #[allow(dead_code)]
92     pub fn new(
93         stream: TcpStream,
94         client_type: ClientType,
95         app_name: String,
96         stream_name: String,
97         event_producer: ChannelEventProducer,
98     ) -> Self {
99         let net_io = Arc::new(Mutex::new(BytesIO::new(stream)));
100         let subscriber_id = Uuid::new_v4();
101 
102         Self {
103             io: Arc::clone(&net_io),
104             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Client),
105 
106             handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)),
107 
108             packetizer: ChunkPacketizer::new(Arc::clone(&net_io)),
109             unpacketizer: ChunkUnpacketizer::new(),
110 
111             app_name,
112             stream_name,
113             client_type,
114 
115             state: ClientSessionState::Handshake,
116             subscriber_id,
117         }
118     }
119 
120     pub async fn run(&mut self) -> Result<(), SessionError> {
121         loop {
122             match self.state {
123                 ClientSessionState::Handshake => {
124                     log::info!("[C -> S] handshake...");
125                     self.handshake().await?;
126                     continue;
127                 }
128                 ClientSessionState::Connect => {
129                     log::info!("[C -> S] connect...");
130                     self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64))
131                         .await?;
132                     self.state = ClientSessionState::WaitStateChange;
133                 }
134                 ClientSessionState::CreateStream => {
135                     log::info!("[C -> S] CreateStream...");
136                     self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64))
137                         .await?;
138                     self.state = ClientSessionState::WaitStateChange;
139                 }
140                 ClientSessionState::Play => {
141                     log::info!("[C -> S] Play...");
142                     self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false)
143                         .await?;
144                     self.state = ClientSessionState::WaitStateChange;
145                 }
146                 ClientSessionState::PublishingContent => {
147                     log::info!("[C -> S] PublishingContent...");
148                     self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string())
149                         .await?;
150                     self.state = ClientSessionState::WaitStateChange;
151                 }
152                 ClientSessionState::StartPublish => {
153                     log::info!("[C -> S] StartPublish...");
154                     self.common.send_channel_data().await?;
155                 }
156                 ClientSessionState::WaitStateChange => {}
157             }
158 
159             let data = self.io.lock().await.read().await?;
160             self.unpacketizer.extend_data(&data[..]);
161             let result = self.unpacketizer.read_chunk()?;
162 
163             match result {
164                 UnpackResult::ChunkInfo(chunk_info) => {
165                     let mut message_parser = MessageParser::new(chunk_info.clone());
166                     let mut msg = message_parser.parse()?;
167                     let timestamp = chunk_info.message_header.timestamp;
168 
169                     self.process_messages(&mut msg, &timestamp).await?;
170                 }
171                 _ => {}
172             }
173         }
174 
175         // Ok(())
176     }
177 
178     async fn handshake(&mut self) -> Result<(), SessionError> {
179         loop {
180             self.handshaker.handshake().await?;
181             if self.handshaker.state == ClientHandshakeState::Finish {
182                 log::info!("handshake finish");
183                 break;
184             }
185 
186             let data = self.io.lock().await.read().await?;
187             print(data.clone());
188             self.handshaker.extend_data(&data[..]);
189         }
190 
191         self.state = ClientSessionState::Connect;
192 
193         Ok(())
194     }
195 
196     pub async fn process_messages(
197         &mut self,
198         msg: &mut RtmpMessageData,
199         timestamp: &u32,
200     ) -> Result<(), SessionError> {
201         match msg {
202             RtmpMessageData::Amf0Command {
203                 command_name,
204                 transaction_id,
205                 command_object,
206                 others,
207             } => {
208                 self.on_amf0_command_message(command_name, transaction_id, command_object, others)
209                     .await?
210             }
211             RtmpMessageData::SetPeerBandwidth { properties } => {
212                 log::trace!(
213                     "process_messages SetPeerBandwidth windows size: {}",
214                     properties.window_size
215                 );
216                 self.on_set_peer_bandwidth().await?
217             }
218             RtmpMessageData::SetChunkSize { chunk_size } => self.on_set_chunk_size(chunk_size)?,
219 
220             RtmpMessageData::StreamBegin { stream_id } => self.on_stream_begin(stream_id)?,
221 
222             RtmpMessageData::StreamIsRecorded { stream_id } => {
223                 self.on_stream_is_recorded(stream_id)?
224             }
225 
226             RtmpMessageData::AudioData { data } => self.common.on_audio_data(data, timestamp)?,
227 
228             RtmpMessageData::VideoData { data } => self.common.on_video_data(data, timestamp)?,
229 
230             _ => {}
231         }
232         Ok(())
233     }
234 
235     pub async fn on_amf0_command_message(
236         &mut self,
237         command_name: &Amf0ValueType,
238         transaction_id: &Amf0ValueType,
239         command_object: &Amf0ValueType,
240         others: &mut Vec<Amf0ValueType>,
241     ) -> Result<(), SessionError> {
242         let empty_cmd_name = &String::new();
243         let cmd_name = match command_name {
244             Amf0ValueType::UTF8String(str) => str,
245             _ => empty_cmd_name,
246         };
247 
248         let transaction_id = match transaction_id {
249             Amf0ValueType::Number(number) => number.clone() as u8,
250             _ => 0,
251         };
252 
253         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
254         let _ = match command_object {
255             Amf0ValueType::Object(obj) => obj,
256             // Amf0ValueType::Null =>
257             _ => &empty_cmd_obj,
258         };
259 
260         match cmd_name.as_str() {
261             "_result" => match transaction_id {
262                 define::TRANSACTION_ID_CONNECT => {
263                     self.on_result_connect().await?;
264                 }
265                 define::TRANSACTION_ID_CREATE_STREAM => {
266                     self.on_result_create_stream()?;
267                 }
268                 _ => {}
269             },
270             "_error" => {
271                 self.on_error()?;
272             }
273             "onStatus" => {
274                 match others.remove(0) {
275                     Amf0ValueType::Object(obj) => self.on_status(&obj).await?,
276                     _ => {
277                         return Err(SessionError {
278                             value: SessionErrorValue::Amf0ValueCountNotCorrect,
279                         })
280                     }
281                 };
282             }
283 
284             _ => {}
285         }
286 
287         Ok(())
288     }
289 
290     pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
291         self.send_set_chunk_size().await?;
292 
293         let mut netconnection = NetConnection::new(BytesWriter::new());
294 
295         let mut properties = ConnectProperties::new_none();
296 
297         let url = format!("rtmp://localhost:1935/{app_name}", app_name = self.app_name);
298         properties.app = Some(self.app_name.clone());
299         properties.tc_url = Some(url.clone());
300 
301         match self.client_type {
302             ClientType::Play => {
303                 properties.flash_ver = Some("flashVerFMLE/3.0 (compatible; FMSc/1.0)".to_string());
304                 properties.swf_url = Some(url.clone());
305             }
306             ClientType::Publish => {
307                 properties.fpad = Some(false);
308                 properties.capabilities = Some(15_f64);
309                 properties.audio_codecs = Some(3191_f64);
310                 properties.video_codecs = Some(252_f64);
311                 properties.video_function = Some(1_f64);
312             }
313         }
314 
315         let data = netconnection.connect(transaction_id, &properties)?;
316 
317         let mut chunk_info = ChunkInfo::new(
318             csid_type::COMMAND_AMF0_AMF3,
319             chunk_type::TYPE_0,
320             0,
321             data.len() as u32,
322             msg_type_id::COMMAND_AMF0,
323             0,
324             data,
325         );
326 
327         self.packetizer.write_chunk(&mut chunk_info).await?;
328         Ok(())
329     }
330 
331     pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
332         let mut netconnection = NetConnection::new(BytesWriter::new());
333         let data = netconnection.create_stream(transaction_id)?;
334 
335         let mut chunk_info = ChunkInfo::new(
336             csid_type::COMMAND_AMF0_AMF3,
337             chunk_type::TYPE_0,
338             0,
339             data.len() as u32,
340             msg_type_id::COMMAND_AMF0,
341             0,
342             data,
343         );
344 
345         self.packetizer.write_chunk(&mut chunk_info).await?;
346 
347         Ok(())
348     }
349 
350     pub async fn send_delete_stream(
351         &mut self,
352         transaction_id: &f64,
353         stream_id: &f64,
354     ) -> Result<(), SessionError> {
355         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
356         netstream.delete_stream(transaction_id, stream_id).await?;
357 
358         Ok(())
359     }
360 
361     pub async fn send_publish(
362         &mut self,
363         transaction_id: &f64,
364         stream_name: &String,
365         stream_type: &String,
366     ) -> Result<(), SessionError> {
367         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
368         netstream
369             .publish(transaction_id, stream_name, stream_type)
370             .await?;
371 
372         Ok(())
373     }
374 
375     pub async fn send_play(
376         &mut self,
377         transaction_id: &f64,
378         stream_name: &String,
379         start: &f64,
380         duration: &f64,
381         reset: &bool,
382     ) -> Result<(), SessionError> {
383         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
384         netstream
385             .play(transaction_id, stream_name, start, duration, reset)
386             .await?;
387 
388         Ok(())
389     }
390 
391     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
392         let mut controlmessage =
393             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
394         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
395         Ok(())
396     }
397 
398     pub async fn send_window_acknowledgement_size(
399         &mut self,
400         window_size: u32,
401     ) -> Result<(), SessionError> {
402         let mut controlmessage =
403             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
404         controlmessage
405             .write_window_acknowledgement_size(window_size)
406             .await?;
407         Ok(())
408     }
409 
410     pub async fn send_set_buffer_length(
411         &mut self,
412         stream_id: u32,
413         ms: u32,
414     ) -> Result<(), SessionError> {
415         let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
416         eventmessages.write_set_buffer_length(stream_id, ms).await?;
417 
418         Ok(())
419     }
420 
421     pub async fn on_result_connect(&mut self) -> Result<(), SessionError> {
422         let mut controlmessage =
423             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
424         controlmessage.write_acknowledgement(3107).await?;
425 
426         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
427         netstream
428             .release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
429             .await?;
430         netstream
431             .fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
432             .await?;
433 
434         self.state = ClientSessionState::CreateStream;
435 
436         Ok(())
437     }
438 
439     pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> {
440         match self.client_type {
441             ClientType::Play => {
442                 self.state = ClientSessionState::Play;
443             }
444             ClientType::Publish => {
445                 self.state = ClientSessionState::PublishingContent;
446             }
447         }
448         Ok(())
449     }
450 
451     pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> {
452         self.unpacketizer
453             .update_max_chunk_size(chunk_size.clone() as usize);
454         Ok(())
455     }
456 
457     pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
458         log::trace!("stream is recorded stream_id is {}", stream_id);
459         Ok(())
460     }
461 
462     pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
463         log::trace!("stream is begin stream_id is {}", stream_id);
464         Ok(())
465     }
466 
467     pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> {
468         self.send_window_acknowledgement_size(250000).await?;
469         Ok(())
470     }
471 
472     pub fn on_error(&mut self) -> Result<(), SessionError> {
473         Ok(())
474     }
475 
476     pub async fn on_status(
477         &mut self,
478         obj: &HashMap<String, Amf0ValueType>,
479     ) -> Result<(), SessionError> {
480         if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") {
481             match &code_info[..] {
482                 "NetStream.Publish.Start" => {
483                     self.state = ClientSessionState::StartPublish;
484                     self.common
485                         .subscribe_from_channels(
486                             self.app_name.clone(),
487                             self.stream_name.clone(),
488                             self.subscriber_id,
489                         )
490                         .await?;
491                 }
492                 "NetStream.Publish.Reset" => {}
493 
494                 "NetStream.Play.Start" => {
495                     self.common
496                         .publish_to_channels(self.app_name.clone(), self.stream_name.clone())
497                         .await?
498                 }
499                 _ => {}
500             }
501         }
502         log::trace!("{}", obj.len());
503         Ok(())
504     }
505 }
506