113bac29aSHarlan use crate::chunk::packetizer::ChunkPacketizer;
213bac29aSHarlan 
345abfa1fSHarlanC use {
445abfa1fSHarlanC     super::{
592df423eSHarlanC         common::Common,
645abfa1fSHarlanC         define,
716394c08SHarlanC         define::SessionType,
845abfa1fSHarlanC         errors::{SessionError, SessionErrorValue},
945abfa1fSHarlanC     },
1045abfa1fSHarlanC     crate::{
1145abfa1fSHarlanC         amf0::Amf0ValueType,
1245abfa1fSHarlanC         chunk::{
132f7fa101SHarlanC             define::CHUNK_SIZE,
1445abfa1fSHarlanC             unpacketizer::{ChunkUnpacketizer, UnpackResult},
1545abfa1fSHarlanC         },
16c8d4d932SHarlan         handshake,
172d9c981bSHarlanC         handshake::{define::ClientHandshakeState, handshake_client::SimpleHandshakeClient},
182f7fa101SHarlanC         messages::{define::RtmpMessageData, parser::MessageParser},
192f7fa101SHarlanC         netconnection::writer::{ConnectProperties, NetConnection},
2045abfa1fSHarlanC         netstream::writer::NetStreamWriter,
2145abfa1fSHarlanC         protocol_control_messages::writer::ProtocolControlMessagesWriter,
2245abfa1fSHarlanC         user_control_messages::writer::EventMessagesWriter,
23c8d4d932SHarlan         utils::RtmpUrlParser,
2445abfa1fSHarlanC     },
258e71d710SHarlan     bytesio::{
268e71d710SHarlan         bytes_writer::AsyncBytesWriter,
278e71d710SHarlan         bytesio::{TNetIO, TcpIO},
288e71d710SHarlan     },
29c8d4d932SHarlan     indexmap::IndexMap,
30c8d4d932SHarlan     std::sync::Arc,
318e71d710SHarlan     //crate::utils::print::print,
328e71d710SHarlan     streamhub::define::StreamHubEventSender,
338e71d710SHarlan     streamhub::utils::RandomDigitCount,
348e71d710SHarlan     streamhub::utils::Uuid,
3545abfa1fSHarlanC     tokio::{net::TcpStream, sync::Mutex},
3645abfa1fSHarlanC };
374d5a5966SHarlanC 
38f9029ceaSHarlanC #[allow(dead_code)]
390bce4e4cSHarlanC enum ClientSessionState {
400bce4e4cSHarlanC     Handshake,
410bce4e4cSHarlanC     Connect,
420bce4e4cSHarlanC     CreateStream,
430bce4e4cSHarlanC     Play,
440bce4e4cSHarlanC     PublishingContent,
4592df423eSHarlanC     StartPublish,
4637ccd32dSHarlanC     WaitStateChange,
470bce4e4cSHarlanC }
480bce4e4cSHarlanC 
49f9029ceaSHarlanC #[allow(dead_code)]
500bce4e4cSHarlanC enum ClientSessionPlayState {
510bce4e4cSHarlanC     Handshake,
520bce4e4cSHarlanC     Connect,
530bce4e4cSHarlanC     CreateStream,
540bce4e4cSHarlanC     Play,
550bce4e4cSHarlanC }
560bce4e4cSHarlanC 
57f9029ceaSHarlanC #[allow(dead_code)]
580bce4e4cSHarlanC enum ClientSessionPublishState {
590bce4e4cSHarlanC     Handshake,
600bce4e4cSHarlanC     Connect,
610bce4e4cSHarlanC     CreateStream,
620bce4e4cSHarlanC     PublishingContent,
630bce4e4cSHarlanC }
64f9029ceaSHarlanC #[allow(dead_code)]
6513bac29aSHarlan #[derive(Clone, Debug, PartialEq)]
66216fd691SHarlanC pub enum ClientType {
670bce4e4cSHarlanC     Play,
680bce4e4cSHarlanC     Publish,
690bce4e4cSHarlanC }
70fe91dfa7SHarlanC pub struct ClientSession {
718e71d710SHarlan     io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
7292df423eSHarlanC     common: Common,
7392df423eSHarlanC     handshaker: SimpleHandshakeClient,
740bce4e4cSHarlanC     unpacketizer: ChunkUnpacketizer,
75c8d4d932SHarlan     //domain name with port
76c8d4d932SHarlan     raw_domain_name: String,
77216fd691SHarlanC     app_name: String,
78c8d4d932SHarlan     //stream name with parameters
79c8d4d932SHarlan     raw_stream_name: String,
806ae9716eSHarlanC     stream_name: String,
8188d91efdSHarlanC     /* Used to mark the subscriber's the data producer
8288d91efdSHarlanC     in channels and delete it from map when unsubscribe
8388d91efdSHarlanC     is called. */
84976f65a6SHarlan     session_id: Uuid,
8592df423eSHarlanC     state: ClientSessionState,
8692df423eSHarlanC     client_type: ClientType,
8795feb96cSHarlan     sub_app_name: Option<String>,
8895feb96cSHarlan     sub_stream_name: Option<String>,
898e71d710SHarlan     /*configure how many gops will be cached.*/
908e71d710SHarlan     gop_num: usize,
910bce4e4cSHarlanC }
920bce4e4cSHarlanC 
93fe91dfa7SHarlanC impl ClientSession {
new( stream: TcpStream, client_type: ClientType, raw_domain_name: String, app_name: String, raw_stream_name: String, event_producer: StreamHubEventSender, gop_num: usize, ) -> Self94216fd691SHarlanC     pub fn new(
95216fd691SHarlanC         stream: TcpStream,
96216fd691SHarlanC         client_type: ClientType,
97c8d4d932SHarlan         raw_domain_name: String,
98216fd691SHarlanC         app_name: String,
99c8d4d932SHarlan         raw_stream_name: String,
1008e71d710SHarlan         event_producer: StreamHubEventSender,
1018e71d710SHarlan         gop_num: usize,
102216fd691SHarlanC     ) -> Self {
103976f65a6SHarlan         let remote_addr = if let Ok(addr) = stream.peer_addr() {
104976f65a6SHarlan             log::info!("server session: {}", addr.to_string());
105976f65a6SHarlan             Some(addr)
106976f65a6SHarlan         } else {
107976f65a6SHarlan             None
108976f65a6SHarlan         };
109976f65a6SHarlan 
1108e71d710SHarlan         let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream));
1118e71d710SHarlan         let net_io = Arc::new(Mutex::new(tcp_io));
1128e71d710SHarlan 
1138e71d710SHarlan         let subscriber_id = Uuid::new(RandomDigitCount::Four);
1144d5a5966SHarlanC 
11513bac29aSHarlan         let packetizer = if client_type == ClientType::Publish {
11613bac29aSHarlan             Some(ChunkPacketizer::new(Arc::clone(&net_io)))
11713bac29aSHarlan         } else {
11813bac29aSHarlan             None
11913bac29aSHarlan         };
12013bac29aSHarlan 
12113bac29aSHarlan         let common = Common::new(packetizer, event_producer, SessionType::Client, remote_addr);
122c8d4d932SHarlan 
123c8d4d932SHarlan         let (stream_name, _) = RtmpUrlParser::default()
124c8d4d932SHarlan             .set_raw_stream_name(raw_stream_name.clone())
125c8d4d932SHarlan             .parse_raw_stream_name();
126c8d4d932SHarlan 
127c8d4d932SHarlan         Self {
128c8d4d932SHarlan             io: Arc::clone(&net_io),
129c8d4d932SHarlan             common,
13092df423eSHarlanC             handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)),
1310bce4e4cSHarlanC             unpacketizer: ChunkUnpacketizer::new(),
132c8d4d932SHarlan             raw_domain_name,
13388d91efdSHarlanC             app_name,
134c8d4d932SHarlan             raw_stream_name,
13588d91efdSHarlanC             stream_name,
136976f65a6SHarlan             session_id: subscriber_id,
137c8d4d932SHarlan             state: ClientSessionState::Handshake,
138c8d4d932SHarlan             client_type,
13995feb96cSHarlan             sub_app_name: None,
14095feb96cSHarlan             sub_stream_name: None,
1418e71d710SHarlan             gop_num,
1420bce4e4cSHarlanC         }
1430bce4e4cSHarlanC     }
1440bce4e4cSHarlanC 
run(&mut self) -> Result<(), SessionError>1450bce4e4cSHarlanC     pub async fn run(&mut self) -> Result<(), SessionError> {
1460bce4e4cSHarlanC         loop {
1470bce4e4cSHarlanC             match self.state {
1480bce4e4cSHarlanC                 ClientSessionState::Handshake => {
14988325f54SHarlanC                     log::info!("[C -> S] handshake...");
1500bce4e4cSHarlanC                     self.handshake().await?;
15137ccd32dSHarlanC                     continue;
1520bce4e4cSHarlanC                 }
1530bce4e4cSHarlanC                 ClientSessionState::Connect => {
15488325f54SHarlanC                     log::info!("[C -> S] connect...");
15535844cc6SHarlanC                     self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64))
15635844cc6SHarlanC                         .await?;
15737ccd32dSHarlanC                     self.state = ClientSessionState::WaitStateChange;
1580bce4e4cSHarlanC                 }
1590bce4e4cSHarlanC                 ClientSessionState::CreateStream => {
16088325f54SHarlanC                     log::info!("[C -> S] CreateStream...");
16135844cc6SHarlanC                     self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64))
16235844cc6SHarlanC                         .await?;
16337ccd32dSHarlanC                     self.state = ClientSessionState::WaitStateChange;
1640bce4e4cSHarlanC                 }
1650bce4e4cSHarlanC                 ClientSessionState::Play => {
16688325f54SHarlanC                     log::info!("[C -> S] Play...");
167c8d4d932SHarlan                     self.send_play(&0.0, &self.raw_stream_name.clone(), &0.0, &0.0, &false)
16835844cc6SHarlanC                         .await?;
16937ccd32dSHarlanC                     self.state = ClientSessionState::WaitStateChange;
1700bce4e4cSHarlanC                 }
1710bce4e4cSHarlanC                 ClientSessionState::PublishingContent => {
17288325f54SHarlanC                     log::info!("[C -> S] PublishingContent...");
173c8d4d932SHarlan                     self.send_publish(&3.0, &self.raw_stream_name.clone(), &"live".to_string())
17435844cc6SHarlanC                         .await?;
17537ccd32dSHarlanC                     self.state = ClientSessionState::WaitStateChange;
1760bce4e4cSHarlanC                 }
17792df423eSHarlanC                 ClientSessionState::StartPublish => {
17888325f54SHarlanC                     log::info!("[C -> S] StartPublish...");
17992df423eSHarlanC                     self.common.send_channel_data().await?;
18092df423eSHarlanC                 }
18137ccd32dSHarlanC                 ClientSessionState::WaitStateChange => {}
1820bce4e4cSHarlanC             }
1830bce4e4cSHarlanC 
18435844cc6SHarlanC             let data = self.io.lock().await.read().await?;
1850bce4e4cSHarlanC             self.unpacketizer.extend_data(&data[..]);
1860bce4e4cSHarlanC 
1872d9c981bSHarlanC             loop {
188c8d4d932SHarlan                 match self.unpacketizer.read_chunks() {
189c8d4d932SHarlan                     Ok(rv) => {
1900ca99c20SHarlan                         if let UnpackResult::Chunks(chunks) = rv {
1912d9c981bSHarlanC                             for chunk_info in chunks.iter() {
192*69de9bbdSHarlanC                                 if let Some(mut msg) =
193*69de9bbdSHarlanC                                     MessageParser::new(chunk_info.clone()).parse()?
194*69de9bbdSHarlanC                                 {
195b97dcde8SHarlanC                                     let timestamp = chunk_info.message_header.timestamp;
196b97dcde8SHarlanC                                     self.process_messages(&mut msg, &timestamp).await?;
1970bce4e4cSHarlanC                                 }
1982d9c981bSHarlanC                             }
199c8d4d932SHarlan                         }
200*69de9bbdSHarlanC                     }
201c8d4d932SHarlan                     Err(err) => {
202c8d4d932SHarlan                         log::trace!("read trunks error: {}", err);
2032d9c981bSHarlanC                         break;
2042d9c981bSHarlanC                     }
2052d9c981bSHarlanC                 }
2060bce4e4cSHarlanC             }
2070bce4e4cSHarlanC         }
208c8d4d932SHarlan     }
2090bce4e4cSHarlanC 
handshake(&mut self) -> Result<(), SessionError>2100bce4e4cSHarlanC     async fn handshake(&mut self) -> Result<(), SessionError> {
2110bce4e4cSHarlanC         loop {
2120bce4e4cSHarlanC             self.handshaker.handshake().await?;
2130bce4e4cSHarlanC             if self.handshaker.state == ClientHandshakeState::Finish {
21488325f54SHarlanC                 log::info!("handshake finish");
2150bce4e4cSHarlanC                 break;
2160bce4e4cSHarlanC             }
2170bce4e4cSHarlanC 
218c8d4d932SHarlan             let mut bytes_len = 0;
219c8d4d932SHarlan             while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE * 2 {
22035844cc6SHarlanC                 let data = self.io.lock().await.read().await?;
221c8d4d932SHarlan                 bytes_len += data.len();
2220bce4e4cSHarlanC                 self.handshaker.extend_data(&data[..]);
2230bce4e4cSHarlanC             }
224c8d4d932SHarlan         }
22537ccd32dSHarlanC 
2260bce4e4cSHarlanC         self.state = ClientSessionState::Connect;
2270bce4e4cSHarlanC 
2280bce4e4cSHarlanC         Ok(())
2290bce4e4cSHarlanC     }
2300bce4e4cSHarlanC 
process_messages( &mut self, msg: &mut RtmpMessageData, timestamp: &u32, ) -> Result<(), SessionError>231fe91dfa7SHarlanC     pub async fn process_messages(
232fe91dfa7SHarlanC         &mut self,
233fe91dfa7SHarlanC         msg: &mut RtmpMessageData,
234b97dcde8SHarlanC         timestamp: &u32,
235fe91dfa7SHarlanC     ) -> Result<(), SessionError> {
2360bce4e4cSHarlanC         match msg {
2374b4045b2SHarlanC             RtmpMessageData::Amf0Command {
2380bce4e4cSHarlanC                 command_name,
2390bce4e4cSHarlanC                 transaction_id,
2400bce4e4cSHarlanC                 command_object,
2410bce4e4cSHarlanC                 others,
24292df423eSHarlanC             } => {
2432d9c981bSHarlanC                 log::info!("[C <- S] on_amf0_command_message...");
244b97dcde8SHarlanC                 self.on_amf0_command_message(command_name, transaction_id, command_object, others)
24592df423eSHarlanC                     .await?
24692df423eSHarlanC             }
2472d9c981bSHarlanC             RtmpMessageData::SetPeerBandwidth { .. } => {
2482d9c981bSHarlanC                 log::info!("[C <- S] on_set_peer_bandwidth...");
249fe91dfa7SHarlanC                 self.on_set_peer_bandwidth().await?
250fe91dfa7SHarlanC             }
2512d9c981bSHarlanC             RtmpMessageData::WindowAcknowledgementSize { .. } => {
2522d9c981bSHarlanC                 log::info!("[C <- S] on_windows_acknowledgement_size...");
2532d9c981bSHarlanC             }
2542d9c981bSHarlanC             RtmpMessageData::SetChunkSize { chunk_size } => {
2552d9c981bSHarlanC                 log::info!("[C <- S] on_set_chunk_size...");
2562d9c981bSHarlanC                 self.on_set_chunk_size(chunk_size)?;
2572d9c981bSHarlanC             }
2582d9c981bSHarlanC             RtmpMessageData::StreamBegin { stream_id } => {
2592d9c981bSHarlanC                 log::info!("[C <- S] on_stream_begin...");
2602d9c981bSHarlanC                 self.on_stream_begin(stream_id)?;
2612d9c981bSHarlanC             }
262b97dcde8SHarlanC             RtmpMessageData::StreamIsRecorded { stream_id } => {
2632d9c981bSHarlanC                 log::info!("[C <- S] on_stream_is_recorded...");
2642d9c981bSHarlanC                 self.on_stream_is_recorded(stream_id)?;
265f9029ceaSHarlanC             }
2668e71d710SHarlan             RtmpMessageData::AudioData { data } => {
2678e71d710SHarlan                 self.common.on_audio_data(data, timestamp).await?
2688e71d710SHarlan             }
2698e71d710SHarlan             RtmpMessageData::VideoData { data } => {
2708e71d710SHarlan                 self.common.on_video_data(data, timestamp).await?
2718e71d710SHarlan             }
27295feb96cSHarlan             RtmpMessageData::AmfData { raw_data } => {
2738e71d710SHarlan                 self.common.on_meta_data(raw_data, timestamp).await?;
27495feb96cSHarlan             }
2750bce4e4cSHarlanC 
2760bce4e4cSHarlanC             _ => {}
2770bce4e4cSHarlanC         }
2780bce4e4cSHarlanC         Ok(())
2790bce4e4cSHarlanC     }
2800bce4e4cSHarlanC 
on_amf0_command_message( &mut self, command_name: &Amf0ValueType, transaction_id: &Amf0ValueType, command_object: &Amf0ValueType, others: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>281b97dcde8SHarlanC     pub async fn on_amf0_command_message(
2820bce4e4cSHarlanC         &mut self,
2830bce4e4cSHarlanC         command_name: &Amf0ValueType,
2840bce4e4cSHarlanC         transaction_id: &Amf0ValueType,
2850bce4e4cSHarlanC         command_object: &Amf0ValueType,
2860bce4e4cSHarlanC         others: &mut Vec<Amf0ValueType>,
2870bce4e4cSHarlanC     ) -> Result<(), SessionError> {
288c8d4d932SHarlan         log::info!("[C <- S] on_amf0_command_message...");
2890bce4e4cSHarlanC         let empty_cmd_name = &String::new();
2900bce4e4cSHarlanC         let cmd_name = match command_name {
2910bce4e4cSHarlanC             Amf0ValueType::UTF8String(str) => str,
2920bce4e4cSHarlanC             _ => empty_cmd_name,
2930bce4e4cSHarlanC         };
2940bce4e4cSHarlanC 
2950bce4e4cSHarlanC         let transaction_id = match transaction_id {
29685c0af6aSLuca Barbato             Amf0ValueType::Number(number) => *number as u8,
2970bce4e4cSHarlanC             _ => 0,
2980bce4e4cSHarlanC         };
2990bce4e4cSHarlanC 
300c8d4d932SHarlan         let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new();
301f9029ceaSHarlanC         let _ = match command_object {
3020bce4e4cSHarlanC             Amf0ValueType::Object(obj) => obj,
3030bce4e4cSHarlanC             // Amf0ValueType::Null =>
3040bce4e4cSHarlanC             _ => &empty_cmd_obj,
3050bce4e4cSHarlanC         };
3060bce4e4cSHarlanC 
3070bce4e4cSHarlanC         match cmd_name.as_str() {
30816394c08SHarlanC             "_result" => match transaction_id {
3090bce4e4cSHarlanC                 define::TRANSACTION_ID_CONNECT => {
3102d9c981bSHarlanC                     log::info!("[C <- S] on_result_connect...");
31192df423eSHarlanC                     self.on_result_connect().await?;
3120bce4e4cSHarlanC                 }
3130bce4e4cSHarlanC                 define::TRANSACTION_ID_CREATE_STREAM => {
3142d9c981bSHarlanC                     log::info!("[C <- S] on_result_create_stream...");
3150bce4e4cSHarlanC                     self.on_result_create_stream()?;
3160bce4e4cSHarlanC                 }
3170bce4e4cSHarlanC                 _ => {}
3180bce4e4cSHarlanC             },
3190bce4e4cSHarlanC             "_error" => {
3200bce4e4cSHarlanC                 self.on_error()?;
3210bce4e4cSHarlanC             }
3220bce4e4cSHarlanC             "onStatus" => {
3230bce4e4cSHarlanC                 match others.remove(0) {
32492df423eSHarlanC                     Amf0ValueType::Object(obj) => self.on_status(&obj).await?,
325f9029ceaSHarlanC                     _ => {
326f9029ceaSHarlanC                         return Err(SessionError {
3270bce4e4cSHarlanC                             value: SessionErrorValue::Amf0ValueCountNotCorrect,
328f9029ceaSHarlanC                         })
329f9029ceaSHarlanC                     }
3300bce4e4cSHarlanC                 };
3310bce4e4cSHarlanC             }
3320bce4e4cSHarlanC 
3330bce4e4cSHarlanC             _ => {}
3340bce4e4cSHarlanC         }
3350bce4e4cSHarlanC 
3360bce4e4cSHarlanC         Ok(())
3370bce4e4cSHarlanC     }
3380bce4e4cSHarlanC 
send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError>33935844cc6SHarlanC     pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
34092df423eSHarlanC         self.send_set_chunk_size().await?;
3410bce4e4cSHarlanC 
3422f7fa101SHarlanC         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
343d92d519bSHarlanC         let mut properties = ConnectProperties::new_none();
344d92d519bSHarlanC 
345c8d4d932SHarlan         let url = format!(
346c8d4d932SHarlan             "rtmp://{domain_name}/{app_name}",
347c8d4d932SHarlan             domain_name = self.raw_domain_name,
348c8d4d932SHarlan             app_name = self.app_name
349c8d4d932SHarlan         );
350d92d519bSHarlanC         properties.app = Some(self.app_name.clone());
351d92d519bSHarlanC 
352d92d519bSHarlanC         match self.client_type {
353d92d519bSHarlanC             ClientType::Play => {
354c8d4d932SHarlan                 properties.flash_ver = Some("LNX 9,0,124,2".to_string());
355c8d4d932SHarlan                 properties.tc_url = Some(url.clone());
356d92d519bSHarlanC                 properties.fpad = Some(false);
357d92d519bSHarlanC                 properties.capabilities = Some(15_f64);
358c8d4d932SHarlan                 properties.audio_codecs = Some(4071_f64);
359d92d519bSHarlanC                 properties.video_codecs = Some(252_f64);
360d92d519bSHarlanC                 properties.video_function = Some(1_f64);
361d92d519bSHarlanC             }
362c8d4d932SHarlan             ClientType::Publish => {
363c8d4d932SHarlan                 properties.pub_type = Some("nonprivate".to_string());
364c8d4d932SHarlan                 properties.flash_ver = Some("FMLE/3.0 (compatible; xiu)".to_string());
365c8d4d932SHarlan                 properties.fpad = Some(false);
366c8d4d932SHarlan                 properties.tc_url = Some(url.clone());
367c8d4d932SHarlan             }
368d92d519bSHarlanC         }
369d92d519bSHarlanC 
3702d9c981bSHarlanC         netconnection
3712d9c981bSHarlanC             .write_connect(transaction_id, &properties)
3722d9c981bSHarlanC             .await?;
3730bce4e4cSHarlanC 
3740bce4e4cSHarlanC         Ok(())
3750bce4e4cSHarlanC     }
3760bce4e4cSHarlanC 
send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError>37735844cc6SHarlanC     pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
3782f7fa101SHarlanC         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
3792f7fa101SHarlanC         netconnection.write_create_stream(transaction_id).await?;
3800bce4e4cSHarlanC 
3810bce4e4cSHarlanC         Ok(())
3820bce4e4cSHarlanC     }
3830bce4e4cSHarlanC 
send_delete_stream( &mut self, transaction_id: &f64, stream_id: &f64, ) -> Result<(), SessionError>38435844cc6SHarlanC     pub async fn send_delete_stream(
3850bce4e4cSHarlanC         &mut self,
3860bce4e4cSHarlanC         transaction_id: &f64,
3870bce4e4cSHarlanC         stream_id: &f64,
3880bce4e4cSHarlanC     ) -> Result<(), SessionError> {
3898842de45SHarlanC         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
3902f7fa101SHarlanC         netstream
3912f7fa101SHarlanC             .write_delete_stream(transaction_id, stream_id)
3922f7fa101SHarlanC             .await?;
3930bce4e4cSHarlanC 
3940bce4e4cSHarlanC         Ok(())
3950bce4e4cSHarlanC     }
3960bce4e4cSHarlanC 
send_publish( &mut self, transaction_id: &f64, stream_name: &String, stream_type: &String, ) -> Result<(), SessionError>39735844cc6SHarlanC     pub async fn send_publish(
3980bce4e4cSHarlanC         &mut self,
3990bce4e4cSHarlanC         transaction_id: &f64,
4000bce4e4cSHarlanC         stream_name: &String,
4010bce4e4cSHarlanC         stream_type: &String,
4020bce4e4cSHarlanC     ) -> Result<(), SessionError> {
4038842de45SHarlanC         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
404383722b2SHarlanC         netstream
4052f7fa101SHarlanC             .write_publish(transaction_id, stream_name, stream_type)
406383722b2SHarlanC             .await?;
4070bce4e4cSHarlanC 
4080bce4e4cSHarlanC         Ok(())
4090bce4e4cSHarlanC     }
4100bce4e4cSHarlanC 
send_play( &mut self, transaction_id: &f64, stream_name: &String, start: &f64, duration: &f64, reset: &bool, ) -> Result<(), SessionError>41135844cc6SHarlanC     pub async fn send_play(
4120bce4e4cSHarlanC         &mut self,
4130bce4e4cSHarlanC         transaction_id: &f64,
4140bce4e4cSHarlanC         stream_name: &String,
4150bce4e4cSHarlanC         start: &f64,
4160bce4e4cSHarlanC         duration: &f64,
4170bce4e4cSHarlanC         reset: &bool,
4180bce4e4cSHarlanC     ) -> Result<(), SessionError> {
4198842de45SHarlanC         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
420383722b2SHarlanC         netstream
4212f7fa101SHarlanC             .write_play(transaction_id, stream_name, start, duration, reset)
422383722b2SHarlanC             .await?;
4230bce4e4cSHarlanC 
424c8d4d932SHarlan         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
425c8d4d932SHarlan         netconnection
426c8d4d932SHarlan             .write_get_stream_length(transaction_id, stream_name)
427c8d4d932SHarlan             .await?;
428c8d4d932SHarlan 
4298e71d710SHarlan         self.send_set_buffer_length(1, 1300).await?;
430c8d4d932SHarlan 
4310bce4e4cSHarlanC         Ok(())
4320bce4e4cSHarlanC     }
4330bce4e4cSHarlanC 
send_set_chunk_size(&mut self) -> Result<(), SessionError>434e0acb0c0SHarlanC     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
435383722b2SHarlanC         let mut controlmessage =
436383722b2SHarlanC             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
437e0acb0c0SHarlanC         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
4380bce4e4cSHarlanC         Ok(())
4390bce4e4cSHarlanC     }
4400bce4e4cSHarlanC 
send_window_acknowledgement_size( &mut self, window_size: u32, ) -> Result<(), SessionError>441e0acb0c0SHarlanC     pub async fn send_window_acknowledgement_size(
4420bce4e4cSHarlanC         &mut self,
4430bce4e4cSHarlanC         window_size: u32,
4440bce4e4cSHarlanC     ) -> Result<(), SessionError> {
445383722b2SHarlanC         let mut controlmessage =
446383722b2SHarlanC             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
447e0acb0c0SHarlanC         controlmessage
448e0acb0c0SHarlanC             .write_window_acknowledgement_size(window_size)
449e0acb0c0SHarlanC             .await?;
4500bce4e4cSHarlanC         Ok(())
4510bce4e4cSHarlanC     }
4520bce4e4cSHarlanC 
send_set_buffer_length( &mut self, stream_id: u32, ms: u32, ) -> Result<(), SessionError>4534d5a5966SHarlanC     pub async fn send_set_buffer_length(
4544d5a5966SHarlanC         &mut self,
4554d5a5966SHarlanC         stream_id: u32,
4564d5a5966SHarlanC         ms: u32,
4574d5a5966SHarlanC     ) -> Result<(), SessionError> {
45897f0b5afSHarlanC         let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
45997f0b5afSHarlanC         eventmessages.write_set_buffer_length(stream_id, ms).await?;
4600bce4e4cSHarlanC 
4610bce4e4cSHarlanC         Ok(())
4620bce4e4cSHarlanC     }
4630bce4e4cSHarlanC 
on_result_connect(&mut self) -> Result<(), SessionError>46492df423eSHarlanC     pub async fn on_result_connect(&mut self) -> Result<(), SessionError> {
46592df423eSHarlanC         let mut controlmessage =
46692df423eSHarlanC             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
46792df423eSHarlanC         controlmessage.write_acknowledgement(3107).await?;
46892df423eSHarlanC 
4698842de45SHarlanC         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
47092df423eSHarlanC         netstream
4712f7fa101SHarlanC             .write_release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
47292df423eSHarlanC             .await?;
47392df423eSHarlanC         netstream
4742f7fa101SHarlanC             .write_fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name)
47592df423eSHarlanC             .await?;
47692df423eSHarlanC 
4770bce4e4cSHarlanC         self.state = ClientSessionState::CreateStream;
47892df423eSHarlanC 
4790bce4e4cSHarlanC         Ok(())
4800bce4e4cSHarlanC     }
4810bce4e4cSHarlanC 
on_result_create_stream(&mut self) -> Result<(), SessionError>4820bce4e4cSHarlanC     pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> {
4830bce4e4cSHarlanC         match self.client_type {
4840bce4e4cSHarlanC             ClientType::Play => {
4850bce4e4cSHarlanC                 self.state = ClientSessionState::Play;
4860bce4e4cSHarlanC             }
4870bce4e4cSHarlanC             ClientType::Publish => {
4880bce4e4cSHarlanC                 self.state = ClientSessionState::PublishingContent;
4890bce4e4cSHarlanC             }
4900bce4e4cSHarlanC         }
4910bce4e4cSHarlanC         Ok(())
4920bce4e4cSHarlanC     }
4930bce4e4cSHarlanC 
on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError>4940bce4e4cSHarlanC     pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> {
4950bce4e4cSHarlanC         self.unpacketizer
49685c0af6aSLuca Barbato             .update_max_chunk_size(*chunk_size as usize);
4970bce4e4cSHarlanC         Ok(())
4980bce4e4cSHarlanC     }
4990bce4e4cSHarlanC 
on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError>500b97dcde8SHarlanC     pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
50188325f54SHarlanC         log::trace!("stream is recorded stream_id is {}", stream_id);
502b97dcde8SHarlanC         Ok(())
503b97dcde8SHarlanC     }
504b97dcde8SHarlanC 
on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError>505b97dcde8SHarlanC     pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> {
50688325f54SHarlanC         log::trace!("stream is begin stream_id is {}", stream_id);
507b97dcde8SHarlanC         Ok(())
508b97dcde8SHarlanC     }
509b97dcde8SHarlanC 
on_set_peer_bandwidth(&mut self) -> Result<(), SessionError>510e0acb0c0SHarlanC     pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> {
511c8d4d932SHarlan         self.send_window_acknowledgement_size(5000000).await?;
512c8d4d932SHarlan 
5130bce4e4cSHarlanC         Ok(())
5140bce4e4cSHarlanC     }
51592df423eSHarlanC 
on_error(&mut self) -> Result<(), SessionError>5160bce4e4cSHarlanC     pub fn on_error(&mut self) -> Result<(), SessionError> {
5170bce4e4cSHarlanC         Ok(())
5180bce4e4cSHarlanC     }
5190bce4e4cSHarlanC 
on_status( &mut self, obj: &IndexMap<String, Amf0ValueType>, ) -> Result<(), SessionError>52092df423eSHarlanC     pub async fn on_status(
52192df423eSHarlanC         &mut self,
522c8d4d932SHarlan         obj: &IndexMap<String, Amf0ValueType>,
52392df423eSHarlanC     ) -> Result<(), SessionError> {
52492df423eSHarlanC         if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") {
525b97dcde8SHarlanC             match &code_info[..] {
526b97dcde8SHarlanC                 "NetStream.Publish.Start" => {
52792df423eSHarlanC                     self.state = ClientSessionState::StartPublish;
52895feb96cSHarlan                     //subscribe from local session and publish to remote rtmp server
52995feb96cSHarlan                     if let (Some(app_name), Some(stream_name)) =
53095feb96cSHarlan                         (&self.sub_app_name, &self.sub_stream_name)
53195feb96cSHarlan                     {
53295feb96cSHarlan                         self.common
53395feb96cSHarlan                             .subscribe_from_channels(
53495feb96cSHarlan                                 app_name.clone(),
53595feb96cSHarlan                                 stream_name.clone(),
53695feb96cSHarlan                                 self.session_id,
53795feb96cSHarlan                             )
53895feb96cSHarlan                             .await?;
53995feb96cSHarlan                     } else {
54092df423eSHarlanC                         self.common
54192df423eSHarlanC                             .subscribe_from_channels(
54292df423eSHarlanC                                 self.app_name.clone(),
54392df423eSHarlanC                                 self.stream_name.clone(),
544976f65a6SHarlan                                 self.session_id,
54592df423eSHarlanC                             )
54692df423eSHarlanC                             .await?;
54792df423eSHarlanC                     }
54895feb96cSHarlan                 }
549b97dcde8SHarlanC                 "NetStream.Publish.Reset" => {}
5503cf13c0aSHarlanC                 "NetStream.Play.Start" => {
55195feb96cSHarlan                     //pull from remote rtmp server and publish to local session
5523cf13c0aSHarlanC                     self.common
553976f65a6SHarlan                         .publish_to_channels(
554976f65a6SHarlan                             self.app_name.clone(),
555976f65a6SHarlan                             self.stream_name.clone(),
556976f65a6SHarlan                             self.session_id,
5578e71d710SHarlan                             self.gop_num,
558976f65a6SHarlan                         )
5593cf13c0aSHarlanC                         .await?
5603cf13c0aSHarlanC                 }
561b97dcde8SHarlanC                 _ => {}
56292df423eSHarlanC             }
56392df423eSHarlanC         }
56488325f54SHarlanC         log::trace!("{}", obj.len());
5650bce4e4cSHarlanC         Ok(())
5660bce4e4cSHarlanC     }
56795feb96cSHarlan 
subscribe(&mut self, app_name: String, stream_name: String)568c8d4d932SHarlan     pub fn subscribe(&mut self, app_name: String, stream_name: String) {
569ded9d193SHarlan         self.sub_app_name = Some(app_name);
570ded9d193SHarlan         self.sub_stream_name = Some(stream_name);
57195feb96cSHarlan     }
5720bce4e4cSHarlanC }
573