1 use crate::chunk::packetizer::ChunkPacketizer;
2 
3 use {
4     super::{
5         common::Common,
6         define,
7         define::SessionType,
8         errors::{SessionError, SessionErrorValue},
9     },
10     crate::{
11         amf0::Amf0ValueType,
12         chunk::{
13             define::CHUNK_SIZE,
14             unpacketizer::{ChunkUnpacketizer, UnpackResult},
15         },
16         handshake,
17         handshake::{define::ClientHandshakeState, handshake_client::SimpleHandshakeClient},
18         messages::{define::RtmpMessageData, parser::MessageParser},
19         netconnection::writer::{ConnectProperties, NetConnection},
20         netstream::writer::NetStreamWriter,
21         protocol_control_messages::writer::ProtocolControlMessagesWriter,
22         user_control_messages::writer::EventMessagesWriter,
23         utils::RtmpUrlParser,
24     },
25     bytesio::{
26         bytes_writer::AsyncBytesWriter,
27         bytesio::{TNetIO, TcpIO},
28     },
29     indexmap::IndexMap,
30     std::sync::Arc,
31     //crate::utils::print::print,
32     streamhub::define::StreamHubEventSender,
33     streamhub::utils::RandomDigitCount,
34     streamhub::utils::Uuid,
35     tokio::{net::TcpStream, sync::Mutex},
36 };
37 
/// Steps of the client state machine that `ClientSession::run` walks through.
#[allow(dead_code)]
enum ClientSessionState {
    /// Initial state; `run` performs the handshake, which then switches to `Connect`.
    Handshake,
    /// `run` sends the connect request and then switches to `WaitStateChange`.
    Connect,
    /// Entered from `on_result_connect`; `run` sends the createStream request.
    CreateStream,
    /// Entered from `on_result_create_stream` for `ClientType::Play`; `run` sends the play request.
    Play,
    /// Entered from `on_result_create_stream` for `ClientType::Publish`; `run` sends the publish request.
    PublishingContent,
    /// Entered when `on_status` sees "NetStream.Publish.Start"; `run` calls `common.send_channel_data()`.
    StartPublish,
    /// Nothing to send; `run` only reads and processes incoming messages.
    WaitStateChange,
}
48 
/// State names for a play-only flow.
///
/// NOTE(review): not referenced anywhere in the visible code of this file;
/// `ClientSession` tracks its progress with `ClientSessionState` instead.
#[allow(dead_code)]
enum ClientSessionPlayState {
    Handshake,
    Connect,
    CreateStream,
    Play,
}
56 
/// State names for a publish-only flow.
///
/// NOTE(review): not referenced anywhere in the visible code of this file;
/// `ClientSession` tracks its progress with `ClientSessionState` instead.
#[allow(dead_code)]
enum ClientSessionPublishState {
    Handshake,
    Connect,
    CreateStream,
    PublishingContent,
}
/// Role of a `ClientSession` towards the remote server.
#[allow(dead_code)]
#[derive(Clone, Debug, PartialEq)]
pub enum ClientType {
    /// The session requests a stream from the remote server (play request).
    Play,
    /// The session sends a stream to the remote server (publish request).
    Publish,
}
/// Client side of a session with a remote server: performs the handshake,
/// sends connect / createStream / play or publish requests and processes the
/// messages read back from the connection.
pub struct ClientSession {
    /// Network I/O handle shared with the handshaker, the optional packetizer
    /// and the writers created by the `send_*` methods.
    io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
    /// Shared session logic; receives audio/video/metadata and handles the
    /// publish/subscribe calls towards the channels.
    common: Common,
    /// Handshake driver used by `handshake`.
    handshaker: SimpleHandshakeClient,
    /// Turns the bytes read from `io` into chunks.
    unpacketizer: ChunkUnpacketizer,
    /// Domain name with port; used to build the `rtmp://` URL sent on connect.
    raw_domain_name: String,
    /// Application name sent on connect and used for channel publish/subscribe.
    app_name: String,
    /// Stream name with parameters; sent in the play and publish requests.
    raw_stream_name: String,
    /// Stream name obtained by parsing `raw_stream_name` with `RtmpUrlParser`.
    stream_name: String,
    /// Identifies this session when it publishes to or subscribes from the
    /// channels, so that it can be removed from the map again when
    /// unsubscribe is called.
    session_id: Uuid,
    /// Current step of the state machine driven by `run`.
    state: ClientSessionState,
    /// Whether this session plays from or publishes to the remote server.
    client_type: ClientType,
    /// Optional application name set by `subscribe`; when set together with
    /// `sub_stream_name` it replaces `app_name` for the channel subscription.
    sub_app_name: Option<String>,
    /// Optional stream name set by `subscribe`; see `sub_app_name`.
    sub_stream_name: Option<String>,
    /// Configures how many GOPs will be cached.
    gop_num: usize,
}
92 
93 impl ClientSession {
new( stream: TcpStream, client_type: ClientType, raw_domain_name: String, app_name: String, raw_stream_name: String, event_producer: StreamHubEventSender, gop_num: usize, ) -> Self94     pub fn new(
95         stream: TcpStream,
96         client_type: ClientType,
97         raw_domain_name: String,
98         app_name: String,
99         raw_stream_name: String,
100         event_producer: StreamHubEventSender,
101         gop_num: usize,
102     ) -> Self {
103         let remote_addr = if let Ok(addr) = stream.peer_addr() {
104             log::info!("server session: {}", addr.to_string());
105             Some(addr)
106         } else {
107             None
108         };
109 
110         let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream));
111         let net_io = Arc::new(Mutex::new(tcp_io));
112 
113         let subscriber_id = Uuid::new(RandomDigitCount::Four);
114 
115         let packetizer = if client_type == ClientType::Publish {
116             Some(ChunkPacketizer::new(Arc::clone(&net_io)))
117         } else {
118             None
119         };
120 
121         let common = Common::new(packetizer, event_producer, SessionType::Client, remote_addr);
122 
123         let (stream_name, _) = RtmpUrlParser::default()
124             .set_raw_stream_name(raw_stream_name.clone())
125             .parse_raw_stream_name();
126 
127         Self {
128             io: Arc::clone(&net_io),
129             common,
130             handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)),
131             unpacketizer: ChunkUnpacketizer::new(),
132             raw_domain_name,
133             app_name,
134             raw_stream_name,
135             stream_name,
136             session_id: subscriber_id,
137             state: ClientSessionState::Handshake,
138             client_type,
139             sub_app_name: None,
140             sub_stream_name: None,
141             gop_num,
142         }
143     }
144 
run(&mut self) -> Result<(), SessionError>145     pub async fn run(&mut self) -> Result<(), SessionError> {
146         loop {
147             match self.state {
148                 ClientSessionState::Handshake => {
149                     log::info!("[C -> S] handshake...");
150                     self.handshake().await?;
151                     continue;
152                 }
153                 ClientSessionState::Connect => {
154                     log::info!("[C -> S] connect...");
155                     self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64))
156                         .await?;
157                     self.state = ClientSessionState::WaitStateChange;
158                 }
159                 ClientSessionState::CreateStream => {
160                     log::info!("[C -> S] CreateStream...");
161                     self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64))
162                         .await?;
163                     self.state = ClientSessionState::WaitStateChange;
164                 }
165                 ClientSessionState::Play => {
166                     log::info!("[C -> S] Play...");
167                     self.send_play(&0.0, &self.raw_stream_name.clone(), &0.0, &0.0, &false)
168                         .await?;
169                     self.state = ClientSessionState::WaitStateChange;
170                 }
171                 ClientSessionState::PublishingContent => {
172                     log::info!("[C -> S] PublishingContent...");
173                     self.send_publish(&3.0, &self.raw_stream_name.clone(), &"live".to_string())
174                         .await?;
175                     self.state = ClientSessionState::WaitStateChange;
176                 }
177                 ClientSessionState::StartPublish => {
178                     log::info!("[C -> S] StartPublish...");
179                     self.common.send_channel_data().await?;
180                 }
181                 ClientSessionState::WaitStateChange => {}
182             }
183 
184             let data = self.io.lock().await.read().await?;
185             self.unpacketizer.extend_data(&data[..]);
186 
187             loop {
188                 match self.unpacketizer.read_chunks() {
189                     Ok(rv) => {
190                         if let UnpackResult::Chunks(chunks) = rv {
191                             for chunk_info in chunks.iter() {
192                                 if let Some(mut msg) =
193                                     MessageParser::new(chunk_info.clone()).parse()?
194                                 {
195                                     let timestamp = chunk_info.message_header.timestamp;
196                                     self.process_messages(&mut msg, &timestamp).await?;
197                                 }
198                             }
199                         }
200                     }
201                     Err(err) => {
202                         log::trace!("read trunks error: {}", err);
203                         break;
204                     }
205                 }
206             }
207         }
208     }
209 
handshake(&mut self) -> Result<(), SessionError>210     async fn handshake(&mut self) -> Result<(), SessionError> {
211         loop {
212             self.handshaker.handshake().await?;
213             if self.handshaker.state == ClientHandshakeState::Finish {
214                 log::info!("handshake finish");
215                 break;
216             }
217 
218             let mut bytes_len = 0;
219             while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE * 2 {
220                 let data = self.io.lock().await.read().await?;
221                 bytes_len += data.len();
222                 self.handshaker.extend_data(&data[..]);
223             }
224         }
225 
226         self.state = ClientSessionState::Connect;
227 
228         Ok(())
229     }
230 
process_messages( &mut self, msg: &mut RtmpMessageData, timestamp: &u32, ) -> Result<(), SessionError>231     pub async fn process_messages(
232         &mut self,
233         msg: &mut RtmpMessageData,
234         timestamp: &u32,
235     ) -> Result<(), SessionError> {
236         match msg {
237             RtmpMessageData::Amf0Command {
238                 command_name,
239                 transaction_id,
240                 command_object,
241                 others,
242             } => {
243                 log::info!("[C <- S] on_amf0_command_message...");
244                 self.on_amf0_command_message(command_name, transaction_id, command_object, others)
245                     .await?
246             }
247             RtmpMessageData::SetPeerBandwidth { .. } => {
248                 log::info!("[C <- S] on_set_peer_bandwidth...");
249                 self.on_set_peer_bandwidth().await?
250             }
251             RtmpMessageData::WindowAcknowledgementSize { .. } => {
252                 log::info!("[C <- S] on_windows_acknowledgement_size...");
253             }
254             RtmpMessageData::SetChunkSize { chunk_size } => {
255                 log::info!("[C <- S] on_set_chunk_size...");
256                 self.on_set_chunk_size(chunk_size)?;
257             }
258             RtmpMessageData::StreamBegin { stream_id } => {
259                 log::info!("[C <- S] on_stream_begin...");
260                 self.on_stream_begin(stream_id)?;
261             }
262             RtmpMessageData::StreamIsRecorded { stream_id } => {
263                 log::info!("[C <- S] on_stream_is_recorded...");
264                 self.on_stream_is_recorded(stream_id)?;
265             }
266             RtmpMessageData::AudioData { data } => {
267                 self.common.on_audio_data(data, timestamp).await?
268             }
269             RtmpMessageData::VideoData { data } => {
270                 self.common.on_video_data(data, timestamp).await?
271             }
272             RtmpMessageData::AmfData { raw_data } => {
273                 self.common.on_meta_data(raw_data, timestamp).await?;
274             }
275 
276             _ => {}
277         }
278         Ok(())
279     }
280 
on_amf0_command_message( &mut self, command_name: &Amf0ValueType, transaction_id: &Amf0ValueType, command_object: &Amf0ValueType, others: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>281     pub async fn on_amf0_command_message(
282         &mut self,
283         command_name: &Amf0ValueType,
284         transaction_id: &Amf0ValueType,
285         command_object: &Amf0ValueType,
286         others: &mut Vec<Amf0ValueType>,
287     ) -> Result<(), SessionError> {
288         log::info!("[C <- S] on_amf0_command_message...");
289         let empty_cmd_name = &String::new();
290         let cmd_name = match command_name {
291             Amf0ValueType::UTF8String(str) => str,
292             _ => empty_cmd_name,
293         };
294 
295         let transaction_id = match transaction_id {
296             Amf0ValueType::Number(number) => *number as u8,
297             _ => 0,
298         };
299 
300         let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new();
301         let _ = match command_object {
302             Amf0ValueType::Object(obj) => obj,
303             // Amf0ValueType::Null =>
304             _ => &empty_cmd_obj,
305         };
306 
307         match cmd_name.as_str() {
308             "_result" => match transaction_id {
309                 define::TRANSACTION_ID_CONNECT => {
310                     log::info!("[C <- S] on_result_connect...");
311                     self.on_result_connect().await?;
312                 }
313                 define::TRANSACTION_ID_CREATE_STREAM => {
314                     log::info!("[C <- S] on_result_create_stream...");
315                     self.on_result_create_stream()?;
316                 }
317                 _ => {}
318             },
319             "_error" => {
320                 self.on_error()?;
321             }
322             "onStatus" => {
323                 match others.remove(0) {
324                     Amf0ValueType::Object(obj) => self.on_status(&obj).await?,
325                     _ => {
326                         return Err(SessionError {
327                             value: SessionErrorValue::Amf0ValueCountNotCorrect,
328                         })
329                     }
330                 };
331             }
332 
333             _ => {}
334         }
335 
336         Ok(())
337     }
338 
send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError>339     pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
340         self.send_set_chunk_size().await?;
341 
342         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
343         let mut properties = ConnectProperties::new_none();
344 
345         let url = format!(
346             "rtmp://{domain_name}/{app_name}",
347             domain_name = self.raw_domain_name,
348             app_name = self.app_name
349         );
350         properties.app = Some(self.app_name.clone());
351 
352         match self.client_type {
353             ClientType::Play => {
354                 properties.flash_ver = Some("LNX 9,0,124,2".to_string());
355                 properties.tc_url = Some(url.clone());
356                 properties.fpad = Some(false);
357                 properties.capabilities = Some(15_f64);
358                 properties.audio_codecs = Some(4071_f64);
359                 properties.video_codecs = Some(252_f64);
360                 properties.video_function = Some(1_f64);
361             }
362             ClientType::Publish => {
363                 properties.pub_type = Some("nonprivate".to_string());
364                 properties.flash_ver = Some("FMLE/3.0 (compatible; xiu)".to_string());
365                 properties.fpad = Some(false);
366                 properties.tc_url = Some(url.clone());
367             }
368         }
369 
370         netconnection
371             .write_connect(transaction_id, &properties)
372             .await?;
373 
374         Ok(())
375     }
376 
send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError>377     pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
378         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
379         netconnection.write_create_stream(transaction_id).await?;
380 
381         Ok(())
382     }
383 
send_delete_stream( &mut self, transaction_id: &f64, stream_id: &f64, ) -> Result<(), SessionError>384     pub async fn send_delete_stream(
385         &mut self,
386         transaction_id: &f64,
387         stream_id: &f64,
388     ) -> Result<(), SessionError> {
389         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
390         netstream
391             .write_delete_stream(transaction_id, stream_id)
392             .await?;
393 
394         Ok(())
395     }
396 
send_publish( &mut self, transaction_id: &f64, stream_name: &String, stream_type: &String, ) -> Result<(), SessionError>397     pub async fn send_publish(
398         &mut self,
399         transaction_id: &f64,
400         stream_name: &String,
401         stream_type: &String,
402     ) -> Result<(), SessionError> {
403         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
404         netstream
405             .write_publish(transaction_id, stream_name, stream_type)
406             .await?;
407 
408         Ok(())
409     }
410 
send_play( &mut self, transaction_id: &f64, stream_name: &String, start: &f64, duration: &f64, reset: &bool, ) -> Result<(), SessionError>411     pub async fn send_play(
412         &mut self,
413         transaction_id: &f64,
414         stream_name: &String,
415         start: &f64,
416         duration: &f64,
417         reset: &bool,
418     ) -> Result<(), SessionError> {
419         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
420         netstream
421             .write_play(transaction_id, stream_name, start, duration, reset)
422             .await?;
423 
424         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
425         netconnection
426             .write_get_stream_length(transaction_id, stream_name)
427             .await?;
428 
429         self.send_set_buffer_length(1, 1300).await?;
430 
431         Ok(())
432     }
433 
send_set_chunk_size(&mut self) -> Result<(), SessionError>434     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
435         let mut controlmessage =
436             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
437         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
438         Ok(())
439     }
440 
send_window_acknowledgement_size( &mut self, window_size: u32, ) -> Result<(), SessionError>441     pub async fn send_window_acknowledgement_size(
442         &mut self,
443         window_size: u32,
444     ) -> Result<(), SessionError> {
445         let mut controlmessage =
446             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
447         controlmessage
448             .write_window_acknowledgement_size(window_size)
449             .await?;
450         Ok(())
451     }
452 
send_set_buffer_length( &mut self, stream_id: u32, ms: u32, ) -> Result<(), SessionError>453     pub async fn send_set_buffer_length(
454         &mut self,
455         stream_id: u32,
456         ms: u32,
457     ) -> Result<(), SessionError> {
458         let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
459         eventmessages.write_set_buffer_length(stream_id, ms).await?;
460 
461         Ok(())
462     }
463 
on_result_connect(&mut self) -> Result<(), SessionError>464     pub async fn on_result_connect(&mut self) -> Result<(), SessionError> {
465         let mut controlmessage =
466             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
467         controlmessage.write_acknowledgement(3107).await?;
468 
469         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
470         netstream
471             .write_release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
472             .await?;
473         netstream
474             .write_fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
475             .await?;
476 
477         self.state = ClientSessionState::CreateStream;
478 
479         Ok(())
480     }
481 
on_result_create_stream(&mut self) -> Result<(), SessionError>482     pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> {
483         match self.client_type {
484             ClientType::Play => {
485                 self.state = ClientSessionState::Play;
486             }
487             ClientType::Publish => {
488                 self.state = ClientSessionState::PublishingContent;
489             }
490         }
491         Ok(())
492     }
493 
on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError>494     pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> {
495         self.unpacketizer
496             .update_max_chunk_size(*chunk_size as usize);
497         Ok(())
498     }
499 
on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError>500     pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
501         log::trace!("stream is recorded stream_id is {}", stream_id);
502         Ok(())
503     }
504 
on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError>505     pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
506         log::trace!("stream is begin stream_id is {}", stream_id);
507         Ok(())
508     }
509 
on_set_peer_bandwidth(&mut self) -> Result<(), SessionError>510     pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> {
511         self.send_window_acknowledgement_size(5000000).await?;
512 
513         Ok(())
514     }
515 
    /// Handler for an `_error` command from the server; currently does
    /// nothing and always returns `Ok(())`.
    pub fn on_error(&mut self) -> Result<(), SessionError> {
        Ok(())
    }
519 
on_status( &mut self, obj: &IndexMap<String, Amf0ValueType>, ) -> Result<(), SessionError>520     pub async fn on_status(
521         &mut self,
522         obj: &IndexMap<String, Amf0ValueType>,
523     ) -> Result<(), SessionError> {
524         if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") {
525             match &code_info[..] {
526                 "NetStream.Publish.Start" => {
527                     self.state = ClientSessionState::StartPublish;
528                     //subscribe from local session and publish to remote rtmp server
529                     if let (Some(app_name), Some(stream_name)) =
530                         (&self.sub_app_name, &self.sub_stream_name)
531                     {
532                         self.common
533                             .subscribe_from_channels(
534                                 app_name.clone(),
535                                 stream_name.clone(),
536                                 self.session_id,
537                             )
538                             .await?;
539                     } else {
540                         self.common
541                             .subscribe_from_channels(
542                                 self.app_name.clone(),
543                                 self.stream_name.clone(),
544                                 self.session_id,
545                             )
546                             .await?;
547                     }
548                 }
549                 "NetStream.Publish.Reset" => {}
550                 "NetStream.Play.Start" => {
551                     //pull from remote rtmp server and publish to local session
552                     self.common
553                         .publish_to_channels(
554                             self.app_name.clone(),
555                             self.stream_name.clone(),
556                             self.session_id,
557                             self.gop_num,
558                         )
559                         .await?
560                 }
561                 _ => {}
562             }
563         }
564         log::trace!("{}", obj.len());
565         Ok(())
566     }
567 
subscribe(&mut self, app_name: String, stream_name: String)568     pub fn subscribe(&mut self, app_name: String, stream_name: String) {
569         self.sub_app_name = Some(app_name);
570         self.sub_stream_name = Some(stream_name);
571     }
572 }
573