113bac29aSHarlan use crate::chunk::packetizer::ChunkPacketizer; 213bac29aSHarlan 345abfa1fSHarlanC use { 445abfa1fSHarlanC super::{ 592df423eSHarlanC common::Common, 645abfa1fSHarlanC define, 716394c08SHarlanC define::SessionType, 845abfa1fSHarlanC errors::{SessionError, SessionErrorValue}, 945abfa1fSHarlanC }, 1045abfa1fSHarlanC crate::{ 1145abfa1fSHarlanC amf0::Amf0ValueType, 1245abfa1fSHarlanC chunk::{ 132f7fa101SHarlanC define::CHUNK_SIZE, 1445abfa1fSHarlanC unpacketizer::{ChunkUnpacketizer, UnpackResult}, 1545abfa1fSHarlanC }, 16c8d4d932SHarlan handshake, 172d9c981bSHarlanC handshake::{define::ClientHandshakeState, handshake_client::SimpleHandshakeClient}, 182f7fa101SHarlanC messages::{define::RtmpMessageData, parser::MessageParser}, 192f7fa101SHarlanC netconnection::writer::{ConnectProperties, NetConnection}, 2045abfa1fSHarlanC netstream::writer::NetStreamWriter, 2145abfa1fSHarlanC protocol_control_messages::writer::ProtocolControlMessagesWriter, 2245abfa1fSHarlanC user_control_messages::writer::EventMessagesWriter, 23c8d4d932SHarlan utils::RtmpUrlParser, 2445abfa1fSHarlanC }, 258e71d710SHarlan bytesio::{ 268e71d710SHarlan bytes_writer::AsyncBytesWriter, 278e71d710SHarlan bytesio::{TNetIO, TcpIO}, 288e71d710SHarlan }, 29c8d4d932SHarlan indexmap::IndexMap, 30c8d4d932SHarlan std::sync::Arc, 318e71d710SHarlan //crate::utils::print::print, 328e71d710SHarlan streamhub::define::StreamHubEventSender, 338e71d710SHarlan streamhub::utils::RandomDigitCount, 348e71d710SHarlan streamhub::utils::Uuid, 3545abfa1fSHarlanC tokio::{net::TcpStream, sync::Mutex}, 3645abfa1fSHarlanC }; 374d5a5966SHarlanC 38f9029ceaSHarlanC #[allow(dead_code)] 390bce4e4cSHarlanC enum ClientSessionState { 400bce4e4cSHarlanC Handshake, 410bce4e4cSHarlanC Connect, 420bce4e4cSHarlanC CreateStream, 430bce4e4cSHarlanC Play, 440bce4e4cSHarlanC PublishingContent, 4592df423eSHarlanC StartPublish, 4637ccd32dSHarlanC WaitStateChange, 470bce4e4cSHarlanC } 480bce4e4cSHarlanC 49f9029ceaSHarlanC #[allow(dead_code)] 500bce4e4cSHarlanC enum ClientSessionPlayState { 510bce4e4cSHarlanC Handshake, 520bce4e4cSHarlanC Connect, 530bce4e4cSHarlanC CreateStream, 540bce4e4cSHarlanC Play, 550bce4e4cSHarlanC } 560bce4e4cSHarlanC 57f9029ceaSHarlanC #[allow(dead_code)] 580bce4e4cSHarlanC enum ClientSessionPublishState { 590bce4e4cSHarlanC Handshake, 600bce4e4cSHarlanC Connect, 610bce4e4cSHarlanC CreateStream, 620bce4e4cSHarlanC PublishingContent, 630bce4e4cSHarlanC } 64f9029ceaSHarlanC #[allow(dead_code)] 6513bac29aSHarlan #[derive(Clone, Debug, PartialEq)] 66216fd691SHarlanC pub enum ClientType { 670bce4e4cSHarlanC Play, 680bce4e4cSHarlanC Publish, 690bce4e4cSHarlanC } 70fe91dfa7SHarlanC pub struct ClientSession { 718e71d710SHarlan io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 7292df423eSHarlanC common: Common, 7392df423eSHarlanC handshaker: SimpleHandshakeClient, 740bce4e4cSHarlanC unpacketizer: ChunkUnpacketizer, 75c8d4d932SHarlan //domain name with port 76c8d4d932SHarlan raw_domain_name: String, 77216fd691SHarlanC app_name: String, 78c8d4d932SHarlan //stream name with parameters 79c8d4d932SHarlan raw_stream_name: String, 806ae9716eSHarlanC stream_name: String, 8188d91efdSHarlanC /* Used to mark the subscriber's the data producer 8288d91efdSHarlanC in channels and delete it from map when unsubscribe 8388d91efdSHarlanC is called. */ 84976f65a6SHarlan session_id: Uuid, 8592df423eSHarlanC state: ClientSessionState, 8692df423eSHarlanC client_type: ClientType, 8795feb96cSHarlan sub_app_name: Option<String>, 8895feb96cSHarlan sub_stream_name: Option<String>, 898e71d710SHarlan /*configure how many gops will be cached.*/ 908e71d710SHarlan gop_num: usize, 910bce4e4cSHarlanC } 920bce4e4cSHarlanC 93fe91dfa7SHarlanC impl ClientSession { new( stream: TcpStream, client_type: ClientType, raw_domain_name: String, app_name: String, raw_stream_name: String, event_producer: StreamHubEventSender, gop_num: usize, ) -> Self94216fd691SHarlanC pub fn new( 95216fd691SHarlanC stream: TcpStream, 96216fd691SHarlanC client_type: ClientType, 97c8d4d932SHarlan raw_domain_name: String, 98216fd691SHarlanC app_name: String, 99c8d4d932SHarlan raw_stream_name: String, 1008e71d710SHarlan event_producer: StreamHubEventSender, 1018e71d710SHarlan gop_num: usize, 102216fd691SHarlanC ) -> Self { 103976f65a6SHarlan let remote_addr = if let Ok(addr) = stream.peer_addr() { 104976f65a6SHarlan log::info!("server session: {}", addr.to_string()); 105976f65a6SHarlan Some(addr) 106976f65a6SHarlan } else { 107976f65a6SHarlan None 108976f65a6SHarlan }; 109976f65a6SHarlan 1108e71d710SHarlan let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream)); 1118e71d710SHarlan let net_io = Arc::new(Mutex::new(tcp_io)); 1128e71d710SHarlan 1138e71d710SHarlan let subscriber_id = Uuid::new(RandomDigitCount::Four); 1144d5a5966SHarlanC 11513bac29aSHarlan let packetizer = if client_type == ClientType::Publish { 11613bac29aSHarlan Some(ChunkPacketizer::new(Arc::clone(&net_io))) 11713bac29aSHarlan } else { 11813bac29aSHarlan None 11913bac29aSHarlan }; 12013bac29aSHarlan 12113bac29aSHarlan let common = Common::new(packetizer, event_producer, SessionType::Client, remote_addr); 122c8d4d932SHarlan 123c8d4d932SHarlan let (stream_name, _) = RtmpUrlParser::default() 124c8d4d932SHarlan .set_raw_stream_name(raw_stream_name.clone()) 125c8d4d932SHarlan .parse_raw_stream_name(); 126c8d4d932SHarlan 127c8d4d932SHarlan Self { 128c8d4d932SHarlan io: Arc::clone(&net_io), 129c8d4d932SHarlan common, 13092df423eSHarlanC handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)), 1310bce4e4cSHarlanC unpacketizer: ChunkUnpacketizer::new(), 132c8d4d932SHarlan raw_domain_name, 13388d91efdSHarlanC app_name, 134c8d4d932SHarlan raw_stream_name, 13588d91efdSHarlanC stream_name, 136976f65a6SHarlan session_id: subscriber_id, 137c8d4d932SHarlan state: ClientSessionState::Handshake, 138c8d4d932SHarlan client_type, 13995feb96cSHarlan sub_app_name: None, 14095feb96cSHarlan sub_stream_name: None, 1418e71d710SHarlan gop_num, 1420bce4e4cSHarlanC } 1430bce4e4cSHarlanC } 1440bce4e4cSHarlanC run(&mut self) -> Result<(), SessionError>1450bce4e4cSHarlanC pub async fn run(&mut self) -> Result<(), SessionError> { 1460bce4e4cSHarlanC loop { 1470bce4e4cSHarlanC match self.state { 1480bce4e4cSHarlanC ClientSessionState::Handshake => { 14988325f54SHarlanC log::info!("[C -> S] handshake..."); 1500bce4e4cSHarlanC self.handshake().await?; 15137ccd32dSHarlanC continue; 1520bce4e4cSHarlanC } 1530bce4e4cSHarlanC ClientSessionState::Connect => { 15488325f54SHarlanC log::info!("[C -> S] connect..."); 15535844cc6SHarlanC self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64)) 15635844cc6SHarlanC .await?; 15737ccd32dSHarlanC self.state = ClientSessionState::WaitStateChange; 1580bce4e4cSHarlanC } 1590bce4e4cSHarlanC ClientSessionState::CreateStream => { 16088325f54SHarlanC log::info!("[C -> S] CreateStream..."); 16135844cc6SHarlanC self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64)) 16235844cc6SHarlanC .await?; 16337ccd32dSHarlanC self.state = ClientSessionState::WaitStateChange; 1640bce4e4cSHarlanC } 1650bce4e4cSHarlanC ClientSessionState::Play => { 16688325f54SHarlanC log::info!("[C -> S] Play..."); 167c8d4d932SHarlan self.send_play(&0.0, &self.raw_stream_name.clone(), &0.0, &0.0, &false) 16835844cc6SHarlanC .await?; 16937ccd32dSHarlanC self.state = ClientSessionState::WaitStateChange; 1700bce4e4cSHarlanC } 1710bce4e4cSHarlanC ClientSessionState::PublishingContent => { 17288325f54SHarlanC log::info!("[C -> S] PublishingContent..."); 173c8d4d932SHarlan self.send_publish(&3.0, &self.raw_stream_name.clone(), &"live".to_string()) 17435844cc6SHarlanC .await?; 17537ccd32dSHarlanC self.state = ClientSessionState::WaitStateChange; 1760bce4e4cSHarlanC } 17792df423eSHarlanC ClientSessionState::StartPublish => { 17888325f54SHarlanC log::info!("[C -> S] StartPublish..."); 17992df423eSHarlanC self.common.send_channel_data().await?; 18092df423eSHarlanC } 18137ccd32dSHarlanC ClientSessionState::WaitStateChange => {} 1820bce4e4cSHarlanC } 1830bce4e4cSHarlanC 18435844cc6SHarlanC let data = self.io.lock().await.read().await?; 1850bce4e4cSHarlanC self.unpacketizer.extend_data(&data[..]); 1860bce4e4cSHarlanC 1872d9c981bSHarlanC loop { 188c8d4d932SHarlan match self.unpacketizer.read_chunks() { 189c8d4d932SHarlan Ok(rv) => { 1900ca99c20SHarlan if let UnpackResult::Chunks(chunks) = rv { 1912d9c981bSHarlanC for chunk_info in chunks.iter() { 192*69de9bbdSHarlanC if let Some(mut msg) = 193*69de9bbdSHarlanC MessageParser::new(chunk_info.clone()).parse()? 194*69de9bbdSHarlanC { 195b97dcde8SHarlanC let timestamp = chunk_info.message_header.timestamp; 196b97dcde8SHarlanC self.process_messages(&mut msg, ×tamp).await?; 1970bce4e4cSHarlanC } 1982d9c981bSHarlanC } 199c8d4d932SHarlan } 200*69de9bbdSHarlanC } 201c8d4d932SHarlan Err(err) => { 202c8d4d932SHarlan log::trace!("read trunks error: {}", err); 2032d9c981bSHarlanC break; 2042d9c981bSHarlanC } 2052d9c981bSHarlanC } 2060bce4e4cSHarlanC } 2070bce4e4cSHarlanC } 208c8d4d932SHarlan } 2090bce4e4cSHarlanC handshake(&mut self) -> Result<(), SessionError>2100bce4e4cSHarlanC async fn handshake(&mut self) -> Result<(), SessionError> { 2110bce4e4cSHarlanC loop { 2120bce4e4cSHarlanC self.handshaker.handshake().await?; 2130bce4e4cSHarlanC if self.handshaker.state == ClientHandshakeState::Finish { 21488325f54SHarlanC log::info!("handshake finish"); 2150bce4e4cSHarlanC break; 2160bce4e4cSHarlanC } 2170bce4e4cSHarlanC 218c8d4d932SHarlan let mut bytes_len = 0; 219c8d4d932SHarlan while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE * 2 { 22035844cc6SHarlanC let data = self.io.lock().await.read().await?; 221c8d4d932SHarlan bytes_len += data.len(); 2220bce4e4cSHarlanC self.handshaker.extend_data(&data[..]); 2230bce4e4cSHarlanC } 224c8d4d932SHarlan } 22537ccd32dSHarlanC 2260bce4e4cSHarlanC self.state = ClientSessionState::Connect; 2270bce4e4cSHarlanC 2280bce4e4cSHarlanC Ok(()) 2290bce4e4cSHarlanC } 2300bce4e4cSHarlanC process_messages( &mut self, msg: &mut RtmpMessageData, timestamp: &u32, ) -> Result<(), SessionError>231fe91dfa7SHarlanC pub async fn process_messages( 232fe91dfa7SHarlanC &mut self, 233fe91dfa7SHarlanC msg: &mut RtmpMessageData, 234b97dcde8SHarlanC timestamp: &u32, 235fe91dfa7SHarlanC ) -> Result<(), SessionError> { 2360bce4e4cSHarlanC match msg { 2374b4045b2SHarlanC RtmpMessageData::Amf0Command { 2380bce4e4cSHarlanC command_name, 2390bce4e4cSHarlanC transaction_id, 2400bce4e4cSHarlanC command_object, 2410bce4e4cSHarlanC others, 24292df423eSHarlanC } => { 2432d9c981bSHarlanC log::info!("[C <- S] on_amf0_command_message..."); 244b97dcde8SHarlanC self.on_amf0_command_message(command_name, transaction_id, command_object, others) 24592df423eSHarlanC .await? 24692df423eSHarlanC } 2472d9c981bSHarlanC RtmpMessageData::SetPeerBandwidth { .. } => { 2482d9c981bSHarlanC log::info!("[C <- S] on_set_peer_bandwidth..."); 249fe91dfa7SHarlanC self.on_set_peer_bandwidth().await? 250fe91dfa7SHarlanC } 2512d9c981bSHarlanC RtmpMessageData::WindowAcknowledgementSize { .. } => { 2522d9c981bSHarlanC log::info!("[C <- S] on_windows_acknowledgement_size..."); 2532d9c981bSHarlanC } 2542d9c981bSHarlanC RtmpMessageData::SetChunkSize { chunk_size } => { 2552d9c981bSHarlanC log::info!("[C <- S] on_set_chunk_size..."); 2562d9c981bSHarlanC self.on_set_chunk_size(chunk_size)?; 2572d9c981bSHarlanC } 2582d9c981bSHarlanC RtmpMessageData::StreamBegin { stream_id } => { 2592d9c981bSHarlanC log::info!("[C <- S] on_stream_begin..."); 2602d9c981bSHarlanC self.on_stream_begin(stream_id)?; 2612d9c981bSHarlanC } 262b97dcde8SHarlanC RtmpMessageData::StreamIsRecorded { stream_id } => { 2632d9c981bSHarlanC log::info!("[C <- S] on_stream_is_recorded..."); 2642d9c981bSHarlanC self.on_stream_is_recorded(stream_id)?; 265f9029ceaSHarlanC } 2668e71d710SHarlan RtmpMessageData::AudioData { data } => { 2678e71d710SHarlan self.common.on_audio_data(data, timestamp).await? 2688e71d710SHarlan } 2698e71d710SHarlan RtmpMessageData::VideoData { data } => { 2708e71d710SHarlan self.common.on_video_data(data, timestamp).await? 2718e71d710SHarlan } 27295feb96cSHarlan RtmpMessageData::AmfData { raw_data } => { 2738e71d710SHarlan self.common.on_meta_data(raw_data, timestamp).await?; 27495feb96cSHarlan } 2750bce4e4cSHarlanC 2760bce4e4cSHarlanC _ => {} 2770bce4e4cSHarlanC } 2780bce4e4cSHarlanC Ok(()) 2790bce4e4cSHarlanC } 2800bce4e4cSHarlanC on_amf0_command_message( &mut self, command_name: &Amf0ValueType, transaction_id: &Amf0ValueType, command_object: &Amf0ValueType, others: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>281b97dcde8SHarlanC pub async fn on_amf0_command_message( 2820bce4e4cSHarlanC &mut self, 2830bce4e4cSHarlanC command_name: &Amf0ValueType, 2840bce4e4cSHarlanC transaction_id: &Amf0ValueType, 2850bce4e4cSHarlanC command_object: &Amf0ValueType, 2860bce4e4cSHarlanC others: &mut Vec<Amf0ValueType>, 2870bce4e4cSHarlanC ) -> Result<(), SessionError> { 288c8d4d932SHarlan log::info!("[C <- S] on_amf0_command_message..."); 2890bce4e4cSHarlanC let empty_cmd_name = &String::new(); 2900bce4e4cSHarlanC let cmd_name = match command_name { 2910bce4e4cSHarlanC Amf0ValueType::UTF8String(str) => str, 2920bce4e4cSHarlanC _ => empty_cmd_name, 2930bce4e4cSHarlanC }; 2940bce4e4cSHarlanC 2950bce4e4cSHarlanC let transaction_id = match transaction_id { 29685c0af6aSLuca Barbato Amf0ValueType::Number(number) => *number as u8, 2970bce4e4cSHarlanC _ => 0, 2980bce4e4cSHarlanC }; 2990bce4e4cSHarlanC 300c8d4d932SHarlan let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new(); 301f9029ceaSHarlanC let _ = match command_object { 3020bce4e4cSHarlanC Amf0ValueType::Object(obj) => obj, 3030bce4e4cSHarlanC // Amf0ValueType::Null => 3040bce4e4cSHarlanC _ => &empty_cmd_obj, 3050bce4e4cSHarlanC }; 3060bce4e4cSHarlanC 3070bce4e4cSHarlanC match cmd_name.as_str() { 30816394c08SHarlanC "_result" => match transaction_id { 3090bce4e4cSHarlanC define::TRANSACTION_ID_CONNECT => { 3102d9c981bSHarlanC log::info!("[C <- S] on_result_connect..."); 31192df423eSHarlanC self.on_result_connect().await?; 3120bce4e4cSHarlanC } 3130bce4e4cSHarlanC define::TRANSACTION_ID_CREATE_STREAM => { 3142d9c981bSHarlanC log::info!("[C <- S] on_result_create_stream..."); 3150bce4e4cSHarlanC self.on_result_create_stream()?; 3160bce4e4cSHarlanC } 3170bce4e4cSHarlanC _ => {} 3180bce4e4cSHarlanC }, 3190bce4e4cSHarlanC "_error" => { 3200bce4e4cSHarlanC self.on_error()?; 3210bce4e4cSHarlanC } 3220bce4e4cSHarlanC "onStatus" => { 3230bce4e4cSHarlanC match others.remove(0) { 32492df423eSHarlanC Amf0ValueType::Object(obj) => self.on_status(&obj).await?, 325f9029ceaSHarlanC _ => { 326f9029ceaSHarlanC return Err(SessionError { 3270bce4e4cSHarlanC value: SessionErrorValue::Amf0ValueCountNotCorrect, 328f9029ceaSHarlanC }) 329f9029ceaSHarlanC } 3300bce4e4cSHarlanC }; 3310bce4e4cSHarlanC } 3320bce4e4cSHarlanC 3330bce4e4cSHarlanC _ => {} 3340bce4e4cSHarlanC } 3350bce4e4cSHarlanC 3360bce4e4cSHarlanC Ok(()) 3370bce4e4cSHarlanC } 3380bce4e4cSHarlanC send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError>33935844cc6SHarlanC pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 34092df423eSHarlanC self.send_set_chunk_size().await?; 3410bce4e4cSHarlanC 3422f7fa101SHarlanC let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 343d92d519bSHarlanC let mut properties = ConnectProperties::new_none(); 344d92d519bSHarlanC 345c8d4d932SHarlan let url = format!( 346c8d4d932SHarlan "rtmp://{domain_name}/{app_name}", 347c8d4d932SHarlan domain_name = self.raw_domain_name, 348c8d4d932SHarlan app_name = self.app_name 349c8d4d932SHarlan ); 350d92d519bSHarlanC properties.app = Some(self.app_name.clone()); 351d92d519bSHarlanC 352d92d519bSHarlanC match self.client_type { 353d92d519bSHarlanC ClientType::Play => { 354c8d4d932SHarlan properties.flash_ver = Some("LNX 9,0,124,2".to_string()); 355c8d4d932SHarlan properties.tc_url = Some(url.clone()); 356d92d519bSHarlanC properties.fpad = Some(false); 357d92d519bSHarlanC properties.capabilities = Some(15_f64); 358c8d4d932SHarlan properties.audio_codecs = Some(4071_f64); 359d92d519bSHarlanC properties.video_codecs = Some(252_f64); 360d92d519bSHarlanC properties.video_function = Some(1_f64); 361d92d519bSHarlanC } 362c8d4d932SHarlan ClientType::Publish => { 363c8d4d932SHarlan properties.pub_type = Some("nonprivate".to_string()); 364c8d4d932SHarlan properties.flash_ver = Some("FMLE/3.0 (compatible; xiu)".to_string()); 365c8d4d932SHarlan properties.fpad = Some(false); 366c8d4d932SHarlan properties.tc_url = Some(url.clone()); 367c8d4d932SHarlan } 368d92d519bSHarlanC } 369d92d519bSHarlanC 3702d9c981bSHarlanC netconnection 3712d9c981bSHarlanC .write_connect(transaction_id, &properties) 3722d9c981bSHarlanC .await?; 3730bce4e4cSHarlanC 3740bce4e4cSHarlanC Ok(()) 3750bce4e4cSHarlanC } 3760bce4e4cSHarlanC send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError>37735844cc6SHarlanC pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 3782f7fa101SHarlanC let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 3792f7fa101SHarlanC netconnection.write_create_stream(transaction_id).await?; 3800bce4e4cSHarlanC 3810bce4e4cSHarlanC Ok(()) 3820bce4e4cSHarlanC } 3830bce4e4cSHarlanC send_delete_stream( &mut self, transaction_id: &f64, stream_id: &f64, ) -> Result<(), SessionError>38435844cc6SHarlanC pub async fn send_delete_stream( 3850bce4e4cSHarlanC &mut self, 3860bce4e4cSHarlanC transaction_id: &f64, 3870bce4e4cSHarlanC stream_id: &f64, 3880bce4e4cSHarlanC ) -> Result<(), SessionError> { 3898842de45SHarlanC let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 3902f7fa101SHarlanC netstream 3912f7fa101SHarlanC .write_delete_stream(transaction_id, stream_id) 3922f7fa101SHarlanC .await?; 3930bce4e4cSHarlanC 3940bce4e4cSHarlanC Ok(()) 3950bce4e4cSHarlanC } 3960bce4e4cSHarlanC send_publish( &mut self, transaction_id: &f64, stream_name: &String, stream_type: &String, ) -> Result<(), SessionError>39735844cc6SHarlanC pub async fn send_publish( 3980bce4e4cSHarlanC &mut self, 3990bce4e4cSHarlanC transaction_id: &f64, 4000bce4e4cSHarlanC stream_name: &String, 4010bce4e4cSHarlanC stream_type: &String, 4020bce4e4cSHarlanC ) -> Result<(), SessionError> { 4038842de45SHarlanC let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 404383722b2SHarlanC netstream 4052f7fa101SHarlanC .write_publish(transaction_id, stream_name, stream_type) 406383722b2SHarlanC .await?; 4070bce4e4cSHarlanC 4080bce4e4cSHarlanC Ok(()) 4090bce4e4cSHarlanC } 4100bce4e4cSHarlanC send_play( &mut self, transaction_id: &f64, stream_name: &String, start: &f64, duration: &f64, reset: &bool, ) -> Result<(), SessionError>41135844cc6SHarlanC pub async fn send_play( 4120bce4e4cSHarlanC &mut self, 4130bce4e4cSHarlanC transaction_id: &f64, 4140bce4e4cSHarlanC stream_name: &String, 4150bce4e4cSHarlanC start: &f64, 4160bce4e4cSHarlanC duration: &f64, 4170bce4e4cSHarlanC reset: &bool, 4180bce4e4cSHarlanC ) -> Result<(), SessionError> { 4198842de45SHarlanC let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 420383722b2SHarlanC netstream 4212f7fa101SHarlanC .write_play(transaction_id, stream_name, start, duration, reset) 422383722b2SHarlanC .await?; 4230bce4e4cSHarlanC 424c8d4d932SHarlan let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 425c8d4d932SHarlan netconnection 426c8d4d932SHarlan .write_get_stream_length(transaction_id, stream_name) 427c8d4d932SHarlan .await?; 428c8d4d932SHarlan 4298e71d710SHarlan self.send_set_buffer_length(1, 1300).await?; 430c8d4d932SHarlan 4310bce4e4cSHarlanC Ok(()) 4320bce4e4cSHarlanC } 4330bce4e4cSHarlanC send_set_chunk_size(&mut self) -> Result<(), SessionError>434e0acb0c0SHarlanC pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 435383722b2SHarlanC let mut controlmessage = 436383722b2SHarlanC ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 437e0acb0c0SHarlanC controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 4380bce4e4cSHarlanC Ok(()) 4390bce4e4cSHarlanC } 4400bce4e4cSHarlanC send_window_acknowledgement_size( &mut self, window_size: u32, ) -> Result<(), SessionError>441e0acb0c0SHarlanC pub async fn send_window_acknowledgement_size( 4420bce4e4cSHarlanC &mut self, 4430bce4e4cSHarlanC window_size: u32, 4440bce4e4cSHarlanC ) -> Result<(), SessionError> { 445383722b2SHarlanC let mut controlmessage = 446383722b2SHarlanC ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 447e0acb0c0SHarlanC controlmessage 448e0acb0c0SHarlanC .write_window_acknowledgement_size(window_size) 449e0acb0c0SHarlanC .await?; 4500bce4e4cSHarlanC Ok(()) 4510bce4e4cSHarlanC } 4520bce4e4cSHarlanC send_set_buffer_length( &mut self, stream_id: u32, ms: u32, ) -> Result<(), SessionError>4534d5a5966SHarlanC pub async fn send_set_buffer_length( 4544d5a5966SHarlanC &mut self, 4554d5a5966SHarlanC stream_id: u32, 4564d5a5966SHarlanC ms: u32, 4574d5a5966SHarlanC ) -> Result<(), SessionError> { 45897f0b5afSHarlanC let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 45997f0b5afSHarlanC eventmessages.write_set_buffer_length(stream_id, ms).await?; 4600bce4e4cSHarlanC 4610bce4e4cSHarlanC Ok(()) 4620bce4e4cSHarlanC } 4630bce4e4cSHarlanC on_result_connect(&mut self) -> Result<(), SessionError>46492df423eSHarlanC pub async fn on_result_connect(&mut self) -> Result<(), SessionError> { 46592df423eSHarlanC let mut controlmessage = 46692df423eSHarlanC ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 46792df423eSHarlanC controlmessage.write_acknowledgement(3107).await?; 46892df423eSHarlanC 4698842de45SHarlanC let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 47092df423eSHarlanC netstream 4712f7fa101SHarlanC .write_release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 47292df423eSHarlanC .await?; 47392df423eSHarlanC netstream 4742f7fa101SHarlanC .write_fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 47592df423eSHarlanC .await?; 47692df423eSHarlanC 4770bce4e4cSHarlanC self.state = ClientSessionState::CreateStream; 47892df423eSHarlanC 4790bce4e4cSHarlanC Ok(()) 4800bce4e4cSHarlanC } 4810bce4e4cSHarlanC on_result_create_stream(&mut self) -> Result<(), SessionError>4820bce4e4cSHarlanC pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> { 4830bce4e4cSHarlanC match self.client_type { 4840bce4e4cSHarlanC ClientType::Play => { 4850bce4e4cSHarlanC self.state = ClientSessionState::Play; 4860bce4e4cSHarlanC } 4870bce4e4cSHarlanC ClientType::Publish => { 4880bce4e4cSHarlanC self.state = ClientSessionState::PublishingContent; 4890bce4e4cSHarlanC } 4900bce4e4cSHarlanC } 4910bce4e4cSHarlanC Ok(()) 4920bce4e4cSHarlanC } 4930bce4e4cSHarlanC on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError>4940bce4e4cSHarlanC pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> { 4950bce4e4cSHarlanC self.unpacketizer 49685c0af6aSLuca Barbato .update_max_chunk_size(*chunk_size as usize); 4970bce4e4cSHarlanC Ok(()) 4980bce4e4cSHarlanC } 4990bce4e4cSHarlanC on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError>500b97dcde8SHarlanC pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 50188325f54SHarlanC log::trace!("stream is recorded stream_id is {}", stream_id); 502b97dcde8SHarlanC Ok(()) 503b97dcde8SHarlanC } 504b97dcde8SHarlanC on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError>505b97dcde8SHarlanC pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 50688325f54SHarlanC log::trace!("stream is begin stream_id is {}", stream_id); 507b97dcde8SHarlanC Ok(()) 508b97dcde8SHarlanC } 509b97dcde8SHarlanC on_set_peer_bandwidth(&mut self) -> Result<(), SessionError>510e0acb0c0SHarlanC pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> { 511c8d4d932SHarlan self.send_window_acknowledgement_size(5000000).await?; 512c8d4d932SHarlan 5130bce4e4cSHarlanC Ok(()) 5140bce4e4cSHarlanC } 51592df423eSHarlanC on_error(&mut self) -> Result<(), SessionError>5160bce4e4cSHarlanC pub fn on_error(&mut self) -> Result<(), SessionError> { 5170bce4e4cSHarlanC Ok(()) 5180bce4e4cSHarlanC } 5190bce4e4cSHarlanC on_status( &mut self, obj: &IndexMap<String, Amf0ValueType>, ) -> Result<(), SessionError>52092df423eSHarlanC pub async fn on_status( 52192df423eSHarlanC &mut self, 522c8d4d932SHarlan obj: &IndexMap<String, Amf0ValueType>, 52392df423eSHarlanC ) -> Result<(), SessionError> { 52492df423eSHarlanC if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") { 525b97dcde8SHarlanC match &code_info[..] { 526b97dcde8SHarlanC "NetStream.Publish.Start" => { 52792df423eSHarlanC self.state = ClientSessionState::StartPublish; 52895feb96cSHarlan //subscribe from local session and publish to remote rtmp server 52995feb96cSHarlan if let (Some(app_name), Some(stream_name)) = 53095feb96cSHarlan (&self.sub_app_name, &self.sub_stream_name) 53195feb96cSHarlan { 53295feb96cSHarlan self.common 53395feb96cSHarlan .subscribe_from_channels( 53495feb96cSHarlan app_name.clone(), 53595feb96cSHarlan stream_name.clone(), 53695feb96cSHarlan self.session_id, 53795feb96cSHarlan ) 53895feb96cSHarlan .await?; 53995feb96cSHarlan } else { 54092df423eSHarlanC self.common 54192df423eSHarlanC .subscribe_from_channels( 54292df423eSHarlanC self.app_name.clone(), 54392df423eSHarlanC self.stream_name.clone(), 544976f65a6SHarlan self.session_id, 54592df423eSHarlanC ) 54692df423eSHarlanC .await?; 54792df423eSHarlanC } 54895feb96cSHarlan } 549b97dcde8SHarlanC "NetStream.Publish.Reset" => {} 5503cf13c0aSHarlanC "NetStream.Play.Start" => { 55195feb96cSHarlan //pull from remote rtmp server and publish to local session 5523cf13c0aSHarlanC self.common 553976f65a6SHarlan .publish_to_channels( 554976f65a6SHarlan self.app_name.clone(), 555976f65a6SHarlan self.stream_name.clone(), 556976f65a6SHarlan self.session_id, 5578e71d710SHarlan self.gop_num, 558976f65a6SHarlan ) 5593cf13c0aSHarlanC .await? 5603cf13c0aSHarlanC } 561b97dcde8SHarlanC _ => {} 56292df423eSHarlanC } 56392df423eSHarlanC } 56488325f54SHarlanC log::trace!("{}", obj.len()); 5650bce4e4cSHarlanC Ok(()) 5660bce4e4cSHarlanC } 56795feb96cSHarlan subscribe(&mut self, app_name: String, stream_name: String)568c8d4d932SHarlan pub fn subscribe(&mut self, app_name: String, stream_name: String) { 569ded9d193SHarlan self.sub_app_name = Some(app_name); 570ded9d193SHarlan self.sub_stream_name = Some(stream_name); 57195feb96cSHarlan } 5720bce4e4cSHarlanC } 573