1 use crate::chunk::packetizer::ChunkPacketizer;
2 
3 use {
4     super::{
5         common::Common,
6         define,
7         define::SessionType,
8         errors::{SessionError, SessionErrorValue},
9     },
10     crate::{
11         amf0::Amf0ValueType,
12         chunk::{
13             define::CHUNK_SIZE,
14             unpacketizer::{ChunkUnpacketizer, UnpackResult},
15         },
16         handshake,
17         handshake::{define::ClientHandshakeState, handshake_client::SimpleHandshakeClient},
18         messages::{define::RtmpMessageData, parser::MessageParser},
19         netconnection::writer::{ConnectProperties, NetConnection},
20         netstream::writer::NetStreamWriter,
21         protocol_control_messages::writer::ProtocolControlMessagesWriter,
22         user_control_messages::writer::EventMessagesWriter,
23         utils::RtmpUrlParser,
24     },
25     bytesio::{
26         bytes_writer::AsyncBytesWriter,
27         bytesio::{TNetIO, TcpIO},
28     },
29     indexmap::IndexMap,
30     std::sync::Arc,
31     //crate::utils::print::print,
32     streamhub::define::StreamHubEventSender,
33     streamhub::utils::RandomDigitCount,
34     streamhub::utils::Uuid,
35     tokio::{net::TcpStream, sync::Mutex},
36 };
37 
38 #[allow(dead_code)]
39 enum ClientSessionState {
40     Handshake,
41     Connect,
42     CreateStream,
43     Play,
44     PublishingContent,
45     StartPublish,
46     WaitStateChange,
47 }
48 
49 #[allow(dead_code)]
50 enum ClientSessionPlayState {
51     Handshake,
52     Connect,
53     CreateStream,
54     Play,
55 }
56 
57 #[allow(dead_code)]
58 enum ClientSessionPublishState {
59     Handshake,
60     Connect,
61     CreateStream,
62     PublishingContent,
63 }
64 #[allow(dead_code)]
65 #[derive(Clone, Debug, PartialEq)]
66 pub enum ClientType {
67     Play,
68     Publish,
69 }
70 pub struct ClientSession {
71     io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
72     common: Common,
73     handshaker: SimpleHandshakeClient,
74     unpacketizer: ChunkUnpacketizer,
75     //domain name with port
76     raw_domain_name: String,
77     app_name: String,
78     //stream name with parameters
79     raw_stream_name: String,
80     stream_name: String,
81     /* Used to mark the subscriber's the data producer
82     in channels and delete it from map when unsubscribe
83     is called. */
84     session_id: Uuid,
85     state: ClientSessionState,
86     client_type: ClientType,
87     sub_app_name: Option<String>,
88     sub_stream_name: Option<String>,
89     /*configure how many gops will be cached.*/
90     gop_num: usize,
91 }
92 
93 impl ClientSession {
94     pub fn new(
95         stream: TcpStream,
96         client_type: ClientType,
97         raw_domain_name: String,
98         app_name: String,
99         raw_stream_name: String,
100         event_producer: StreamHubEventSender,
101         gop_num: usize,
102     ) -> Self {
103         let remote_addr = if let Ok(addr) = stream.peer_addr() {
104             log::info!("server session: {}", addr.to_string());
105             Some(addr)
106         } else {
107             None
108         };
109 
110         let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream));
111         let net_io = Arc::new(Mutex::new(tcp_io));
112 
113         let subscriber_id = Uuid::new(RandomDigitCount::Four);
114 
115         let packetizer = if client_type == ClientType::Publish {
116             Some(ChunkPacketizer::new(Arc::clone(&net_io)))
117         } else {
118             None
119         };
120 
121         let common = Common::new(packetizer, event_producer, SessionType::Client, remote_addr);
122 
123         let (stream_name, _) = RtmpUrlParser::default()
124             .set_raw_stream_name(raw_stream_name.clone())
125             .parse_raw_stream_name();
126 
127         Self {
128             io: Arc::clone(&net_io),
129             common,
130             handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)),
131             unpacketizer: ChunkUnpacketizer::new(),
132             raw_domain_name,
133             app_name,
134             raw_stream_name,
135             stream_name,
136             session_id: subscriber_id,
137             state: ClientSessionState::Handshake,
138             client_type,
139             sub_app_name: None,
140             sub_stream_name: None,
141             gop_num,
142         }
143     }
144 
145     pub async fn run(&mut self) -> Result<(), SessionError> {
146         loop {
147             match self.state {
148                 ClientSessionState::Handshake => {
149                     log::info!("[C -> S] handshake...");
150                     self.handshake().await?;
151                     continue;
152                 }
153                 ClientSessionState::Connect => {
154                     log::info!("[C -> S] connect...");
155                     self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64))
156                         .await?;
157                     self.state = ClientSessionState::WaitStateChange;
158                 }
159                 ClientSessionState::CreateStream => {
160                     log::info!("[C -> S] CreateStream...");
161                     self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64))
162                         .await?;
163                     self.state = ClientSessionState::WaitStateChange;
164                 }
165                 ClientSessionState::Play => {
166                     log::info!("[C -> S] Play...");
167                     self.send_play(&0.0, &self.raw_stream_name.clone(), &0.0, &0.0, &false)
168                         .await?;
169                     self.state = ClientSessionState::WaitStateChange;
170                 }
171                 ClientSessionState::PublishingContent => {
172                     log::info!("[C -> S] PublishingContent...");
173                     self.send_publish(&3.0, &self.raw_stream_name.clone(), &"live".to_string())
174                         .await?;
175                     self.state = ClientSessionState::WaitStateChange;
176                 }
177                 ClientSessionState::StartPublish => {
178                     log::info!("[C -> S] StartPublish...");
179                     self.common.send_channel_data().await?;
180                 }
181                 ClientSessionState::WaitStateChange => {}
182             }
183 
184             let data = self.io.lock().await.read().await?;
185             self.unpacketizer.extend_data(&data[..]);
186 
187             loop {
188                 match self.unpacketizer.read_chunks() {
189                     Ok(rv) => {
190                         if let UnpackResult::Chunks(chunks) = rv {
191                             for chunk_info in chunks.iter() {
192                                 let mut msg = MessageParser::new(chunk_info.clone()).parse()?;
193                                 let timestamp = chunk_info.message_header.timestamp;
194                                 self.process_messages(&mut msg, &timestamp).await?;
195                             }
196                         }
197                     }
198                     Err(err) => {
199                         log::trace!("read trunks error: {}", err);
200                         break;
201                     }
202                 }
203             }
204         }
205     }
206 
207     async fn handshake(&mut self) -> Result<(), SessionError> {
208         loop {
209             self.handshaker.handshake().await?;
210             if self.handshaker.state == ClientHandshakeState::Finish {
211                 log::info!("handshake finish");
212                 break;
213             }
214 
215             let mut bytes_len = 0;
216             while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE * 2 {
217                 let data = self.io.lock().await.read().await?;
218                 bytes_len += data.len();
219                 self.handshaker.extend_data(&data[..]);
220             }
221         }
222 
223         self.state = ClientSessionState::Connect;
224 
225         Ok(())
226     }
227 
228     pub async fn process_messages(
229         &mut self,
230         msg: &mut RtmpMessageData,
231         timestamp: &u32,
232     ) -> Result<(), SessionError> {
233         match msg {
234             RtmpMessageData::Amf0Command {
235                 command_name,
236                 transaction_id,
237                 command_object,
238                 others,
239             } => {
240                 log::info!("[C <- S] on_amf0_command_message...");
241                 self.on_amf0_command_message(command_name, transaction_id, command_object, others)
242                     .await?
243             }
244             RtmpMessageData::SetPeerBandwidth { .. } => {
245                 log::info!("[C <- S] on_set_peer_bandwidth...");
246                 self.on_set_peer_bandwidth().await?
247             }
248             RtmpMessageData::WindowAcknowledgementSize { .. } => {
249                 log::info!("[C <- S] on_windows_acknowledgement_size...");
250             }
251             RtmpMessageData::SetChunkSize { chunk_size } => {
252                 log::info!("[C <- S] on_set_chunk_size...");
253                 self.on_set_chunk_size(chunk_size)?;
254             }
255             RtmpMessageData::StreamBegin { stream_id } => {
256                 log::info!("[C <- S] on_stream_begin...");
257                 self.on_stream_begin(stream_id)?;
258             }
259             RtmpMessageData::StreamIsRecorded { stream_id } => {
260                 log::info!("[C <- S] on_stream_is_recorded...");
261                 self.on_stream_is_recorded(stream_id)?;
262             }
263             RtmpMessageData::AudioData { data } => {
264                 self.common.on_audio_data(data, timestamp).await?
265             }
266             RtmpMessageData::VideoData { data } => {
267                 self.common.on_video_data(data, timestamp).await?
268             }
269             RtmpMessageData::AmfData { raw_data } => {
270                 self.common.on_meta_data(raw_data, timestamp).await?;
271             }
272 
273             _ => {}
274         }
275         Ok(())
276     }
277 
278     pub async fn on_amf0_command_message(
279         &mut self,
280         command_name: &Amf0ValueType,
281         transaction_id: &Amf0ValueType,
282         command_object: &Amf0ValueType,
283         others: &mut Vec<Amf0ValueType>,
284     ) -> Result<(), SessionError> {
285         log::info!("[C <- S] on_amf0_command_message...");
286         let empty_cmd_name = &String::new();
287         let cmd_name = match command_name {
288             Amf0ValueType::UTF8String(str) => str,
289             _ => empty_cmd_name,
290         };
291 
292         let transaction_id = match transaction_id {
293             Amf0ValueType::Number(number) => *number as u8,
294             _ => 0,
295         };
296 
297         let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new();
298         let _ = match command_object {
299             Amf0ValueType::Object(obj) => obj,
300             // Amf0ValueType::Null =>
301             _ => &empty_cmd_obj,
302         };
303 
304         match cmd_name.as_str() {
305             "_result" => match transaction_id {
306                 define::TRANSACTION_ID_CONNECT => {
307                     log::info!("[C <- S] on_result_connect...");
308                     self.on_result_connect().await?;
309                 }
310                 define::TRANSACTION_ID_CREATE_STREAM => {
311                     log::info!("[C <- S] on_result_create_stream...");
312                     self.on_result_create_stream()?;
313                 }
314                 _ => {}
315             },
316             "_error" => {
317                 self.on_error()?;
318             }
319             "onStatus" => {
320                 match others.remove(0) {
321                     Amf0ValueType::Object(obj) => self.on_status(&obj).await?,
322                     _ => {
323                         return Err(SessionError {
324                             value: SessionErrorValue::Amf0ValueCountNotCorrect,
325                         })
326                     }
327                 };
328             }
329 
330             _ => {}
331         }
332 
333         Ok(())
334     }
335 
336     pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
337         self.send_set_chunk_size().await?;
338 
339         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
340         let mut properties = ConnectProperties::new_none();
341 
342         let url = format!(
343             "rtmp://{domain_name}/{app_name}",
344             domain_name = self.raw_domain_name,
345             app_name = self.app_name
346         );
347         properties.app = Some(self.app_name.clone());
348 
349         match self.client_type {
350             ClientType::Play => {
351                 properties.flash_ver = Some("LNX 9,0,124,2".to_string());
352                 properties.tc_url = Some(url.clone());
353                 properties.fpad = Some(false);
354                 properties.capabilities = Some(15_f64);
355                 properties.audio_codecs = Some(4071_f64);
356                 properties.video_codecs = Some(252_f64);
357                 properties.video_function = Some(1_f64);
358             }
359             ClientType::Publish => {
360                 properties.pub_type = Some("nonprivate".to_string());
361                 properties.flash_ver = Some("FMLE/3.0 (compatible; xiu)".to_string());
362                 properties.fpad = Some(false);
363                 properties.tc_url = Some(url.clone());
364             }
365         }
366 
367         netconnection
368             .write_connect(transaction_id, &properties)
369             .await?;
370 
371         Ok(())
372     }
373 
374     pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
375         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
376         netconnection.write_create_stream(transaction_id).await?;
377 
378         Ok(())
379     }
380 
381     pub async fn send_delete_stream(
382         &mut self,
383         transaction_id: &f64,
384         stream_id: &f64,
385     ) -> Result<(), SessionError> {
386         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
387         netstream
388             .write_delete_stream(transaction_id, stream_id)
389             .await?;
390 
391         Ok(())
392     }
393 
394     pub async fn send_publish(
395         &mut self,
396         transaction_id: &f64,
397         stream_name: &String,
398         stream_type: &String,
399     ) -> Result<(), SessionError> {
400         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
401         netstream
402             .write_publish(transaction_id, stream_name, stream_type)
403             .await?;
404 
405         Ok(())
406     }
407 
408     pub async fn send_play(
409         &mut self,
410         transaction_id: &f64,
411         stream_name: &String,
412         start: &f64,
413         duration: &f64,
414         reset: &bool,
415     ) -> Result<(), SessionError> {
416         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
417         netstream
418             .write_play(transaction_id, stream_name, start, duration, reset)
419             .await?;
420 
421         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
422         netconnection
423             .write_get_stream_length(transaction_id, stream_name)
424             .await?;
425 
426         self.send_set_buffer_length(1, 1300).await?;
427 
428         Ok(())
429     }
430 
431     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
432         let mut controlmessage =
433             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
434         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
435         Ok(())
436     }
437 
438     pub async fn send_window_acknowledgement_size(
439         &mut self,
440         window_size: u32,
441     ) -> Result<(), SessionError> {
442         let mut controlmessage =
443             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
444         controlmessage
445             .write_window_acknowledgement_size(window_size)
446             .await?;
447         Ok(())
448     }
449 
450     pub async fn send_set_buffer_length(
451         &mut self,
452         stream_id: u32,
453         ms: u32,
454     ) -> Result<(), SessionError> {
455         let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
456         eventmessages.write_set_buffer_length(stream_id, ms).await?;
457 
458         Ok(())
459     }
460 
461     pub async fn on_result_connect(&mut self) -> Result<(), SessionError> {
462         let mut controlmessage =
463             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
464         controlmessage.write_acknowledgement(3107).await?;
465 
466         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
467         netstream
468             .write_release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
469             .await?;
470         netstream
471             .write_fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
472             .await?;
473 
474         self.state = ClientSessionState::CreateStream;
475 
476         Ok(())
477     }
478 
479     pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> {
480         match self.client_type {
481             ClientType::Play => {
482                 self.state = ClientSessionState::Play;
483             }
484             ClientType::Publish => {
485                 self.state = ClientSessionState::PublishingContent;
486             }
487         }
488         Ok(())
489     }
490 
491     pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> {
492         self.unpacketizer
493             .update_max_chunk_size(*chunk_size as usize);
494         Ok(())
495     }
496 
497     pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
498         log::trace!("stream is recorded stream_id is {}", stream_id);
499         Ok(())
500     }
501 
502     pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
503         log::trace!("stream is begin stream_id is {}", stream_id);
504         Ok(())
505     }
506 
507     pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> {
508         self.send_window_acknowledgement_size(5000000).await?;
509 
510         Ok(())
511     }
512 
513     pub fn on_error(&mut self) -> Result<(), SessionError> {
514         Ok(())
515     }
516 
517     pub async fn on_status(
518         &mut self,
519         obj: &IndexMap<String, Amf0ValueType>,
520     ) -> Result<(), SessionError> {
521         if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") {
522             match &code_info[..] {
523                 "NetStream.Publish.Start" => {
524                     self.state = ClientSessionState::StartPublish;
525                     //subscribe from local session and publish to remote rtmp server
526                     if let (Some(app_name), Some(stream_name)) =
527                         (&self.sub_app_name, &self.sub_stream_name)
528                     {
529                         self.common
530                             .subscribe_from_channels(
531                                 app_name.clone(),
532                                 stream_name.clone(),
533                                 self.session_id,
534                             )
535                             .await?;
536                     } else {
537                         self.common
538                             .subscribe_from_channels(
539                                 self.app_name.clone(),
540                                 self.stream_name.clone(),
541                                 self.session_id,
542                             )
543                             .await?;
544                     }
545                 }
546                 "NetStream.Publish.Reset" => {}
547                 "NetStream.Play.Start" => {
548                     //pull from remote rtmp server and publish to local session
549                     self.common
550                         .publish_to_channels(
551                             self.app_name.clone(),
552                             self.stream_name.clone(),
553                             self.session_id,
554                             self.gop_num,
555                         )
556                         .await?
557                 }
558                 _ => {}
559             }
560         }
561         log::trace!("{}", obj.len());
562         Ok(())
563     }
564 
565     pub fn subscribe(&mut self, app_name: String, stream_name: String) {
566         self.sub_app_name = Some(app_name);
567         self.sub_stream_name = Some(stream_name);
568     }
569 }
570