1 use {
2     super::{
3         common::Common,
4         define,
5         define::SessionType,
6         errors::{SessionError, SessionErrorValue},
7     },
8     //crate::utils::print::print,
9     crate::{
10         amf0::Amf0ValueType,
11         channels::define::ChannelEventProducer,
12         chunk::{
13             define::CHUNK_SIZE,
14             unpacketizer::{ChunkUnpacketizer, UnpackResult},
15         },
16         handshake,
17         handshake::{define::ClientHandshakeState, handshake_client::SimpleHandshakeClient},
18         messages::{define::RtmpMessageData, parser::MessageParser},
19         netconnection::writer::{ConnectProperties, NetConnection},
20         netstream::writer::NetStreamWriter,
21         protocol_control_messages::writer::ProtocolControlMessagesWriter,
22         user_control_messages::writer::EventMessagesWriter,
23         utils::RtmpUrlParser,
24     },
25     bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO},
26     indexmap::IndexMap,
27     std::sync::Arc,
28     tokio::{net::TcpStream, sync::Mutex},
29     uuid::Uuid,
30 };
31 
/// Steps of the client session state machine that `ClientSession::run` walks through.
#[allow(dead_code)]
enum ClientSessionState {
    /// Initial state; `run` performs the handshake, which then switches to `Connect`.
    Handshake,
    /// `run` sends the `connect` command and then waits for the server.
    Connect,
    /// Entered from `on_result_connect`; `run` sends `createStream` and then waits.
    CreateStream,
    /// Entered from `on_result_create_stream` for `ClientType::Play`; `run` sends `play`.
    Play,
    /// Entered from `on_result_create_stream` for `ClientType::Publish`; `run` sends `publish`.
    PublishingContent,
    /// Entered from `on_status` on `NetStream.Publish.Start`; `run` forwards channel data.
    StartPublish,
    /// Nothing is sent; `run` only reads and dispatches incoming messages.
    WaitStateChange,
}
42 
/// Sequence of steps for a playing client.
///
/// NOTE(review): not referenced anywhere in the visible part of this file;
/// `ClientSessionState` is what `ClientSession` actually uses — confirm whether
/// this type is still needed.
#[allow(dead_code)]
enum ClientSessionPlayState {
    Handshake,
    Connect,
    CreateStream,
    Play,
}
50 
/// Sequence of steps for a publishing client.
///
/// NOTE(review): not referenced anywhere in the visible part of this file;
/// `ClientSessionState` is what `ClientSession` actually uses — confirm whether
/// this type is still needed.
#[allow(dead_code)]
enum ClientSessionPublishState {
    Handshake,
    Connect,
    CreateStream,
    PublishingContent,
}
/// Role a [`ClientSession`] plays towards the remote RTMP server.
///
/// The role selects the `connect` properties that are sent and which command
/// follows a successful `createStream` (`play` or `publish`).
///
/// The common traits are derived so callers can log, compare and copy the
/// value freely; the enum carries no data, so `Copy` is free.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientType {
    /// Pull a stream from the remote server.
    Play,
    /// Push a stream to the remote server.
    Publish,
}
/// Client side of an RTMP session running over a single TCP connection.
///
/// Depending on `client_type` the session either plays a remote stream or
/// publishes to the remote server; `run` drives the whole exchange.
pub struct ClientSession {
    /// I/O wrapper around the TCP stream; the same `Arc` is handed to `common`,
    /// `handshaker` and every writer created by the `send_*` methods.
    io: Arc<Mutex<BytesIO>>,
    /// Shared session logic (media data callbacks, channel publish/subscribe).
    common: Common,
    /// Client half of the RTMP handshake.
    handshaker: SimpleHandshakeClient,
    /// Accumulates bytes read from `io` and turns them into chunks.
    unpacketizer: ChunkUnpacketizer,
    /// Domain name including the port; used to build the `tcUrl` connect property.
    raw_domain_name: String,
    /// Application name; sent as the `app` connect property.
    app_name: String,
    /// Stream name including its parameters; sent with `play` / `publish`.
    raw_stream_name: String,
    /// Stream name obtained by parsing `raw_stream_name` with `RtmpUrlParser`.
    stream_name: String,
    /// Identifier of this session, passed to the channels when publishing or
    /// subscribing so that the session's entry can later be removed from the
    /// channels map on unsubscribe.
    session_id: Uuid,
    /// Current step of the session state machine.
    state: ClientSessionState,
    /// Whether this session plays or publishes.
    client_type: ClientType,
    /// Optional local application name to subscribe to, set by `subscribe`.
    sub_app_name: Option<String>,
    /// Optional local stream name to subscribe to, set by `subscribe`.
    sub_stream_name: Option<String>,
}
83 
84 impl ClientSession {
85     pub fn new(
86         stream: TcpStream,
87         client_type: ClientType,
88         raw_domain_name: String,
89         app_name: String,
90         raw_stream_name: String,
91         event_producer: ChannelEventProducer,
92     ) -> Self {
93         let remote_addr = if let Ok(addr) = stream.peer_addr() {
94             log::info!("server session: {}", addr.to_string());
95             Some(addr)
96         } else {
97             None
98         };
99 
100         let net_io = Arc::new(Mutex::new(BytesIO::new(stream)));
101         let subscriber_id = Uuid::new_v4();
102 
103         let common = Common::new(
104             Arc::clone(&net_io),
105             event_producer,
106             SessionType::Client,
107             remote_addr,
108         );
109 
110         let (stream_name, _) = RtmpUrlParser::default()
111             .set_raw_stream_name(raw_stream_name.clone())
112             .parse_raw_stream_name();
113 
114         Self {
115             io: Arc::clone(&net_io),
116             common,
117             handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)),
118             unpacketizer: ChunkUnpacketizer::new(),
119             raw_domain_name,
120             app_name,
121             raw_stream_name,
122             stream_name,
123             session_id: subscriber_id,
124             state: ClientSessionState::Handshake,
125             client_type,
126             sub_app_name: None,
127             sub_stream_name: None,
128         }
129     }
130 
131     pub async fn run(&mut self) -> Result<(), SessionError> {
132         loop {
133             match self.state {
134                 ClientSessionState::Handshake => {
135                     log::info!("[C -> S] handshake...");
136                     self.handshake().await?;
137                     continue;
138                 }
139                 ClientSessionState::Connect => {
140                     log::info!("[C -> S] connect...");
141                     self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64))
142                         .await?;
143                     self.state = ClientSessionState::WaitStateChange;
144                 }
145                 ClientSessionState::CreateStream => {
146                     log::info!("[C -> S] CreateStream...");
147                     self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64))
148                         .await?;
149                     self.state = ClientSessionState::WaitStateChange;
150                 }
151                 ClientSessionState::Play => {
152                     log::info!("[C -> S] Play...");
153                     self.send_play(&0.0, &self.raw_stream_name.clone(), &0.0, &0.0, &false)
154                         .await?;
155                     self.state = ClientSessionState::WaitStateChange;
156                 }
157                 ClientSessionState::PublishingContent => {
158                     log::info!("[C -> S] PublishingContent...");
159                     self.send_publish(&3.0, &self.raw_stream_name.clone(), &"live".to_string())
160                         .await?;
161                     self.state = ClientSessionState::WaitStateChange;
162                 }
163                 ClientSessionState::StartPublish => {
164                     log::info!("[C -> S] StartPublish...");
165                     self.common.send_channel_data().await?;
166                 }
167                 ClientSessionState::WaitStateChange => {}
168             }
169 
170             let data = self.io.lock().await.read().await?;
171             self.unpacketizer.extend_data(&data[..]);
172 
173             loop {
174                 match self.unpacketizer.read_chunks() {
175                     Ok(rv) => {
176                         if let UnpackResult::Chunks(chunks) = rv {
177                             for chunk_info in chunks.iter() {
178                                 let mut msg = MessageParser::new(chunk_info.clone()).parse()?;
179                                 let timestamp = chunk_info.message_header.timestamp;
180                                 self.process_messages(&mut msg, &timestamp).await?;
181                             }
182                         }
183                     }
184                     Err(err) => {
185                         log::trace!("read trunks error: {}", err);
186                         break;
187                     }
188                 }
189             }
190         }
191     }
192 
193     async fn handshake(&mut self) -> Result<(), SessionError> {
194         loop {
195             self.handshaker.handshake().await?;
196             if self.handshaker.state == ClientHandshakeState::Finish {
197                 log::info!("handshake finish");
198                 break;
199             }
200 
201             let mut bytes_len = 0;
202             while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE * 2 {
203                 let data = self.io.lock().await.read().await?;
204                 bytes_len += data.len();
205                 self.handshaker.extend_data(&data[..]);
206             }
207         }
208 
209         self.state = ClientSessionState::Connect;
210 
211         Ok(())
212     }
213 
214     pub async fn process_messages(
215         &mut self,
216         msg: &mut RtmpMessageData,
217         timestamp: &u32,
218     ) -> Result<(), SessionError> {
219         match msg {
220             RtmpMessageData::Amf0Command {
221                 command_name,
222                 transaction_id,
223                 command_object,
224                 others,
225             } => {
226                 log::info!("[C <- S] on_amf0_command_message...");
227                 self.on_amf0_command_message(command_name, transaction_id, command_object, others)
228                     .await?
229             }
230             RtmpMessageData::SetPeerBandwidth { .. } => {
231                 log::info!("[C <- S] on_set_peer_bandwidth...");
232                 self.on_set_peer_bandwidth().await?
233             }
234             RtmpMessageData::WindowAcknowledgementSize { .. } => {
235                 log::info!("[C <- S] on_windows_acknowledgement_size...");
236             }
237             RtmpMessageData::SetChunkSize { chunk_size } => {
238                 log::info!("[C <- S] on_set_chunk_size...");
239                 self.on_set_chunk_size(chunk_size)?;
240             }
241             RtmpMessageData::StreamBegin { stream_id } => {
242                 log::info!("[C <- S] on_stream_begin...");
243                 self.on_stream_begin(stream_id)?;
244             }
245             RtmpMessageData::StreamIsRecorded { stream_id } => {
246                 log::info!("[C <- S] on_stream_is_recorded...");
247                 self.on_stream_is_recorded(stream_id)?;
248             }
249             RtmpMessageData::AudioData { data } => self.common.on_audio_data(data, timestamp)?,
250             RtmpMessageData::VideoData { data } => self.common.on_video_data(data, timestamp)?,
251             RtmpMessageData::AmfData { raw_data } => {
252                 self.common.on_meta_data(raw_data, timestamp)?;
253             }
254 
255             _ => {}
256         }
257         Ok(())
258     }
259 
260     pub async fn on_amf0_command_message(
261         &mut self,
262         command_name: &Amf0ValueType,
263         transaction_id: &Amf0ValueType,
264         command_object: &Amf0ValueType,
265         others: &mut Vec<Amf0ValueType>,
266     ) -> Result<(), SessionError> {
267         log::info!("[C <- S] on_amf0_command_message...");
268         let empty_cmd_name = &String::new();
269         let cmd_name = match command_name {
270             Amf0ValueType::UTF8String(str) => str,
271             _ => empty_cmd_name,
272         };
273 
274         let transaction_id = match transaction_id {
275             Amf0ValueType::Number(number) => *number as u8,
276             _ => 0,
277         };
278 
279         let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new();
280         let _ = match command_object {
281             Amf0ValueType::Object(obj) => obj,
282             // Amf0ValueType::Null =>
283             _ => &empty_cmd_obj,
284         };
285 
286         match cmd_name.as_str() {
287             "_result" => match transaction_id {
288                 define::TRANSACTION_ID_CONNECT => {
289                     log::info!("[C <- S] on_result_connect...");
290                     self.on_result_connect().await?;
291                 }
292                 define::TRANSACTION_ID_CREATE_STREAM => {
293                     log::info!("[C <- S] on_result_create_stream...");
294                     self.on_result_create_stream()?;
295                 }
296                 _ => {}
297             },
298             "_error" => {
299                 self.on_error()?;
300             }
301             "onStatus" => {
302                 match others.remove(0) {
303                     Amf0ValueType::Object(obj) => self.on_status(&obj).await?,
304                     _ => {
305                         return Err(SessionError {
306                             value: SessionErrorValue::Amf0ValueCountNotCorrect,
307                         })
308                     }
309                 };
310             }
311 
312             _ => {}
313         }
314 
315         Ok(())
316     }
317 
318     pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
319         self.send_set_chunk_size().await?;
320 
321         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
322         let mut properties = ConnectProperties::new_none();
323 
324         let url = format!(
325             "rtmp://{domain_name}/{app_name}",
326             domain_name = self.raw_domain_name,
327             app_name = self.app_name
328         );
329         properties.app = Some(self.app_name.clone());
330 
331         match self.client_type {
332             ClientType::Play => {
333                 properties.flash_ver = Some("LNX 9,0,124,2".to_string());
334                 properties.tc_url = Some(url.clone());
335                 properties.fpad = Some(false);
336                 properties.capabilities = Some(15_f64);
337                 properties.audio_codecs = Some(4071_f64);
338                 properties.video_codecs = Some(252_f64);
339                 properties.video_function = Some(1_f64);
340             }
341             ClientType::Publish => {
342                 properties.pub_type = Some("nonprivate".to_string());
343                 properties.flash_ver = Some("FMLE/3.0 (compatible; xiu)".to_string());
344                 properties.fpad = Some(false);
345                 properties.tc_url = Some(url.clone());
346             }
347         }
348 
349         netconnection
350             .write_connect(transaction_id, &properties)
351             .await?;
352 
353         Ok(())
354     }
355 
356     pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
357         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
358         netconnection.write_create_stream(transaction_id).await?;
359 
360         Ok(())
361     }
362 
363     pub async fn send_delete_stream(
364         &mut self,
365         transaction_id: &f64,
366         stream_id: &f64,
367     ) -> Result<(), SessionError> {
368         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
369         netstream
370             .write_delete_stream(transaction_id, stream_id)
371             .await?;
372 
373         Ok(())
374     }
375 
376     pub async fn send_publish(
377         &mut self,
378         transaction_id: &f64,
379         stream_name: &String,
380         stream_type: &String,
381     ) -> Result<(), SessionError> {
382         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
383         netstream
384             .write_publish(transaction_id, stream_name, stream_type)
385             .await?;
386 
387         Ok(())
388     }
389 
390     pub async fn send_play(
391         &mut self,
392         transaction_id: &f64,
393         stream_name: &String,
394         start: &f64,
395         duration: &f64,
396         reset: &bool,
397     ) -> Result<(), SessionError> {
398         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
399         netstream
400             .write_play(transaction_id, stream_name, start, duration, reset)
401             .await?;
402 
403         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
404         netconnection
405             .write_get_stream_length(transaction_id, stream_name)
406             .await?;
407 
408         self.send_set_buffer_length(1, 1300).await?;
409 
410         Ok(())
411     }
412 
413     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
414         let mut controlmessage =
415             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
416         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
417         Ok(())
418     }
419 
420     pub async fn send_window_acknowledgement_size(
421         &mut self,
422         window_size: u32,
423     ) -> Result<(), SessionError> {
424         let mut controlmessage =
425             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
426         controlmessage
427             .write_window_acknowledgement_size(window_size)
428             .await?;
429         Ok(())
430     }
431 
432     pub async fn send_set_buffer_length(
433         &mut self,
434         stream_id: u32,
435         ms: u32,
436     ) -> Result<(), SessionError> {
437         let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
438         eventmessages.write_set_buffer_length(stream_id, ms).await?;
439 
440         Ok(())
441     }
442 
443     pub async fn on_result_connect(&mut self) -> Result<(), SessionError> {
444         let mut controlmessage =
445             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
446         controlmessage.write_acknowledgement(3107).await?;
447 
448         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
449         netstream
450             .write_release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
451             .await?;
452         netstream
453             .write_fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
454             .await?;
455 
456         self.state = ClientSessionState::CreateStream;
457 
458         Ok(())
459     }
460 
461     pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> {
462         match self.client_type {
463             ClientType::Play => {
464                 self.state = ClientSessionState::Play;
465             }
466             ClientType::Publish => {
467                 self.state = ClientSessionState::PublishingContent;
468             }
469         }
470         Ok(())
471     }
472 
473     pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> {
474         self.unpacketizer
475             .update_max_chunk_size(*chunk_size as usize);
476         Ok(())
477     }
478 
479     pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
480         log::trace!("stream is recorded stream_id is {}", stream_id);
481         Ok(())
482     }
483 
484     pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
485         log::trace!("stream is begin stream_id is {}", stream_id);
486         Ok(())
487     }
488 
489     pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> {
490         self.send_window_acknowledgement_size(5000000).await?;
491 
492         Ok(())
493     }
494 
    /// Handles an `_error` command from the server.
    ///
    /// Currently a no-op: the error is neither inspected nor reported, and
    /// `Ok(())` is always returned.
    pub fn on_error(&mut self) -> Result<(), SessionError> {
        Ok(())
    }
498 
499     pub async fn on_status(
500         &mut self,
501         obj: &IndexMap<String, Amf0ValueType>,
502     ) -> Result<(), SessionError> {
503         if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") {
504             match &code_info[..] {
505                 "NetStream.Publish.Start" => {
506                     self.state = ClientSessionState::StartPublish;
507                     //subscribe from local session and publish to remote rtmp server
508                     if let (Some(app_name), Some(stream_name)) =
509                         (&self.sub_app_name, &self.sub_stream_name)
510                     {
511                         self.common
512                             .subscribe_from_channels(
513                                 app_name.clone(),
514                                 stream_name.clone(),
515                                 self.session_id,
516                             )
517                             .await?;
518                     } else {
519                         self.common
520                             .subscribe_from_channels(
521                                 self.app_name.clone(),
522                                 self.stream_name.clone(),
523                                 self.session_id,
524                             )
525                             .await?;
526                     }
527                 }
528                 "NetStream.Publish.Reset" => {}
529                 "NetStream.Play.Start" => {
530                     //pull from remote rtmp server and publish to local session
531                     self.common
532                         .publish_to_channels(
533                             self.app_name.clone(),
534                             self.stream_name.clone(),
535                             self.session_id,
536                         )
537                         .await?
538                 }
539                 _ => {}
540             }
541         }
542         log::trace!("{}", obj.len());
543         Ok(())
544     }
545 
546     pub fn subscribe(&mut self, app_name: String, stream_name: String) {
547         self.sub_app_name = Some(app_name);
548         self.sub_stream_name = Some(stream_name);
549     }
550 }
551