1 use uuid::Uuid;
2 
3 use {
4     super::{
5         common::Common,
6         define,
7         define::SessionType,
8         errors::{SessionError, SessionErrorValue},
9     },
10     //crate::utils::print::print,
11     crate::{
12         amf0::Amf0ValueType,
13         channels::define::ChannelEventProducer,
14         chunk::{
15             define::CHUNK_SIZE,
16             unpacketizer::{ChunkUnpacketizer, UnpackResult},
17         },
18         handshake::{define::ClientHandshakeState, handshake_client::SimpleHandshakeClient},
19         messages::{define::RtmpMessageData, parser::MessageParser},
20         netconnection::writer::{ConnectProperties, NetConnection},
21         netstream::writer::NetStreamWriter,
22         protocol_control_messages::writer::ProtocolControlMessagesWriter,
23         user_control_messages::writer::EventMessagesWriter,
24     },
25     bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO},
26     std::{collections::HashMap, sync::Arc},
27     tokio::{net::TcpStream, sync::Mutex},
28 };
29 
/// Steps of the client session state machine.
///
/// `ClientSession::run` performs the action that belongs to the current state and
/// then switches to `WaitStateChange` until a server reply selects the next step.
// `dead_code` is tolerated so that variants may stay declared even when unused.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientSessionState {
    /// Initial state: the handshake still has to be performed.
    Handshake,
    /// Handshake finished; the `connect` command must be sent.
    Connect,
    /// `connect` succeeded; the `createStream` command must be sent.
    CreateStream,
    /// Stream created for a play client; the `play` command must be sent.
    Play,
    /// Stream created for a publish client; the `publish` command must be sent.
    PublishingContent,
    /// The server reported `NetStream.Publish.Start`; channel data is forwarded.
    StartPublish,
    /// A request was sent; nothing to do until a server message changes the state.
    WaitStateChange,
}
40 
/// States a play-only client would go through.
// NOTE(review): not referenced anywhere in the visible source, hence `dead_code`
// is allowed; confirm whether it is still needed.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientSessionPlayState {
    Handshake,
    Connect,
    CreateStream,
    Play,
}
48 
/// States a publish-only client would go through.
// NOTE(review): not referenced anywhere in the visible source, hence `dead_code`
// is allowed; confirm whether it is still needed.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientSessionPublishState {
    Handshake,
    Connect,
    CreateStream,
    PublishingContent,
}
/// Role of a [`ClientSession`]: it either plays a stream or publishes one.
///
/// The role selects the `connect` properties that are sent and which command
/// (`play` or `publish`) follows a successful `createStream`.
// `dead_code` is tolerated because a build may only ever construct one of the roles.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientType {
    /// The session sends `play` after the stream has been created.
    Play,
    /// The session sends `publish` after the stream has been created.
    Publish,
}
/// Client session that plays or publishes a single stream (see [`ClientType`])
/// over one TCP connection.
pub struct ClientSession {
    // Shared network I/O handle; `new` hands clones of it to `common` and
    // `handshaker`, and the `send_*` methods clone it for their writers.
    io: Arc<Mutex<BytesIO>>,
    // Helper that handles audio/video data and the channel publish/subscribe calls.
    common: Common,
    // Performs the client side of the handshake.
    handshaker: SimpleHandshakeClient,
    // Reassembles incoming bytes into chunks that are then parsed into messages.
    unpacketizer: ChunkUnpacketizer,
    // Application name; used for the `app` and `tcUrl` connect properties.
    app_name: String,
    // Name of the stream that is played or published.
    stream_name: String,
    /* Identifies this session as a subscriber of the data producer in the
    channels; presumably also used to delete it from the map when unsubscribe
    is called (not visible in this file). */
    subscriber_id: Uuid,
    // Current step of the session state machine.
    state: ClientSessionState,
    // Whether this session plays or publishes.
    client_type: ClientType,
}
75 
76 impl ClientSession {
77     #[allow(dead_code)]
78     pub fn new(
79         stream: TcpStream,
80         client_type: ClientType,
81         app_name: String,
82         stream_name: String,
83         event_producer: ChannelEventProducer,
84     ) -> Self {
85         let net_io = Arc::new(Mutex::new(BytesIO::new(stream)));
86         let subscriber_id = Uuid::new_v4();
87 
88         Self {
89             io: Arc::clone(&net_io),
90             common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Client),
91             handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)),
92             unpacketizer: ChunkUnpacketizer::new(),
93             app_name,
94             stream_name,
95             client_type,
96             state: ClientSessionState::Handshake,
97             subscriber_id,
98         }
99     }
100 
101     pub async fn run(&mut self) -> Result<(), SessionError> {
102         loop {
103             match self.state {
104                 ClientSessionState::Handshake => {
105                     log::info!("[C -> S] handshake...");
106                     self.handshake().await?;
107                     continue;
108                 }
109                 ClientSessionState::Connect => {
110                     log::info!("[C -> S] connect...");
111                     self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64))
112                         .await?;
113                     self.state = ClientSessionState::WaitStateChange;
114                 }
115                 ClientSessionState::CreateStream => {
116                     log::info!("[C -> S] CreateStream...");
117                     self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64))
118                         .await?;
119                     self.state = ClientSessionState::WaitStateChange;
120                 }
121                 ClientSessionState::Play => {
122                     log::info!("[C -> S] Play...");
123                     self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false)
124                         .await?;
125                     self.state = ClientSessionState::WaitStateChange;
126                 }
127                 ClientSessionState::PublishingContent => {
128                     log::info!("[C -> S] PublishingContent...");
129                     self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string())
130                         .await?;
131                     self.state = ClientSessionState::WaitStateChange;
132                 }
133                 ClientSessionState::StartPublish => {
134                     log::info!("[C -> S] StartPublish...");
135                     self.common.send_channel_data().await?;
136                 }
137                 ClientSessionState::WaitStateChange => {}
138             }
139 
140             let data = self.io.lock().await.read().await?;
141             self.unpacketizer.extend_data(&data[..]);
142 
143             loop {
144                 let result = self.unpacketizer.read_chunks();
145 
146                 if let Ok(rv) = result {
147                     if let UnpackResult::Chunks(chunks) = rv {
148                         for chunk_info in chunks.iter() {
149                             let mut msg = MessageParser::new(chunk_info.clone()).parse()?;
150 
151                             let timestamp = chunk_info.message_header.timestamp;
152                             self.process_messages(&mut msg, &timestamp).await?;
153                         }
154                     }
155                 } else {
156                     break;
157                 }
158             }
159         }
160     }
161 
162     async fn handshake(&mut self) -> Result<(), SessionError> {
163         loop {
164             self.handshaker.handshake().await?;
165             if self.handshaker.state == ClientHandshakeState::Finish {
166                 log::info!("handshake finish");
167                 break;
168             }
169 
170             let data = self.io.lock().await.read().await?;
171             self.handshaker.extend_data(&data[..]);
172         }
173 
174         self.state = ClientSessionState::Connect;
175 
176         Ok(())
177     }
178 
179     pub async fn process_messages(
180         &mut self,
181         msg: &mut RtmpMessageData,
182         timestamp: &u32,
183     ) -> Result<(), SessionError> {
184         match msg {
185             RtmpMessageData::Amf0Command {
186                 command_name,
187                 transaction_id,
188                 command_object,
189                 others,
190             } => {
191                 log::info!("[C <- S] on_amf0_command_message...");
192                 self.on_amf0_command_message(command_name, transaction_id, command_object, others)
193                     .await?
194             }
195             RtmpMessageData::SetPeerBandwidth { .. } => {
196                 log::info!("[C <- S] on_set_peer_bandwidth...");
197                 self.on_set_peer_bandwidth().await?
198             }
199             RtmpMessageData::WindowAcknowledgementSize { .. } => {
200                 log::info!("[C <- S] on_windows_acknowledgement_size...");
201             }
202             RtmpMessageData::SetChunkSize { chunk_size } => {
203                 log::info!("[C <- S] on_set_chunk_size...");
204                 self.on_set_chunk_size(chunk_size)?;
205             }
206             RtmpMessageData::StreamBegin { stream_id } => {
207                 log::info!("[C <- S] on_stream_begin...");
208                 self.on_stream_begin(stream_id)?;
209             }
210             RtmpMessageData::StreamIsRecorded { stream_id } => {
211                 log::info!("[C <- S] on_stream_is_recorded...");
212                 self.on_stream_is_recorded(stream_id)?;
213             }
214             RtmpMessageData::AudioData { data } => self.common.on_audio_data(data, timestamp)?,
215             RtmpMessageData::VideoData { data } => self.common.on_video_data(data, timestamp)?,
216 
217             _ => {}
218         }
219         Ok(())
220     }
221 
222     pub async fn on_amf0_command_message(
223         &mut self,
224         command_name: &Amf0ValueType,
225         transaction_id: &Amf0ValueType,
226         command_object: &Amf0ValueType,
227         others: &mut Vec<Amf0ValueType>,
228     ) -> Result<(), SessionError> {
229         let empty_cmd_name = &String::new();
230         let cmd_name = match command_name {
231             Amf0ValueType::UTF8String(str) => str,
232             _ => empty_cmd_name,
233         };
234 
235         let transaction_id = match transaction_id {
236             Amf0ValueType::Number(number) => *number as u8,
237             _ => 0,
238         };
239 
240         let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new();
241         let _ = match command_object {
242             Amf0ValueType::Object(obj) => obj,
243             // Amf0ValueType::Null =>
244             _ => &empty_cmd_obj,
245         };
246 
247         match cmd_name.as_str() {
248             "_result" => match transaction_id {
249                 define::TRANSACTION_ID_CONNECT => {
250                     log::info!("[C <- S] on_result_connect...");
251                     self.on_result_connect().await?;
252                 }
253                 define::TRANSACTION_ID_CREATE_STREAM => {
254                     log::info!("[C <- S] on_result_create_stream...");
255                     self.on_result_create_stream()?;
256                 }
257                 _ => {}
258             },
259             "_error" => {
260                 self.on_error()?;
261             }
262             "onStatus" => {
263                 match others.remove(0) {
264                     Amf0ValueType::Object(obj) => self.on_status(&obj).await?,
265                     _ => {
266                         return Err(SessionError {
267                             value: SessionErrorValue::Amf0ValueCountNotCorrect,
268                         })
269                     }
270                 };
271             }
272 
273             _ => {}
274         }
275 
276         Ok(())
277     }
278 
279     pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
280         self.send_set_chunk_size().await?;
281 
282         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
283         let mut properties = ConnectProperties::new_none();
284 
285         let url = format!("rtmp://localhost:1935/{app_name}", app_name = self.app_name);
286         properties.app = Some(self.app_name.clone());
287         properties.tc_url = Some(url.clone());
288 
289         match self.client_type {
290             ClientType::Play => {
291                 properties.flash_ver = Some("flashVerFMLE/3.0 (compatible; FMSc/1.0)".to_string());
292                 properties.swf_url = Some(url.clone());
293             }
294             ClientType::Publish => {
295                 properties.fpad = Some(false);
296                 properties.capabilities = Some(15_f64);
297                 properties.audio_codecs = Some(3191_f64);
298                 properties.video_codecs = Some(252_f64);
299                 properties.video_function = Some(1_f64);
300             }
301         }
302 
303         netconnection
304             .write_connect(transaction_id, &properties)
305             .await?;
306 
307         Ok(())
308     }
309 
310     pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
311         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
312         netconnection.write_create_stream(transaction_id).await?;
313 
314         Ok(())
315     }
316 
317     pub async fn send_delete_stream(
318         &mut self,
319         transaction_id: &f64,
320         stream_id: &f64,
321     ) -> Result<(), SessionError> {
322         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
323         netstream
324             .write_delete_stream(transaction_id, stream_id)
325             .await?;
326 
327         Ok(())
328     }
329 
330     pub async fn send_publish(
331         &mut self,
332         transaction_id: &f64,
333         stream_name: &String,
334         stream_type: &String,
335     ) -> Result<(), SessionError> {
336         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
337         netstream
338             .write_publish(transaction_id, stream_name, stream_type)
339             .await?;
340 
341         Ok(())
342     }
343 
344     pub async fn send_play(
345         &mut self,
346         transaction_id: &f64,
347         stream_name: &String,
348         start: &f64,
349         duration: &f64,
350         reset: &bool,
351     ) -> Result<(), SessionError> {
352         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
353         netstream
354             .write_play(transaction_id, stream_name, start, duration, reset)
355             .await?;
356 
357         Ok(())
358     }
359 
360     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
361         let mut controlmessage =
362             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
363         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
364         Ok(())
365     }
366 
367     pub async fn send_window_acknowledgement_size(
368         &mut self,
369         window_size: u32,
370     ) -> Result<(), SessionError> {
371         let mut controlmessage =
372             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
373         controlmessage
374             .write_window_acknowledgement_size(window_size)
375             .await?;
376         Ok(())
377     }
378 
379     pub async fn send_set_buffer_length(
380         &mut self,
381         stream_id: u32,
382         ms: u32,
383     ) -> Result<(), SessionError> {
384         let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
385         eventmessages.write_set_buffer_length(stream_id, ms).await?;
386 
387         Ok(())
388     }
389 
390     pub async fn on_result_connect(&mut self) -> Result<(), SessionError> {
391         let mut controlmessage =
392             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
393         controlmessage.write_acknowledgement(3107).await?;
394 
395         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
396         netstream
397             .write_release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
398             .await?;
399         netstream
400             .write_fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
401             .await?;
402 
403         self.state = ClientSessionState::CreateStream;
404 
405         Ok(())
406     }
407 
408     pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> {
409         match self.client_type {
410             ClientType::Play => {
411                 self.state = ClientSessionState::Play;
412             }
413             ClientType::Publish => {
414                 self.state = ClientSessionState::PublishingContent;
415             }
416         }
417         Ok(())
418     }
419 
420     pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> {
421         self.unpacketizer
422             .update_max_chunk_size(*chunk_size as usize);
423         Ok(())
424     }
425 
426     pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
427         log::trace!("stream is recorded stream_id is {}", stream_id);
428         Ok(())
429     }
430 
431     pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
432         log::trace!("stream is begin stream_id is {}", stream_id);
433         Ok(())
434     }
435 
436     pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> {
437         self.send_window_acknowledgement_size(250000).await?;
438         Ok(())
439     }
440 
441     pub fn on_error(&mut self) -> Result<(), SessionError> {
442         Ok(())
443     }
444 
445     pub async fn on_status(
446         &mut self,
447         obj: &HashMap<String, Amf0ValueType>,
448     ) -> Result<(), SessionError> {
449         if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") {
450             match &code_info[..] {
451                 "NetStream.Publish.Start" => {
452                     self.state = ClientSessionState::StartPublish;
453                     self.common
454                         .subscribe_from_channels(
455                             self.app_name.clone(),
456                             self.stream_name.clone(),
457                             self.subscriber_id,
458                         )
459                         .await?;
460                 }
461                 "NetStream.Publish.Reset" => {}
462 
463                 "NetStream.Play.Start" => {
464                     self.common
465                         .publish_to_channels(self.app_name.clone(), self.stream_name.clone())
466                         .await?
467                 }
468                 _ => {}
469             }
470         }
471         log::trace!("{}", obj.len());
472         Ok(())
473     }
474 }
475