1*a4ef5d6cSHarlanC use crate::chunk::{errors::UnpackErrorValue, packetizer::ChunkPacketizer}; 213bac29aSHarlan 34435d79dSHarlanC use { 44435d79dSHarlanC super::{ 592df423eSHarlanC common::Common, 64435d79dSHarlanC define, 716394c08SHarlanC define::SessionType, 84435d79dSHarlanC errors::{SessionError, SessionErrorValue}, 94435d79dSHarlanC }, 104435d79dSHarlanC crate::{ 114435d79dSHarlanC amf0::Amf0ValueType, 124435d79dSHarlanC chunk::{ 132f7fa101SHarlanC define::CHUNK_SIZE, 144435d79dSHarlanC unpacketizer::{ChunkUnpacketizer, UnpackResult}, 154435d79dSHarlanC }, 168a37108dSHarlanC config, handshake, 172d9c981bSHarlanC handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer}, 182f7fa101SHarlanC messages::{define::RtmpMessageData, parser::MessageParser}, 1988777e65SHarlan netconnection::writer::{ConnectProperties, NetConnection}, 204435d79dSHarlanC netstream::writer::NetStreamWriter, 214435d79dSHarlanC protocol_control_messages::writer::ProtocolControlMessagesWriter, 224435d79dSHarlanC user_control_messages::writer::EventMessagesWriter, 23c8d4d932SHarlan utils::RtmpUrlParser, 244435d79dSHarlanC }, 254435d79dSHarlanC bytes::BytesMut, 268e71d710SHarlan bytesio::{ 278e71d710SHarlan bytes_writer::AsyncBytesWriter, 288e71d710SHarlan bytesio::{TNetIO, TcpIO}, 298e71d710SHarlan }, 30c8d4d932SHarlan indexmap::IndexMap, 31c8d4d932SHarlan std::{sync::Arc, time::Duration}, 328e71d710SHarlan streamhub::{ 338e71d710SHarlan define::StreamHubEventSender, 348e71d710SHarlan utils::{RandomDigitCount, Uuid}, 358e71d710SHarlan }, 36b1840569SHarlanC tokio::{net::TcpStream, sync::Mutex}, 370bce4e4cSHarlanC }; 380bce4e4cSHarlanC 390bce4e4cSHarlanC enum ServerSessionState { 400bce4e4cSHarlanC Handshake, 410bce4e4cSHarlanC ReadChunk, 42f4c7b80bSHarlanC // OnConnect, 43f4c7b80bSHarlanC // OnCreateStream, 446fec14f0SHarlanC //Publish, 458baa1d3cSHarlan DeleteStream, 464b4045b2SHarlanC Play, 470bce4e4cSHarlanC } 480bce4e4cSHarlanC 49fe91dfa7SHarlanC pub struct ServerSession { 50740804e8SHarlanC pub app_name: String, 51740804e8SHarlanC pub stream_name: String, 52f159b276SHarlan pub url_parameters: String, 538e71d710SHarlan io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 542b020b03SHarlanC handshaker: HandshakeServer, 554b4045b2SHarlanC unpacketizer: ChunkUnpacketizer, 560bce4e4cSHarlanC state: ServerSessionState, 5788325f54SHarlanC bytesio_data: BytesMut, 585eed9a43Swawacry has_remaing_data: bool, 5988d91efdSHarlanC /* Used to mark the subscriber's the data producer 6088d91efdSHarlanC in channels and delete it from map when unsubscribe 6188d91efdSHarlanC is called. */ 62976f65a6SHarlan pub session_id: Uuid, 6388777e65SHarlan connect_properties: ConnectProperties, 648e71d710SHarlan pub common: Common, 658e71d710SHarlan /*configure how many gops will be cached.*/ 668e71d710SHarlan gop_num: usize, 670bce4e4cSHarlanC } 680bce4e4cSHarlanC 69fe91dfa7SHarlanC impl ServerSession { new(stream: TcpStream, event_producer: StreamHubEventSender, gop_num: usize) -> Self708e71d710SHarlan pub fn new(stream: TcpStream, event_producer: StreamHubEventSender, gop_num: usize) -> Self { 71976f65a6SHarlan let remote_addr = if let Ok(addr) = stream.peer_addr() { 72976f65a6SHarlan log::info!("server session: {}", addr.to_string()); 73976f65a6SHarlan Some(addr) 74976f65a6SHarlan } else { 75976f65a6SHarlan None 76976f65a6SHarlan }; 77976f65a6SHarlan 788e71d710SHarlan let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream)); 798e71d710SHarlan let net_io = Arc::new(Mutex::new(tcp_io)); 808e71d710SHarlan 810bce4e4cSHarlanC Self { 826fec14f0SHarlanC app_name: String::from(""), 83cc18a6e9SHarlanC stream_name: String::from(""), 84f159b276SHarlan url_parameters: String::from(""), 8535844cc6SHarlanC io: Arc::clone(&net_io), 862b020b03SHarlanC handshaker: HandshakeServer::new(Arc::clone(&net_io)), 871606f184SHarlanC unpacketizer: ChunkUnpacketizer::new(), 880bce4e4cSHarlanC state: ServerSessionState::Handshake, 89976f65a6SHarlan common: Common::new( 9013bac29aSHarlan Some(ChunkPacketizer::new(Arc::clone(&net_io))), 91976f65a6SHarlan event_producer, 92976f65a6SHarlan SessionType::Server, 93976f65a6SHarlan remote_addr, 94976f65a6SHarlan ), 9513bac29aSHarlan session_id: Uuid::new(RandomDigitCount::Four), 9688325f54SHarlanC bytesio_data: BytesMut::new(), 975eed9a43Swawacry has_remaing_data: false, 9888777e65SHarlan connect_properties: ConnectProperties::default(), 998e71d710SHarlan gop_num, 1000bce4e4cSHarlanC } 1010bce4e4cSHarlanC } 1020bce4e4cSHarlanC run(&mut self) -> Result<(), SessionError>1030bce4e4cSHarlanC pub async fn run(&mut self) -> Result<(), SessionError> { 1040bce4e4cSHarlanC loop { 1050bce4e4cSHarlanC match self.state { 1060bce4e4cSHarlanC ServerSessionState::Handshake => { 107cc18a6e9SHarlanC self.handshake().await?; 108cc18a6e9SHarlanC } 109cc18a6e9SHarlanC ServerSessionState::ReadChunk => { 110cc18a6e9SHarlanC self.read_parse_chunks().await?; 111cc18a6e9SHarlanC } 112cc18a6e9SHarlanC ServerSessionState::Play => { 113cc18a6e9SHarlanC self.play().await?; 114cc18a6e9SHarlanC } 1158baa1d3cSHarlan ServerSessionState::DeleteStream => { 1168baa1d3cSHarlan return Ok(()); 1178baa1d3cSHarlan } 118cc18a6e9SHarlanC } 119fe91dfa7SHarlanC } 1200bce4e4cSHarlanC 121f9029ceaSHarlanC //Ok(()) 122cc18a6e9SHarlanC } 123cc18a6e9SHarlanC handshake(&mut self) -> Result<(), SessionError>124cc18a6e9SHarlanC async fn handshake(&mut self) -> Result<(), SessionError> { 1258a37108dSHarlanC let mut bytes_len = 0; 1265eed9a43Swawacry 1278a37108dSHarlanC while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE { 1288a37108dSHarlanC self.bytesio_data = self.io.lock().await.read().await?; 1298a37108dSHarlanC bytes_len += self.bytesio_data.len(); 13088325f54SHarlanC self.handshaker.extend_data(&self.bytesio_data[..]); 1318a37108dSHarlanC } 1328a37108dSHarlanC 1332b020b03SHarlanC self.handshaker.handshake().await?; 134fe91dfa7SHarlanC 1350ca99c20SHarlan if let ServerHandshakeState::Finish = self.handshaker.state() { 1360bce4e4cSHarlanC self.state = ServerSessionState::ReadChunk; 1372b020b03SHarlanC let left_bytes = self.handshaker.get_remaining_bytes(); 13885c0af6aSLuca Barbato if !left_bytes.is_empty() { 139cc18a6e9SHarlanC self.unpacketizer.extend_data(&left_bytes[..]); 1405eed9a43Swawacry self.has_remaing_data = true; 141fe91dfa7SHarlanC } 1422d9c981bSHarlanC log::info!("[ S->C ] [send_set_chunk_size] "); 1432b020b03SHarlanC self.send_set_chunk_size().await?; 144cc18a6e9SHarlanC return Ok(()); 1454d5a5966SHarlanC } 146cc18a6e9SHarlanC 147cc18a6e9SHarlanC Ok(()) 148cc18a6e9SHarlanC } 149cc18a6e9SHarlanC read_parse_chunks(&mut self) -> Result<(), SessionError>150cc18a6e9SHarlanC async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 1515eed9a43Swawacry if !self.has_remaing_data { 1525eed9a43Swawacry match self 1535eed9a43Swawacry .io 1545eed9a43Swawacry .lock() 1555eed9a43Swawacry .await 1565eed9a43Swawacry .read_timeout(Duration::from_secs(2)) 1575eed9a43Swawacry .await 1585eed9a43Swawacry { 1595eed9a43Swawacry Ok(data) => { 1605eed9a43Swawacry self.bytesio_data = data; 1615eed9a43Swawacry } 1625eed9a43Swawacry Err(err) => { 1635eed9a43Swawacry self.common 164976f65a6SHarlan .unpublish_to_channels( 165976f65a6SHarlan self.app_name.clone(), 166976f65a6SHarlan self.stream_name.clone(), 167976f65a6SHarlan self.session_id, 168976f65a6SHarlan ) 1695eed9a43Swawacry .await?; 1705eed9a43Swawacry 1715eed9a43Swawacry return Err(SessionError { 1725eed9a43Swawacry value: SessionErrorValue::BytesIOError(err), 1735eed9a43Swawacry }); 1745eed9a43Swawacry } 1755eed9a43Swawacry } 1765eed9a43Swawacry 17788325f54SHarlanC self.unpacketizer.extend_data(&self.bytesio_data[..]); 178cc18a6e9SHarlanC } 179cc18a6e9SHarlanC 1805eed9a43Swawacry self.has_remaing_data = false; 1814d5a5966SHarlanC 18292986251SHarlanC loop { 183*a4ef5d6cSHarlanC match self.unpacketizer.read_chunks() { 184*a4ef5d6cSHarlanC Ok(rv) => { 1850ca99c20SHarlan if let UnpackResult::Chunks(chunks) = rv { 186f3a36955SHarlanC for chunk_info in chunks { 1874b4045b2SHarlanC let timestamp = chunk_info.message_header.timestamp; 188f3a36955SHarlanC let msg_stream_id = chunk_info.message_header.msg_streamd_id; 189f3a36955SHarlanC 19069de9bbdSHarlanC if let Some(mut msg) = MessageParser::new(chunk_info).parse()? { 1914b4045b2SHarlanC self.process_messages(&mut msg, &msg_stream_id, ×tamp) 1924b4045b2SHarlanC .await?; 1930bce4e4cSHarlanC } 1944745db95SHarlanC } 19569de9bbdSHarlanC } 196*a4ef5d6cSHarlanC } 197*a4ef5d6cSHarlanC Err(err) => { 198*a4ef5d6cSHarlanC if let UnpackErrorValue::CannotParse = err.value { 199*a4ef5d6cSHarlanC self.common 200*a4ef5d6cSHarlanC .unpublish_to_channels( 201*a4ef5d6cSHarlanC self.app_name.clone(), 202*a4ef5d6cSHarlanC self.stream_name.clone(), 203*a4ef5d6cSHarlanC self.session_id, 204*a4ef5d6cSHarlanC ) 205*a4ef5d6cSHarlanC .await?; 206*a4ef5d6cSHarlanC return Err(err)?; 207*a4ef5d6cSHarlanC } 208cc18a6e9SHarlanC break; 209cc18a6e9SHarlanC } 210cc18a6e9SHarlanC } 211*a4ef5d6cSHarlanC } 212cc18a6e9SHarlanC Ok(()) 213cc18a6e9SHarlanC } 214cc18a6e9SHarlanC play(&mut self) -> Result<(), SessionError>215cc18a6e9SHarlanC async fn play(&mut self) -> Result<(), SessionError> { 21692df423eSHarlanC match self.common.send_channel_data().await { 217cc18a6e9SHarlanC Ok(_) => {} 218f8169385SHarlanC Err(err) => { 21992df423eSHarlanC self.common 22092df423eSHarlanC .unsubscribe_from_channels( 22192df423eSHarlanC self.app_name.clone(), 22292df423eSHarlanC self.stream_name.clone(), 223976f65a6SHarlan self.session_id, 22492df423eSHarlanC ) 22592df423eSHarlanC .await?; 226f8169385SHarlanC return Err(err); 2270bce4e4cSHarlanC } 22892986251SHarlanC } 2291606f184SHarlanC 230cc18a6e9SHarlanC Ok(()) 231cc18a6e9SHarlanC } 232cc18a6e9SHarlanC send_set_chunk_size(&mut self) -> Result<(), SessionError>233e0acb0c0SHarlanC pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 234383722b2SHarlanC let mut controlmessage = 235383722b2SHarlanC ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 236e0acb0c0SHarlanC controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 2370bce4e4cSHarlanC 2380bce4e4cSHarlanC Ok(()) 2390bce4e4cSHarlanC } 2400bce4e4cSHarlanC process_messages( &mut self, rtmp_msg: &mut RtmpMessageData, msg_stream_id: &u32, timestamp: &u32, ) -> Result<(), SessionError>24135844cc6SHarlanC pub async fn process_messages( 2420bce4e4cSHarlanC &mut self, 2434b4045b2SHarlanC rtmp_msg: &mut RtmpMessageData, 2440bce4e4cSHarlanC msg_stream_id: &u32, 2454b4045b2SHarlanC timestamp: &u32, 2460bce4e4cSHarlanC ) -> Result<(), SessionError> { 2470bce4e4cSHarlanC match rtmp_msg { 2484b4045b2SHarlanC RtmpMessageData::Amf0Command { 2490bce4e4cSHarlanC command_name, 2500bce4e4cSHarlanC transaction_id, 2510bce4e4cSHarlanC command_object, 2520bce4e4cSHarlanC others, 25335844cc6SHarlanC } => { 254cc18a6e9SHarlanC self.on_amf0_command_message( 2550bce4e4cSHarlanC msg_stream_id, 2560bce4e4cSHarlanC command_name, 2570bce4e4cSHarlanC transaction_id, 2580bce4e4cSHarlanC command_object, 2590bce4e4cSHarlanC others, 26035844cc6SHarlanC ) 26135844cc6SHarlanC .await? 26235844cc6SHarlanC } 26392986251SHarlanC RtmpMessageData::SetChunkSize { chunk_size } => { 26485c0af6aSLuca Barbato self.on_set_chunk_size(*chunk_size as usize)?; 26592986251SHarlanC } 2664b4045b2SHarlanC RtmpMessageData::AudioData { data } => { 2678e71d710SHarlan self.common.on_audio_data(data, timestamp).await?; 2684b4045b2SHarlanC } 2694b4045b2SHarlanC RtmpMessageData::VideoData { data } => { 2708e71d710SHarlan self.common.on_video_data(data, timestamp).await?; 2714b4045b2SHarlanC } 27261bf3e1bSHarlanC RtmpMessageData::AmfData { raw_data } => { 2738e71d710SHarlan self.common.on_meta_data(raw_data, timestamp).await?; 2747adc8486SHarlanC } 2750bce4e4cSHarlanC 2760bce4e4cSHarlanC _ => {} 2770bce4e4cSHarlanC } 2780bce4e4cSHarlanC Ok(()) 2790bce4e4cSHarlanC } 2800bce4e4cSHarlanC on_amf0_command_message( &mut self, stream_id: &u32, command_name: &Amf0ValueType, transaction_id: &Amf0ValueType, command_object: &Amf0ValueType, others: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>281cc18a6e9SHarlanC pub async fn on_amf0_command_message( 2820bce4e4cSHarlanC &mut self, 2830bce4e4cSHarlanC stream_id: &u32, 2840bce4e4cSHarlanC command_name: &Amf0ValueType, 2850bce4e4cSHarlanC transaction_id: &Amf0ValueType, 2860bce4e4cSHarlanC command_object: &Amf0ValueType, 2870bce4e4cSHarlanC others: &mut Vec<Amf0ValueType>, 2880bce4e4cSHarlanC ) -> Result<(), SessionError> { 2890bce4e4cSHarlanC let empty_cmd_name = &String::new(); 2900bce4e4cSHarlanC let cmd_name = match command_name { 2910bce4e4cSHarlanC Amf0ValueType::UTF8String(str) => str, 2920bce4e4cSHarlanC _ => empty_cmd_name, 2930bce4e4cSHarlanC }; 2940bce4e4cSHarlanC 2950bce4e4cSHarlanC let transaction_id = match transaction_id { 2960bce4e4cSHarlanC Amf0ValueType::Number(number) => number, 2970bce4e4cSHarlanC _ => &0.0, 2980bce4e4cSHarlanC }; 2990bce4e4cSHarlanC 300c8d4d932SHarlan let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new(); 3010bce4e4cSHarlanC let obj = match command_object { 3020bce4e4cSHarlanC Amf0ValueType::Object(obj) => obj, 3030bce4e4cSHarlanC _ => &empty_cmd_obj, 3040bce4e4cSHarlanC }; 3050bce4e4cSHarlanC 3060bce4e4cSHarlanC match cmd_name.as_str() { 3070bce4e4cSHarlanC "connect" => { 30862d3fba8SHarlanC log::info!("[ S<-C ] [connect] "); 30985c0af6aSLuca Barbato self.on_connect(transaction_id, obj).await?; 3100bce4e4cSHarlanC } 3110bce4e4cSHarlanC "createStream" => { 31262d3fba8SHarlanC log::info!("[ S<-C ] [create stream] "); 313fe91dfa7SHarlanC self.on_create_stream(transaction_id).await?; 3140bce4e4cSHarlanC } 3150bce4e4cSHarlanC "deleteStream" => { 31685c0af6aSLuca Barbato if !others.is_empty() { 3170bce4e4cSHarlanC let stream_id = match others.pop() { 3180ca99c20SHarlan Some(Amf0ValueType::Number(streamid)) => streamid, 3190bce4e4cSHarlanC _ => 0.0, 3200bce4e4cSHarlanC }; 3210ca99c20SHarlan 32262d3fba8SHarlanC log::info!( 32362d3fba8SHarlanC "[ S<-C ] [delete stream] app_name: {}, stream_name: {}", 32462d3fba8SHarlanC self.app_name, 32562d3fba8SHarlanC self.stream_name 32662d3fba8SHarlanC ); 32762d3fba8SHarlanC 32835844cc6SHarlanC self.on_delete_stream(transaction_id, &stream_id).await?; 3298baa1d3cSHarlan self.state = ServerSessionState::DeleteStream; 3300bce4e4cSHarlanC } 3310bce4e4cSHarlanC } 3320bce4e4cSHarlanC "play" => { 33362d3fba8SHarlanC log::info!( 33462d3fba8SHarlanC "[ S<-C ] [play] app_name: {}, stream_name: {}", 33562d3fba8SHarlanC self.app_name, 33662d3fba8SHarlanC self.stream_name 33762d3fba8SHarlanC ); 338cc18a6e9SHarlanC self.unpacketizer.session_type = config::SERVER_PULL; 33935844cc6SHarlanC self.on_play(transaction_id, stream_id, others).await?; 3400bce4e4cSHarlanC } 3410bce4e4cSHarlanC "publish" => { 342cc18a6e9SHarlanC self.unpacketizer.session_type = config::SERVER_PUSH; 34335844cc6SHarlanC self.on_publish(transaction_id, stream_id, others).await?; 3440bce4e4cSHarlanC } 3450bce4e4cSHarlanC _ => {} 3460bce4e4cSHarlanC } 3470bce4e4cSHarlanC 3480bce4e4cSHarlanC Ok(()) 3490bce4e4cSHarlanC } 3500bce4e4cSHarlanC on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError>35192986251SHarlanC fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 3527cd03d07SHarlan log::info!( 3537cd03d07SHarlan "[ S<-C ] [set chunk size] app_name: {}, stream_name: {}, chunk size: {}", 3547cd03d07SHarlan self.app_name, 3557cd03d07SHarlan self.stream_name, 3567cd03d07SHarlan chunk_size 3577cd03d07SHarlan ); 35892986251SHarlanC self.unpacketizer.update_max_chunk_size(chunk_size); 35992986251SHarlanC Ok(()) 36092986251SHarlanC } 36192986251SHarlanC parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>)362c8d4d932SHarlan fn parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>) { 36353b16235SHarlan for (property, value) in command_obj { 36453b16235SHarlan match property.as_str() { 36588777e65SHarlan "app" => { 36653b16235SHarlan if let Amf0ValueType::UTF8String(app) = value { 36788777e65SHarlan self.connect_properties.app = Some(app.clone()); 36888777e65SHarlan } 36988777e65SHarlan } 37088777e65SHarlan "flashVer" => { 37153b16235SHarlan if let Amf0ValueType::UTF8String(flash_ver) = value { 37288777e65SHarlan self.connect_properties.flash_ver = Some(flash_ver.clone()); 37388777e65SHarlan } 37488777e65SHarlan } 37588777e65SHarlan "swfUrl" => { 37653b16235SHarlan if let Amf0ValueType::UTF8String(swf_url) = value { 37788777e65SHarlan self.connect_properties.swf_url = Some(swf_url.clone()); 37888777e65SHarlan } 37988777e65SHarlan } 38088777e65SHarlan "tcUrl" => { 38153b16235SHarlan if let Amf0ValueType::UTF8String(tc_url) = value { 38288777e65SHarlan self.connect_properties.tc_url = Some(tc_url.clone()); 38388777e65SHarlan } 38488777e65SHarlan } 38588777e65SHarlan "fpad" => { 38653b16235SHarlan if let Amf0ValueType::Boolean(fpad) = value { 38788777e65SHarlan self.connect_properties.fpad = Some(*fpad); 38888777e65SHarlan } 38988777e65SHarlan } 39088777e65SHarlan "audioCodecs" => { 39153b16235SHarlan if let Amf0ValueType::Number(audio_codecs) = value { 39288777e65SHarlan self.connect_properties.audio_codecs = Some(*audio_codecs); 39388777e65SHarlan } 39488777e65SHarlan } 39588777e65SHarlan "videoCodecs" => { 39653b16235SHarlan if let Amf0ValueType::Number(video_codecs) = value { 39788777e65SHarlan self.connect_properties.video_codecs = Some(*video_codecs); 39888777e65SHarlan } 39988777e65SHarlan } 40088777e65SHarlan "videoFunction" => { 40153b16235SHarlan if let Amf0ValueType::Number(video_function) = value { 40288777e65SHarlan self.connect_properties.video_function = Some(*video_function); 40388777e65SHarlan } 40488777e65SHarlan } 40588777e65SHarlan "pageUrl" => { 40653b16235SHarlan if let Amf0ValueType::UTF8String(page_url) = value { 40788777e65SHarlan self.connect_properties.page_url = Some(page_url.clone()); 40888777e65SHarlan } 40988777e65SHarlan } 41088777e65SHarlan "objectEncoding" => { 41153b16235SHarlan if let Amf0ValueType::Number(object_encoding) = value { 41288777e65SHarlan self.connect_properties.object_encoding = Some(*object_encoding); 41388777e65SHarlan } 41488777e65SHarlan } 41588777e65SHarlan _ => { 41653b16235SHarlan log::warn!("unknown connect properties: {}:{:?}", property, value); 41788777e65SHarlan } 41888777e65SHarlan } 41988777e65SHarlan } 42088777e65SHarlan } 42188777e65SHarlan on_connect( &mut self, transaction_id: &f64, command_obj: &IndexMap<String, Amf0ValueType>, ) -> Result<(), SessionError>42235844cc6SHarlanC async fn on_connect( 4230bce4e4cSHarlanC &mut self, 4240bce4e4cSHarlanC transaction_id: &f64, 425c8d4d932SHarlan command_obj: &IndexMap<String, Amf0ValueType>, 4260bce4e4cSHarlanC ) -> Result<(), SessionError> { 42788777e65SHarlan self.parse_connect_properties(command_obj); 42888777e65SHarlan log::info!("connect properties: {:?}", self.connect_properties); 429383722b2SHarlanC let mut control_message = 430383722b2SHarlanC ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 43162d3fba8SHarlanC log::info!("[ S->C ] [set window_acknowledgement_size]"); 4324745db95SHarlanC control_message 4334745db95SHarlanC .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 4344745db95SHarlanC .await?; 4352d9c981bSHarlanC 43662d3fba8SHarlanC log::info!("[ S->C ] [set set_peer_bandwidth]",); 4374745db95SHarlanC control_message 4384745db95SHarlanC .write_set_peer_bandwidth( 4390bce4e4cSHarlanC define::PEER_BANDWIDTH, 440f9029ceaSHarlanC define::peer_bandwidth_limit_type::DYNAMIC, 4414745db95SHarlanC ) 4424745db95SHarlanC .await?; 4430bce4e4cSHarlanC 4440bce4e4cSHarlanC let obj_encoding = command_obj.get("objectEncoding"); 4450bce4e4cSHarlanC let encoding = match obj_encoding { 4460bce4e4cSHarlanC Some(Amf0ValueType::Number(encoding)) => encoding, 4470bce4e4cSHarlanC _ => &define::OBJENCODING_AMF0, 4480bce4e4cSHarlanC }; 4490bce4e4cSHarlanC 4506fec14f0SHarlanC let app_name = command_obj.get("app"); 4516fec14f0SHarlanC self.app_name = match app_name { 4526fec14f0SHarlanC Some(Amf0ValueType::UTF8String(app)) => app.clone(), 4536fec14f0SHarlanC _ => { 4546fec14f0SHarlanC return Err(SessionError { 4556fec14f0SHarlanC value: SessionErrorValue::NoAppName, 4566fec14f0SHarlanC }); 4576fec14f0SHarlanC } 4586fec14f0SHarlanC }; 4596fec14f0SHarlanC 4602f7fa101SHarlanC let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 4612d9c981bSHarlanC log::info!("[ S->C ] [set connect_response]",); 4622f7fa101SHarlanC netconnection 4632f7fa101SHarlanC .write_connect_response( 46485c0af6aSLuca Barbato transaction_id, 4650ca99c20SHarlan define::FMSVER, 4660bce4e4cSHarlanC &define::CAPABILITIES, 4670bce4e4cSHarlanC &String::from("NetConnection.Connect.Success"), 4680ca99c20SHarlan define::LEVEL, 4690bce4e4cSHarlanC &String::from("Connection Succeeded."), 4700bce4e4cSHarlanC encoding, 4712f7fa101SHarlanC ) 4722f7fa101SHarlanC .await?; 47335844cc6SHarlanC 4740bce4e4cSHarlanC Ok(()) 4750bce4e4cSHarlanC } 4760bce4e4cSHarlanC on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError>477fe91dfa7SHarlanC pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 4782f7fa101SHarlanC let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 4792f7fa101SHarlanC netconnection 4802f7fa101SHarlanC .write_create_stream_response(transaction_id, &define::STREAM_ID) 4812f7fa101SHarlanC .await?; 4820bce4e4cSHarlanC 48362d3fba8SHarlanC log::info!( 48462d3fba8SHarlanC "[ S->C ] [create_stream_response] app_name: {}", 48562d3fba8SHarlanC self.app_name, 48662d3fba8SHarlanC ); 48762d3fba8SHarlanC 4880bce4e4cSHarlanC Ok(()) 4890bce4e4cSHarlanC } 4900bce4e4cSHarlanC on_delete_stream( &mut self, transaction_id: &f64, stream_id: &f64, ) -> Result<(), SessionError>49135844cc6SHarlanC pub async fn on_delete_stream( 4920bce4e4cSHarlanC &mut self, 4930bce4e4cSHarlanC transaction_id: &f64, 4940bce4e4cSHarlanC stream_id: &f64, 4950bce4e4cSHarlanC ) -> Result<(), SessionError> { 49692df423eSHarlanC self.common 497976f65a6SHarlan .unpublish_to_channels( 498976f65a6SHarlan self.app_name.clone(), 499976f65a6SHarlan self.stream_name.clone(), 500976f65a6SHarlan self.session_id, 501976f65a6SHarlan ) 50292df423eSHarlanC .await?; 503a4485a6eSHarlanC 5048842de45SHarlanC let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 505383722b2SHarlanC netstream 5062f7fa101SHarlanC .write_on_status( 5070bce4e4cSHarlanC transaction_id, 5080ca99c20SHarlan "status", 5090ca99c20SHarlan "NetStream.DeleteStream.Suceess", 5100ca99c20SHarlan "", 511383722b2SHarlanC ) 512383722b2SHarlanC .await?; 5130bce4e4cSHarlanC 514f8169385SHarlanC //self.unsubscribe_from_channels().await?; 51562d3fba8SHarlanC log::info!( 51662d3fba8SHarlanC "[ S->C ] [delete stream success] app_name: {}, stream_name: {}", 51762d3fba8SHarlanC self.app_name, 51862d3fba8SHarlanC self.stream_name 51962d3fba8SHarlanC ); 52062d3fba8SHarlanC log::trace!("{}", stream_id); 521cc18a6e9SHarlanC 5220bce4e4cSHarlanC Ok(()) 5230bce4e4cSHarlanC } 5240ca99c20SHarlan get_request_url(&mut self, raw_stream_name: String) -> String525f159b276SHarlan fn get_request_url(&mut self, raw_stream_name: String) -> String { 526976f65a6SHarlan if let Some(tc_url) = &self.connect_properties.tc_url { 527248cdac6SHarlan format!("{tc_url}/{raw_stream_name}") 528976f65a6SHarlan } else { 529f159b276SHarlan format!("{}/{}", self.app_name.clone(), raw_stream_name) 530f159b276SHarlan } 531f159b276SHarlan } 532976f65a6SHarlan 5330ca99c20SHarlan #[allow(clippy::never_loop)] on_play( &mut self, transaction_id: &f64, stream_id: &u32, other_values: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>53435844cc6SHarlanC pub async fn on_play( 5350bce4e4cSHarlanC &mut self, 5360bce4e4cSHarlanC transaction_id: &f64, 5370bce4e4cSHarlanC stream_id: &u32, 5380bce4e4cSHarlanC other_values: &mut Vec<Amf0ValueType>, 5390bce4e4cSHarlanC ) -> Result<(), SessionError> { 5400bce4e4cSHarlanC let length = other_values.len() as u8; 5410bce4e4cSHarlanC let mut index: u8 = 0; 5420bce4e4cSHarlanC 5430bce4e4cSHarlanC let mut stream_name: Option<String> = None; 5440bce4e4cSHarlanC let mut start: Option<f64> = None; 5450bce4e4cSHarlanC let mut duration: Option<f64> = None; 5460bce4e4cSHarlanC let mut reset: Option<bool> = None; 5470bce4e4cSHarlanC 5480bce4e4cSHarlanC loop { 5490bce4e4cSHarlanC if index >= length { 5500bce4e4cSHarlanC break; 5510bce4e4cSHarlanC } 55285c0af6aSLuca Barbato index += 1; 5530bce4e4cSHarlanC stream_name = match other_values.remove(0) { 5540bce4e4cSHarlanC Amf0ValueType::UTF8String(val) => Some(val), 5550bce4e4cSHarlanC _ => None, 5560bce4e4cSHarlanC }; 5570bce4e4cSHarlanC 5580bce4e4cSHarlanC if index >= length { 5590bce4e4cSHarlanC break; 5600bce4e4cSHarlanC } 56185c0af6aSLuca Barbato index += 1; 5620bce4e4cSHarlanC start = match other_values.remove(0) { 5630bce4e4cSHarlanC Amf0ValueType::Number(val) => Some(val), 5640bce4e4cSHarlanC _ => None, 5650bce4e4cSHarlanC }; 5660bce4e4cSHarlanC 5670bce4e4cSHarlanC if index >= length { 5680bce4e4cSHarlanC break; 5690bce4e4cSHarlanC } 57085c0af6aSLuca Barbato index += 1; 5710bce4e4cSHarlanC duration = match other_values.remove(0) { 5720bce4e4cSHarlanC Amf0ValueType::Number(val) => Some(val), 5730bce4e4cSHarlanC _ => None, 5740bce4e4cSHarlanC }; 5750bce4e4cSHarlanC 5760bce4e4cSHarlanC if index >= length { 5770bce4e4cSHarlanC break; 5780bce4e4cSHarlanC } 579f9029ceaSHarlanC //index = index + 1; 5800bce4e4cSHarlanC reset = match other_values.remove(0) { 5810bce4e4cSHarlanC Amf0ValueType::Boolean(val) => Some(val), 5820bce4e4cSHarlanC _ => None, 5830bce4e4cSHarlanC }; 5840bce4e4cSHarlanC break; 5850bce4e4cSHarlanC } 58662d3fba8SHarlanC 58762d3fba8SHarlanC let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 58885c0af6aSLuca Barbato event_messages.write_stream_begin(*stream_id).await?; 5895de1eabbSHarlanC log::info!( 59062d3fba8SHarlanC "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 59162d3fba8SHarlanC self.app_name, 59262d3fba8SHarlanC self.stream_name 59362d3fba8SHarlanC ); 59462d3fba8SHarlanC log::trace!( 59562d3fba8SHarlanC "{} {} {}", 5965de1eabbSHarlanC start.is_some(), 5975de1eabbSHarlanC duration.is_some(), 5985de1eabbSHarlanC reset.is_some() 5995de1eabbSHarlanC ); 6000bce4e4cSHarlanC 6018842de45SHarlanC let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 602383722b2SHarlanC netstream 6030ca99c20SHarlan .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset") 6040ca99c20SHarlan .await?; 6050ca99c20SHarlan 6060ca99c20SHarlan netstream 6072f7fa101SHarlanC .write_on_status( 6080bce4e4cSHarlanC transaction_id, 6090ca99c20SHarlan "status", 6100ca99c20SHarlan "NetStream.Play.Start", 6110ca99c20SHarlan "play start", 612383722b2SHarlanC ) 613383722b2SHarlanC .await?; 6140bce4e4cSHarlanC 615383722b2SHarlanC netstream 6162f7fa101SHarlanC .write_on_status( 6170bce4e4cSHarlanC transaction_id, 6180ca99c20SHarlan "status", 6190ca99c20SHarlan "NetStream.Data.Start", 6200ca99c20SHarlan "data start.", 6217adc8486SHarlanC ) 6227adc8486SHarlanC .await?; 6237adc8486SHarlanC 6247adc8486SHarlanC netstream 6252f7fa101SHarlanC .write_on_status( 6267adc8486SHarlanC transaction_id, 6270ca99c20SHarlan "status", 6280ca99c20SHarlan "NetStream.Play.PublishNotify", 6290ca99c20SHarlan "play publish notify.", 6307adc8486SHarlanC ) 6317adc8486SHarlanC .await?; 6327adc8486SHarlanC 6330ca99c20SHarlan event_messages.write_stream_is_record(*stream_id).await?; 634f159b276SHarlan 635f159b276SHarlan let raw_stream_name = stream_name.unwrap(); 636c8d4d932SHarlan 637c8d4d932SHarlan (self.stream_name, self.url_parameters) = RtmpUrlParser::default() 638c8d4d932SHarlan .set_raw_stream_name(raw_stream_name.clone()) 639c8d4d932SHarlan .parse_raw_stream_name(); 640f159b276SHarlan 64162d3fba8SHarlanC log::info!( 642f159b276SHarlan "[ S->C ] [stream is record] app_name: {}, stream_name: {}, url parameters: {}", 64362d3fba8SHarlanC self.app_name, 644f159b276SHarlan self.stream_name, 645f159b276SHarlan self.url_parameters 64662d3fba8SHarlanC ); 647f159b276SHarlan 648976f65a6SHarlan /*Now it can update the request url*/ 649f159b276SHarlan self.common.request_url = self.get_request_url(raw_stream_name); 65092df423eSHarlanC self.common 651f159b276SHarlan .subscribe_from_channels( 652f159b276SHarlan self.app_name.clone(), 653f159b276SHarlan self.stream_name.clone(), 654f159b276SHarlan self.session_id, 655f159b276SHarlan ) 65692df423eSHarlanC .await?; 65730c61c6eSHarlanC 6581606f184SHarlanC self.state = ServerSessionState::Play; 6591606f184SHarlanC 6601606f184SHarlanC Ok(()) 6611606f184SHarlanC } 6621606f184SHarlanC on_publish( &mut self, transaction_id: &f64, stream_id: &u32, other_values: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>66335844cc6SHarlanC pub async fn on_publish( 6640bce4e4cSHarlanC &mut self, 6650bce4e4cSHarlanC transaction_id: &f64, 6660bce4e4cSHarlanC stream_id: &u32, 6670bce4e4cSHarlanC other_values: &mut Vec<Amf0ValueType>, 6680bce4e4cSHarlanC ) -> Result<(), SessionError> { 6690bce4e4cSHarlanC let length = other_values.len(); 6700bce4e4cSHarlanC 6710bce4e4cSHarlanC if length < 2 { 6720bce4e4cSHarlanC return Err(SessionError { 6730bce4e4cSHarlanC value: SessionErrorValue::Amf0ValueCountNotCorrect, 6740bce4e4cSHarlanC }); 6750bce4e4cSHarlanC } 6760bce4e4cSHarlanC 677f159b276SHarlan let raw_stream_name = match other_values.remove(0) { 6780bce4e4cSHarlanC Amf0ValueType::UTF8String(val) => val, 6790bce4e4cSHarlanC _ => { 6800bce4e4cSHarlanC return Err(SessionError { 6810bce4e4cSHarlanC value: SessionErrorValue::Amf0ValueCountNotCorrect, 6820bce4e4cSHarlanC }); 6830bce4e4cSHarlanC } 6840bce4e4cSHarlanC }; 6850bce4e4cSHarlanC 686c8d4d932SHarlan (self.stream_name, self.url_parameters) = RtmpUrlParser::default() 687c8d4d932SHarlan .set_raw_stream_name(raw_stream_name.clone()) 688c8d4d932SHarlan .parse_raw_stream_name(); 689c8d4d932SHarlan 690976f65a6SHarlan /*Now it can update the request url*/ 691f159b276SHarlan self.common.request_url = self.get_request_url(raw_stream_name); 692a4485a6eSHarlanC 693f9029ceaSHarlanC let _ = match other_values.remove(0) { 6940bce4e4cSHarlanC Amf0ValueType::UTF8String(val) => val, 6950bce4e4cSHarlanC _ => { 6960bce4e4cSHarlanC return Err(SessionError { 6970bce4e4cSHarlanC value: SessionErrorValue::Amf0ValueCountNotCorrect, 6980bce4e4cSHarlanC }); 6990bce4e4cSHarlanC } 7000bce4e4cSHarlanC }; 7010bce4e4cSHarlanC 70262d3fba8SHarlanC log::info!( 703f159b276SHarlan "[ S<-C ] [publish] app_name: {}, stream_name: {}, url parameters: {}", 70462d3fba8SHarlanC self.app_name, 705f159b276SHarlan self.stream_name, 706f159b276SHarlan self.url_parameters 70762d3fba8SHarlanC ); 70862d3fba8SHarlanC 70962d3fba8SHarlanC log::info!( 710f159b276SHarlan "[ S->C ] [stream begin] app_name: {}, stream_name: {}, url parameters: {}", 71162d3fba8SHarlanC self.app_name, 712f159b276SHarlan self.stream_name, 713f159b276SHarlan self.url_parameters 71462d3fba8SHarlanC ); 71562d3fba8SHarlanC 71697f0b5afSHarlanC let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 71785c0af6aSLuca Barbato event_messages.write_stream_begin(*stream_id).await?; 7180bce4e4cSHarlanC 7198842de45SHarlanC let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 720383722b2SHarlanC netstream 7210ca99c20SHarlan .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "") 722383722b2SHarlanC .await?; 72362d3fba8SHarlanC log::info!( 72462d3fba8SHarlanC "[ S->C ] [NetStream.Publish.Start] app_name: {}, stream_name: {}", 72562d3fba8SHarlanC self.app_name, 72662d3fba8SHarlanC self.stream_name 72762d3fba8SHarlanC ); 72835844cc6SHarlanC 72992df423eSHarlanC self.common 730976f65a6SHarlan .publish_to_channels( 731976f65a6SHarlan self.app_name.clone(), 732976f65a6SHarlan self.stream_name.clone(), 733976f65a6SHarlan self.session_id, 7348e71d710SHarlan self.gop_num, 735976f65a6SHarlan ) 73692df423eSHarlanC .await?; 7371606f184SHarlanC 7381606f184SHarlanC Ok(()) 7391606f184SHarlanC } 7400bce4e4cSHarlanC } 741