1*a4ef5d6cSHarlanC use crate::chunk::{errors::UnpackErrorValue, packetizer::ChunkPacketizer};
213bac29aSHarlan 
34435d79dSHarlanC use {
44435d79dSHarlanC     super::{
592df423eSHarlanC         common::Common,
64435d79dSHarlanC         define,
716394c08SHarlanC         define::SessionType,
84435d79dSHarlanC         errors::{SessionError, SessionErrorValue},
94435d79dSHarlanC     },
104435d79dSHarlanC     crate::{
114435d79dSHarlanC         amf0::Amf0ValueType,
124435d79dSHarlanC         chunk::{
132f7fa101SHarlanC             define::CHUNK_SIZE,
144435d79dSHarlanC             unpacketizer::{ChunkUnpacketizer, UnpackResult},
154435d79dSHarlanC         },
168a37108dSHarlanC         config, handshake,
172d9c981bSHarlanC         handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer},
182f7fa101SHarlanC         messages::{define::RtmpMessageData, parser::MessageParser},
1988777e65SHarlan         netconnection::writer::{ConnectProperties, NetConnection},
204435d79dSHarlanC         netstream::writer::NetStreamWriter,
214435d79dSHarlanC         protocol_control_messages::writer::ProtocolControlMessagesWriter,
224435d79dSHarlanC         user_control_messages::writer::EventMessagesWriter,
23c8d4d932SHarlan         utils::RtmpUrlParser,
244435d79dSHarlanC     },
254435d79dSHarlanC     bytes::BytesMut,
268e71d710SHarlan     bytesio::{
278e71d710SHarlan         bytes_writer::AsyncBytesWriter,
288e71d710SHarlan         bytesio::{TNetIO, TcpIO},
298e71d710SHarlan     },
30c8d4d932SHarlan     indexmap::IndexMap,
31c8d4d932SHarlan     std::{sync::Arc, time::Duration},
328e71d710SHarlan     streamhub::{
338e71d710SHarlan         define::StreamHubEventSender,
348e71d710SHarlan         utils::{RandomDigitCount, Uuid},
358e71d710SHarlan     },
36b1840569SHarlanC     tokio::{net::TcpStream, sync::Mutex},
370bce4e4cSHarlanC };
380bce4e4cSHarlanC 
/// Steps of the server session state machine that `ServerSession::run`
/// dispatches on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerSessionState {
    /// The RTMP handshake has not finished yet.
    Handshake,
    /// The handshake is done; incoming bytes are read and parsed as chunks.
    ReadChunk,
    /// A `deleteStream` command was handled; `run` returns when it sees this.
    DeleteStream,
    /// Channel data is being sent to the client.
    Play,
}
480bce4e4cSHarlanC 
49fe91dfa7SHarlanC pub struct ServerSession {
50740804e8SHarlanC     pub app_name: String,
51740804e8SHarlanC     pub stream_name: String,
52f159b276SHarlan     pub url_parameters: String,
538e71d710SHarlan     io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
542b020b03SHarlanC     handshaker: HandshakeServer,
554b4045b2SHarlanC     unpacketizer: ChunkUnpacketizer,
560bce4e4cSHarlanC     state: ServerSessionState,
5788325f54SHarlanC     bytesio_data: BytesMut,
585eed9a43Swawacry     has_remaing_data: bool,
5988d91efdSHarlanC     /* Used to mark the subscriber's the data producer
6088d91efdSHarlanC     in channels and delete it from map when unsubscribe
6188d91efdSHarlanC     is called. */
62976f65a6SHarlan     pub session_id: Uuid,
6388777e65SHarlan     connect_properties: ConnectProperties,
648e71d710SHarlan     pub common: Common,
658e71d710SHarlan     /*configure how many gops will be cached.*/
668e71d710SHarlan     gop_num: usize,
670bce4e4cSHarlanC }
680bce4e4cSHarlanC 
69fe91dfa7SHarlanC impl ServerSession {
new(stream: TcpStream, event_producer: StreamHubEventSender, gop_num: usize) -> Self708e71d710SHarlan     pub fn new(stream: TcpStream, event_producer: StreamHubEventSender, gop_num: usize) -> Self {
71976f65a6SHarlan         let remote_addr = if let Ok(addr) = stream.peer_addr() {
72976f65a6SHarlan             log::info!("server session: {}", addr.to_string());
73976f65a6SHarlan             Some(addr)
74976f65a6SHarlan         } else {
75976f65a6SHarlan             None
76976f65a6SHarlan         };
77976f65a6SHarlan 
788e71d710SHarlan         let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream));
798e71d710SHarlan         let net_io = Arc::new(Mutex::new(tcp_io));
808e71d710SHarlan 
810bce4e4cSHarlanC         Self {
826fec14f0SHarlanC             app_name: String::from(""),
83cc18a6e9SHarlanC             stream_name: String::from(""),
84f159b276SHarlan             url_parameters: String::from(""),
8535844cc6SHarlanC             io: Arc::clone(&net_io),
862b020b03SHarlanC             handshaker: HandshakeServer::new(Arc::clone(&net_io)),
871606f184SHarlanC             unpacketizer: ChunkUnpacketizer::new(),
880bce4e4cSHarlanC             state: ServerSessionState::Handshake,
89976f65a6SHarlan             common: Common::new(
9013bac29aSHarlan                 Some(ChunkPacketizer::new(Arc::clone(&net_io))),
91976f65a6SHarlan                 event_producer,
92976f65a6SHarlan                 SessionType::Server,
93976f65a6SHarlan                 remote_addr,
94976f65a6SHarlan             ),
9513bac29aSHarlan             session_id: Uuid::new(RandomDigitCount::Four),
9688325f54SHarlanC             bytesio_data: BytesMut::new(),
975eed9a43Swawacry             has_remaing_data: false,
9888777e65SHarlan             connect_properties: ConnectProperties::default(),
998e71d710SHarlan             gop_num,
1000bce4e4cSHarlanC         }
1010bce4e4cSHarlanC     }
1020bce4e4cSHarlanC 
run(&mut self) -> Result<(), SessionError>1030bce4e4cSHarlanC     pub async fn run(&mut self) -> Result<(), SessionError> {
1040bce4e4cSHarlanC         loop {
1050bce4e4cSHarlanC             match self.state {
1060bce4e4cSHarlanC                 ServerSessionState::Handshake => {
107cc18a6e9SHarlanC                     self.handshake().await?;
108cc18a6e9SHarlanC                 }
109cc18a6e9SHarlanC                 ServerSessionState::ReadChunk => {
110cc18a6e9SHarlanC                     self.read_parse_chunks().await?;
111cc18a6e9SHarlanC                 }
112cc18a6e9SHarlanC                 ServerSessionState::Play => {
113cc18a6e9SHarlanC                     self.play().await?;
114cc18a6e9SHarlanC                 }
1158baa1d3cSHarlan                 ServerSessionState::DeleteStream => {
1168baa1d3cSHarlan                     return Ok(());
1178baa1d3cSHarlan                 }
118cc18a6e9SHarlanC             }
119fe91dfa7SHarlanC         }
1200bce4e4cSHarlanC 
121f9029ceaSHarlanC         //Ok(())
122cc18a6e9SHarlanC     }
123cc18a6e9SHarlanC 
handshake(&mut self) -> Result<(), SessionError>124cc18a6e9SHarlanC     async fn handshake(&mut self) -> Result<(), SessionError> {
1258a37108dSHarlanC         let mut bytes_len = 0;
1265eed9a43Swawacry 
1278a37108dSHarlanC         while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE {
1288a37108dSHarlanC             self.bytesio_data = self.io.lock().await.read().await?;
1298a37108dSHarlanC             bytes_len += self.bytesio_data.len();
13088325f54SHarlanC             self.handshaker.extend_data(&self.bytesio_data[..]);
1318a37108dSHarlanC         }
1328a37108dSHarlanC 
1332b020b03SHarlanC         self.handshaker.handshake().await?;
134fe91dfa7SHarlanC 
1350ca99c20SHarlan         if let ServerHandshakeState::Finish = self.handshaker.state() {
1360bce4e4cSHarlanC             self.state = ServerSessionState::ReadChunk;
1372b020b03SHarlanC             let left_bytes = self.handshaker.get_remaining_bytes();
13885c0af6aSLuca Barbato             if !left_bytes.is_empty() {
139cc18a6e9SHarlanC                 self.unpacketizer.extend_data(&left_bytes[..]);
1405eed9a43Swawacry                 self.has_remaing_data = true;
141fe91dfa7SHarlanC             }
1422d9c981bSHarlanC             log::info!("[ S->C ] [send_set_chunk_size] ");
1432b020b03SHarlanC             self.send_set_chunk_size().await?;
144cc18a6e9SHarlanC             return Ok(());
1454d5a5966SHarlanC         }
146cc18a6e9SHarlanC 
147cc18a6e9SHarlanC         Ok(())
148cc18a6e9SHarlanC     }
149cc18a6e9SHarlanC 
read_parse_chunks(&mut self) -> Result<(), SessionError>150cc18a6e9SHarlanC     async fn read_parse_chunks(&mut self) -> Result<(), SessionError> {
1515eed9a43Swawacry         if !self.has_remaing_data {
1525eed9a43Swawacry             match self
1535eed9a43Swawacry                 .io
1545eed9a43Swawacry                 .lock()
1555eed9a43Swawacry                 .await
1565eed9a43Swawacry                 .read_timeout(Duration::from_secs(2))
1575eed9a43Swawacry                 .await
1585eed9a43Swawacry             {
1595eed9a43Swawacry                 Ok(data) => {
1605eed9a43Swawacry                     self.bytesio_data = data;
1615eed9a43Swawacry                 }
1625eed9a43Swawacry                 Err(err) => {
1635eed9a43Swawacry                     self.common
164976f65a6SHarlan                         .unpublish_to_channels(
165976f65a6SHarlan                             self.app_name.clone(),
166976f65a6SHarlan                             self.stream_name.clone(),
167976f65a6SHarlan                             self.session_id,
168976f65a6SHarlan                         )
1695eed9a43Swawacry                         .await?;
1705eed9a43Swawacry 
1715eed9a43Swawacry                     return Err(SessionError {
1725eed9a43Swawacry                         value: SessionErrorValue::BytesIOError(err),
1735eed9a43Swawacry                     });
1745eed9a43Swawacry                 }
1755eed9a43Swawacry             }
1765eed9a43Swawacry 
17788325f54SHarlanC             self.unpacketizer.extend_data(&self.bytesio_data[..]);
178cc18a6e9SHarlanC         }
179cc18a6e9SHarlanC 
1805eed9a43Swawacry         self.has_remaing_data = false;
1814d5a5966SHarlanC 
18292986251SHarlanC         loop {
183*a4ef5d6cSHarlanC             match self.unpacketizer.read_chunks() {
184*a4ef5d6cSHarlanC                 Ok(rv) => {
1850ca99c20SHarlan                     if let UnpackResult::Chunks(chunks) = rv {
186f3a36955SHarlanC                         for chunk_info in chunks {
1874b4045b2SHarlanC                             let timestamp = chunk_info.message_header.timestamp;
188f3a36955SHarlanC                             let msg_stream_id = chunk_info.message_header.msg_streamd_id;
189f3a36955SHarlanC 
19069de9bbdSHarlanC                             if let Some(mut msg) = MessageParser::new(chunk_info).parse()? {
1914b4045b2SHarlanC                                 self.process_messages(&mut msg, &msg_stream_id, &timestamp)
1924b4045b2SHarlanC                                     .await?;
1930bce4e4cSHarlanC                             }
1944745db95SHarlanC                         }
19569de9bbdSHarlanC                     }
196*a4ef5d6cSHarlanC                 }
197*a4ef5d6cSHarlanC                 Err(err) => {
198*a4ef5d6cSHarlanC                     if let UnpackErrorValue::CannotParse = err.value {
199*a4ef5d6cSHarlanC                         self.common
200*a4ef5d6cSHarlanC                             .unpublish_to_channels(
201*a4ef5d6cSHarlanC                                 self.app_name.clone(),
202*a4ef5d6cSHarlanC                                 self.stream_name.clone(),
203*a4ef5d6cSHarlanC                                 self.session_id,
204*a4ef5d6cSHarlanC                             )
205*a4ef5d6cSHarlanC                             .await?;
206*a4ef5d6cSHarlanC                         return Err(err)?;
207*a4ef5d6cSHarlanC                     }
208cc18a6e9SHarlanC                     break;
209cc18a6e9SHarlanC                 }
210cc18a6e9SHarlanC             }
211*a4ef5d6cSHarlanC         }
212cc18a6e9SHarlanC         Ok(())
213cc18a6e9SHarlanC     }
214cc18a6e9SHarlanC 
play(&mut self) -> Result<(), SessionError>215cc18a6e9SHarlanC     async fn play(&mut self) -> Result<(), SessionError> {
21692df423eSHarlanC         match self.common.send_channel_data().await {
217cc18a6e9SHarlanC             Ok(_) => {}
218f8169385SHarlanC             Err(err) => {
21992df423eSHarlanC                 self.common
22092df423eSHarlanC                     .unsubscribe_from_channels(
22192df423eSHarlanC                         self.app_name.clone(),
22292df423eSHarlanC                         self.stream_name.clone(),
223976f65a6SHarlan                         self.session_id,
22492df423eSHarlanC                     )
22592df423eSHarlanC                     .await?;
226f8169385SHarlanC                 return Err(err);
2270bce4e4cSHarlanC             }
22892986251SHarlanC         }
2291606f184SHarlanC 
230cc18a6e9SHarlanC         Ok(())
231cc18a6e9SHarlanC     }
232cc18a6e9SHarlanC 
send_set_chunk_size(&mut self) -> Result<(), SessionError>233e0acb0c0SHarlanC     pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> {
234383722b2SHarlanC         let mut controlmessage =
235383722b2SHarlanC             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
236e0acb0c0SHarlanC         controlmessage.write_set_chunk_size(CHUNK_SIZE).await?;
2370bce4e4cSHarlanC 
2380bce4e4cSHarlanC         Ok(())
2390bce4e4cSHarlanC     }
2400bce4e4cSHarlanC 
process_messages( &mut self, rtmp_msg: &mut RtmpMessageData, msg_stream_id: &u32, timestamp: &u32, ) -> Result<(), SessionError>24135844cc6SHarlanC     pub async fn process_messages(
2420bce4e4cSHarlanC         &mut self,
2434b4045b2SHarlanC         rtmp_msg: &mut RtmpMessageData,
2440bce4e4cSHarlanC         msg_stream_id: &u32,
2454b4045b2SHarlanC         timestamp: &u32,
2460bce4e4cSHarlanC     ) -> Result<(), SessionError> {
2470bce4e4cSHarlanC         match rtmp_msg {
2484b4045b2SHarlanC             RtmpMessageData::Amf0Command {
2490bce4e4cSHarlanC                 command_name,
2500bce4e4cSHarlanC                 transaction_id,
2510bce4e4cSHarlanC                 command_object,
2520bce4e4cSHarlanC                 others,
25335844cc6SHarlanC             } => {
254cc18a6e9SHarlanC                 self.on_amf0_command_message(
2550bce4e4cSHarlanC                     msg_stream_id,
2560bce4e4cSHarlanC                     command_name,
2570bce4e4cSHarlanC                     transaction_id,
2580bce4e4cSHarlanC                     command_object,
2590bce4e4cSHarlanC                     others,
26035844cc6SHarlanC                 )
26135844cc6SHarlanC                 .await?
26235844cc6SHarlanC             }
26392986251SHarlanC             RtmpMessageData::SetChunkSize { chunk_size } => {
26485c0af6aSLuca Barbato                 self.on_set_chunk_size(*chunk_size as usize)?;
26592986251SHarlanC             }
2664b4045b2SHarlanC             RtmpMessageData::AudioData { data } => {
2678e71d710SHarlan                 self.common.on_audio_data(data, timestamp).await?;
2684b4045b2SHarlanC             }
2694b4045b2SHarlanC             RtmpMessageData::VideoData { data } => {
2708e71d710SHarlan                 self.common.on_video_data(data, timestamp).await?;
2714b4045b2SHarlanC             }
27261bf3e1bSHarlanC             RtmpMessageData::AmfData { raw_data } => {
2738e71d710SHarlan                 self.common.on_meta_data(raw_data, timestamp).await?;
2747adc8486SHarlanC             }
2750bce4e4cSHarlanC 
2760bce4e4cSHarlanC             _ => {}
2770bce4e4cSHarlanC         }
2780bce4e4cSHarlanC         Ok(())
2790bce4e4cSHarlanC     }
2800bce4e4cSHarlanC 
on_amf0_command_message( &mut self, stream_id: &u32, command_name: &Amf0ValueType, transaction_id: &Amf0ValueType, command_object: &Amf0ValueType, others: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>281cc18a6e9SHarlanC     pub async fn on_amf0_command_message(
2820bce4e4cSHarlanC         &mut self,
2830bce4e4cSHarlanC         stream_id: &u32,
2840bce4e4cSHarlanC         command_name: &Amf0ValueType,
2850bce4e4cSHarlanC         transaction_id: &Amf0ValueType,
2860bce4e4cSHarlanC         command_object: &Amf0ValueType,
2870bce4e4cSHarlanC         others: &mut Vec<Amf0ValueType>,
2880bce4e4cSHarlanC     ) -> Result<(), SessionError> {
2890bce4e4cSHarlanC         let empty_cmd_name = &String::new();
2900bce4e4cSHarlanC         let cmd_name = match command_name {
2910bce4e4cSHarlanC             Amf0ValueType::UTF8String(str) => str,
2920bce4e4cSHarlanC             _ => empty_cmd_name,
2930bce4e4cSHarlanC         };
2940bce4e4cSHarlanC 
2950bce4e4cSHarlanC         let transaction_id = match transaction_id {
2960bce4e4cSHarlanC             Amf0ValueType::Number(number) => number,
2970bce4e4cSHarlanC             _ => &0.0,
2980bce4e4cSHarlanC         };
2990bce4e4cSHarlanC 
300c8d4d932SHarlan         let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new();
3010bce4e4cSHarlanC         let obj = match command_object {
3020bce4e4cSHarlanC             Amf0ValueType::Object(obj) => obj,
3030bce4e4cSHarlanC             _ => &empty_cmd_obj,
3040bce4e4cSHarlanC         };
3050bce4e4cSHarlanC 
3060bce4e4cSHarlanC         match cmd_name.as_str() {
3070bce4e4cSHarlanC             "connect" => {
30862d3fba8SHarlanC                 log::info!("[ S<-C ] [connect] ");
30985c0af6aSLuca Barbato                 self.on_connect(transaction_id, obj).await?;
3100bce4e4cSHarlanC             }
3110bce4e4cSHarlanC             "createStream" => {
31262d3fba8SHarlanC                 log::info!("[ S<-C ] [create stream] ");
313fe91dfa7SHarlanC                 self.on_create_stream(transaction_id).await?;
3140bce4e4cSHarlanC             }
3150bce4e4cSHarlanC             "deleteStream" => {
31685c0af6aSLuca Barbato                 if !others.is_empty() {
3170bce4e4cSHarlanC                     let stream_id = match others.pop() {
3180ca99c20SHarlan                         Some(Amf0ValueType::Number(streamid)) => streamid,
3190bce4e4cSHarlanC                         _ => 0.0,
3200bce4e4cSHarlanC                     };
3210ca99c20SHarlan 
32262d3fba8SHarlanC                     log::info!(
32362d3fba8SHarlanC                         "[ S<-C ] [delete stream] app_name: {}, stream_name: {}",
32462d3fba8SHarlanC                         self.app_name,
32562d3fba8SHarlanC                         self.stream_name
32662d3fba8SHarlanC                     );
32762d3fba8SHarlanC 
32835844cc6SHarlanC                     self.on_delete_stream(transaction_id, &stream_id).await?;
3298baa1d3cSHarlan                     self.state = ServerSessionState::DeleteStream;
3300bce4e4cSHarlanC                 }
3310bce4e4cSHarlanC             }
3320bce4e4cSHarlanC             "play" => {
33362d3fba8SHarlanC                 log::info!(
33462d3fba8SHarlanC                     "[ S<-C ] [play]  app_name: {}, stream_name: {}",
33562d3fba8SHarlanC                     self.app_name,
33662d3fba8SHarlanC                     self.stream_name
33762d3fba8SHarlanC                 );
338cc18a6e9SHarlanC                 self.unpacketizer.session_type = config::SERVER_PULL;
33935844cc6SHarlanC                 self.on_play(transaction_id, stream_id, others).await?;
3400bce4e4cSHarlanC             }
3410bce4e4cSHarlanC             "publish" => {
342cc18a6e9SHarlanC                 self.unpacketizer.session_type = config::SERVER_PUSH;
34335844cc6SHarlanC                 self.on_publish(transaction_id, stream_id, others).await?;
3440bce4e4cSHarlanC             }
3450bce4e4cSHarlanC             _ => {}
3460bce4e4cSHarlanC         }
3470bce4e4cSHarlanC 
3480bce4e4cSHarlanC         Ok(())
3490bce4e4cSHarlanC     }
3500bce4e4cSHarlanC 
on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError>35192986251SHarlanC     fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> {
3527cd03d07SHarlan         log::info!(
3537cd03d07SHarlan             "[ S<-C ] [set chunk size]  app_name: {}, stream_name: {}, chunk size: {}",
3547cd03d07SHarlan             self.app_name,
3557cd03d07SHarlan             self.stream_name,
3567cd03d07SHarlan             chunk_size
3577cd03d07SHarlan         );
35892986251SHarlanC         self.unpacketizer.update_max_chunk_size(chunk_size);
35992986251SHarlanC         Ok(())
36092986251SHarlanC     }
36192986251SHarlanC 
parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>)362c8d4d932SHarlan     fn parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>) {
36353b16235SHarlan         for (property, value) in command_obj {
36453b16235SHarlan             match property.as_str() {
36588777e65SHarlan                 "app" => {
36653b16235SHarlan                     if let Amf0ValueType::UTF8String(app) = value {
36788777e65SHarlan                         self.connect_properties.app = Some(app.clone());
36888777e65SHarlan                     }
36988777e65SHarlan                 }
37088777e65SHarlan                 "flashVer" => {
37153b16235SHarlan                     if let Amf0ValueType::UTF8String(flash_ver) = value {
37288777e65SHarlan                         self.connect_properties.flash_ver = Some(flash_ver.clone());
37388777e65SHarlan                     }
37488777e65SHarlan                 }
37588777e65SHarlan                 "swfUrl" => {
37653b16235SHarlan                     if let Amf0ValueType::UTF8String(swf_url) = value {
37788777e65SHarlan                         self.connect_properties.swf_url = Some(swf_url.clone());
37888777e65SHarlan                     }
37988777e65SHarlan                 }
38088777e65SHarlan                 "tcUrl" => {
38153b16235SHarlan                     if let Amf0ValueType::UTF8String(tc_url) = value {
38288777e65SHarlan                         self.connect_properties.tc_url = Some(tc_url.clone());
38388777e65SHarlan                     }
38488777e65SHarlan                 }
38588777e65SHarlan                 "fpad" => {
38653b16235SHarlan                     if let Amf0ValueType::Boolean(fpad) = value {
38788777e65SHarlan                         self.connect_properties.fpad = Some(*fpad);
38888777e65SHarlan                     }
38988777e65SHarlan                 }
39088777e65SHarlan                 "audioCodecs" => {
39153b16235SHarlan                     if let Amf0ValueType::Number(audio_codecs) = value {
39288777e65SHarlan                         self.connect_properties.audio_codecs = Some(*audio_codecs);
39388777e65SHarlan                     }
39488777e65SHarlan                 }
39588777e65SHarlan                 "videoCodecs" => {
39653b16235SHarlan                     if let Amf0ValueType::Number(video_codecs) = value {
39788777e65SHarlan                         self.connect_properties.video_codecs = Some(*video_codecs);
39888777e65SHarlan                     }
39988777e65SHarlan                 }
40088777e65SHarlan                 "videoFunction" => {
40153b16235SHarlan                     if let Amf0ValueType::Number(video_function) = value {
40288777e65SHarlan                         self.connect_properties.video_function = Some(*video_function);
40388777e65SHarlan                     }
40488777e65SHarlan                 }
40588777e65SHarlan                 "pageUrl" => {
40653b16235SHarlan                     if let Amf0ValueType::UTF8String(page_url) = value {
40788777e65SHarlan                         self.connect_properties.page_url = Some(page_url.clone());
40888777e65SHarlan                     }
40988777e65SHarlan                 }
41088777e65SHarlan                 "objectEncoding" => {
41153b16235SHarlan                     if let Amf0ValueType::Number(object_encoding) = value {
41288777e65SHarlan                         self.connect_properties.object_encoding = Some(*object_encoding);
41388777e65SHarlan                     }
41488777e65SHarlan                 }
41588777e65SHarlan                 _ => {
41653b16235SHarlan                     log::warn!("unknown connect properties: {}:{:?}", property, value);
41788777e65SHarlan                 }
41888777e65SHarlan             }
41988777e65SHarlan         }
42088777e65SHarlan     }
42188777e65SHarlan 
on_connect( &mut self, transaction_id: &f64, command_obj: &IndexMap<String, Amf0ValueType>, ) -> Result<(), SessionError>42235844cc6SHarlanC     async fn on_connect(
4230bce4e4cSHarlanC         &mut self,
4240bce4e4cSHarlanC         transaction_id: &f64,
425c8d4d932SHarlan         command_obj: &IndexMap<String, Amf0ValueType>,
4260bce4e4cSHarlanC     ) -> Result<(), SessionError> {
42788777e65SHarlan         self.parse_connect_properties(command_obj);
42888777e65SHarlan         log::info!("connect properties: {:?}", self.connect_properties);
429383722b2SHarlanC         let mut control_message =
430383722b2SHarlanC             ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
43162d3fba8SHarlanC         log::info!("[ S->C ] [set window_acknowledgement_size]");
4324745db95SHarlanC         control_message
4334745db95SHarlanC             .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)
4344745db95SHarlanC             .await?;
4352d9c981bSHarlanC 
43662d3fba8SHarlanC         log::info!("[ S->C ] [set set_peer_bandwidth]",);
4374745db95SHarlanC         control_message
4384745db95SHarlanC             .write_set_peer_bandwidth(
4390bce4e4cSHarlanC                 define::PEER_BANDWIDTH,
440f9029ceaSHarlanC                 define::peer_bandwidth_limit_type::DYNAMIC,
4414745db95SHarlanC             )
4424745db95SHarlanC             .await?;
4430bce4e4cSHarlanC 
4440bce4e4cSHarlanC         let obj_encoding = command_obj.get("objectEncoding");
4450bce4e4cSHarlanC         let encoding = match obj_encoding {
4460bce4e4cSHarlanC             Some(Amf0ValueType::Number(encoding)) => encoding,
4470bce4e4cSHarlanC             _ => &define::OBJENCODING_AMF0,
4480bce4e4cSHarlanC         };
4490bce4e4cSHarlanC 
4506fec14f0SHarlanC         let app_name = command_obj.get("app");
4516fec14f0SHarlanC         self.app_name = match app_name {
4526fec14f0SHarlanC             Some(Amf0ValueType::UTF8String(app)) => app.clone(),
4536fec14f0SHarlanC             _ => {
4546fec14f0SHarlanC                 return Err(SessionError {
4556fec14f0SHarlanC                     value: SessionErrorValue::NoAppName,
4566fec14f0SHarlanC                 });
4576fec14f0SHarlanC             }
4586fec14f0SHarlanC         };
4596fec14f0SHarlanC 
4602f7fa101SHarlanC         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
4612d9c981bSHarlanC         log::info!("[ S->C ] [set connect_response]",);
4622f7fa101SHarlanC         netconnection
4632f7fa101SHarlanC             .write_connect_response(
46485c0af6aSLuca Barbato                 transaction_id,
4650ca99c20SHarlan                 define::FMSVER,
4660bce4e4cSHarlanC                 &define::CAPABILITIES,
4670bce4e4cSHarlanC                 &String::from("NetConnection.Connect.Success"),
4680ca99c20SHarlan                 define::LEVEL,
4690bce4e4cSHarlanC                 &String::from("Connection Succeeded."),
4700bce4e4cSHarlanC                 encoding,
4712f7fa101SHarlanC             )
4722f7fa101SHarlanC             .await?;
47335844cc6SHarlanC 
4740bce4e4cSHarlanC         Ok(())
4750bce4e4cSHarlanC     }
4760bce4e4cSHarlanC 
on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError>477fe91dfa7SHarlanC     pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> {
4782f7fa101SHarlanC         let mut netconnection = NetConnection::new(Arc::clone(&self.io));
4792f7fa101SHarlanC         netconnection
4802f7fa101SHarlanC             .write_create_stream_response(transaction_id, &define::STREAM_ID)
4812f7fa101SHarlanC             .await?;
4820bce4e4cSHarlanC 
48362d3fba8SHarlanC         log::info!(
48462d3fba8SHarlanC             "[ S->C ] [create_stream_response]  app_name: {}",
48562d3fba8SHarlanC             self.app_name,
48662d3fba8SHarlanC         );
48762d3fba8SHarlanC 
4880bce4e4cSHarlanC         Ok(())
4890bce4e4cSHarlanC     }
4900bce4e4cSHarlanC 
on_delete_stream( &mut self, transaction_id: &f64, stream_id: &f64, ) -> Result<(), SessionError>49135844cc6SHarlanC     pub async fn on_delete_stream(
4920bce4e4cSHarlanC         &mut self,
4930bce4e4cSHarlanC         transaction_id: &f64,
4940bce4e4cSHarlanC         stream_id: &f64,
4950bce4e4cSHarlanC     ) -> Result<(), SessionError> {
49692df423eSHarlanC         self.common
497976f65a6SHarlan             .unpublish_to_channels(
498976f65a6SHarlan                 self.app_name.clone(),
499976f65a6SHarlan                 self.stream_name.clone(),
500976f65a6SHarlan                 self.session_id,
501976f65a6SHarlan             )
50292df423eSHarlanC             .await?;
503a4485a6eSHarlanC 
5048842de45SHarlanC         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
505383722b2SHarlanC         netstream
5062f7fa101SHarlanC             .write_on_status(
5070bce4e4cSHarlanC                 transaction_id,
5080ca99c20SHarlan                 "status",
5090ca99c20SHarlan                 "NetStream.DeleteStream.Suceess",
5100ca99c20SHarlan                 "",
511383722b2SHarlanC             )
512383722b2SHarlanC             .await?;
5130bce4e4cSHarlanC 
514f8169385SHarlanC         //self.unsubscribe_from_channels().await?;
51562d3fba8SHarlanC         log::info!(
51662d3fba8SHarlanC             "[ S->C ] [delete stream success]  app_name: {}, stream_name: {}",
51762d3fba8SHarlanC             self.app_name,
51862d3fba8SHarlanC             self.stream_name
51962d3fba8SHarlanC         );
52062d3fba8SHarlanC         log::trace!("{}", stream_id);
521cc18a6e9SHarlanC 
5220bce4e4cSHarlanC         Ok(())
5230bce4e4cSHarlanC     }
5240ca99c20SHarlan 
get_request_url(&mut self, raw_stream_name: String) -> String525f159b276SHarlan     fn get_request_url(&mut self, raw_stream_name: String) -> String {
526976f65a6SHarlan         if let Some(tc_url) = &self.connect_properties.tc_url {
527248cdac6SHarlan             format!("{tc_url}/{raw_stream_name}")
528976f65a6SHarlan         } else {
529f159b276SHarlan             format!("{}/{}", self.app_name.clone(), raw_stream_name)
530f159b276SHarlan         }
531f159b276SHarlan     }
532976f65a6SHarlan 
5330ca99c20SHarlan     #[allow(clippy::never_loop)]
on_play( &mut self, transaction_id: &f64, stream_id: &u32, other_values: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>53435844cc6SHarlanC     pub async fn on_play(
5350bce4e4cSHarlanC         &mut self,
5360bce4e4cSHarlanC         transaction_id: &f64,
5370bce4e4cSHarlanC         stream_id: &u32,
5380bce4e4cSHarlanC         other_values: &mut Vec<Amf0ValueType>,
5390bce4e4cSHarlanC     ) -> Result<(), SessionError> {
5400bce4e4cSHarlanC         let length = other_values.len() as u8;
5410bce4e4cSHarlanC         let mut index: u8 = 0;
5420bce4e4cSHarlanC 
5430bce4e4cSHarlanC         let mut stream_name: Option<String> = None;
5440bce4e4cSHarlanC         let mut start: Option<f64> = None;
5450bce4e4cSHarlanC         let mut duration: Option<f64> = None;
5460bce4e4cSHarlanC         let mut reset: Option<bool> = None;
5470bce4e4cSHarlanC 
5480bce4e4cSHarlanC         loop {
5490bce4e4cSHarlanC             if index >= length {
5500bce4e4cSHarlanC                 break;
5510bce4e4cSHarlanC             }
55285c0af6aSLuca Barbato             index += 1;
5530bce4e4cSHarlanC             stream_name = match other_values.remove(0) {
5540bce4e4cSHarlanC                 Amf0ValueType::UTF8String(val) => Some(val),
5550bce4e4cSHarlanC                 _ => None,
5560bce4e4cSHarlanC             };
5570bce4e4cSHarlanC 
5580bce4e4cSHarlanC             if index >= length {
5590bce4e4cSHarlanC                 break;
5600bce4e4cSHarlanC             }
56185c0af6aSLuca Barbato             index += 1;
5620bce4e4cSHarlanC             start = match other_values.remove(0) {
5630bce4e4cSHarlanC                 Amf0ValueType::Number(val) => Some(val),
5640bce4e4cSHarlanC                 _ => None,
5650bce4e4cSHarlanC             };
5660bce4e4cSHarlanC 
5670bce4e4cSHarlanC             if index >= length {
5680bce4e4cSHarlanC                 break;
5690bce4e4cSHarlanC             }
57085c0af6aSLuca Barbato             index += 1;
5710bce4e4cSHarlanC             duration = match other_values.remove(0) {
5720bce4e4cSHarlanC                 Amf0ValueType::Number(val) => Some(val),
5730bce4e4cSHarlanC                 _ => None,
5740bce4e4cSHarlanC             };
5750bce4e4cSHarlanC 
5760bce4e4cSHarlanC             if index >= length {
5770bce4e4cSHarlanC                 break;
5780bce4e4cSHarlanC             }
579f9029ceaSHarlanC             //index = index + 1;
5800bce4e4cSHarlanC             reset = match other_values.remove(0) {
5810bce4e4cSHarlanC                 Amf0ValueType::Boolean(val) => Some(val),
5820bce4e4cSHarlanC                 _ => None,
5830bce4e4cSHarlanC             };
5840bce4e4cSHarlanC             break;
5850bce4e4cSHarlanC         }
58662d3fba8SHarlanC 
58762d3fba8SHarlanC         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
58885c0af6aSLuca Barbato         event_messages.write_stream_begin(*stream_id).await?;
5895de1eabbSHarlanC         log::info!(
59062d3fba8SHarlanC             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}",
59162d3fba8SHarlanC             self.app_name,
59262d3fba8SHarlanC             self.stream_name
59362d3fba8SHarlanC         );
59462d3fba8SHarlanC         log::trace!(
59562d3fba8SHarlanC             "{} {} {}",
5965de1eabbSHarlanC             start.is_some(),
5975de1eabbSHarlanC             duration.is_some(),
5985de1eabbSHarlanC             reset.is_some()
5995de1eabbSHarlanC         );
6000bce4e4cSHarlanC 
6018842de45SHarlanC         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
602383722b2SHarlanC         netstream
6030ca99c20SHarlan             .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset")
6040ca99c20SHarlan             .await?;
6050ca99c20SHarlan 
6060ca99c20SHarlan         netstream
6072f7fa101SHarlanC             .write_on_status(
6080bce4e4cSHarlanC                 transaction_id,
6090ca99c20SHarlan                 "status",
6100ca99c20SHarlan                 "NetStream.Play.Start",
6110ca99c20SHarlan                 "play start",
612383722b2SHarlanC             )
613383722b2SHarlanC             .await?;
6140bce4e4cSHarlanC 
615383722b2SHarlanC         netstream
6162f7fa101SHarlanC             .write_on_status(
6170bce4e4cSHarlanC                 transaction_id,
6180ca99c20SHarlan                 "status",
6190ca99c20SHarlan                 "NetStream.Data.Start",
6200ca99c20SHarlan                 "data start.",
6217adc8486SHarlanC             )
6227adc8486SHarlanC             .await?;
6237adc8486SHarlanC 
6247adc8486SHarlanC         netstream
6252f7fa101SHarlanC             .write_on_status(
6267adc8486SHarlanC                 transaction_id,
6270ca99c20SHarlan                 "status",
6280ca99c20SHarlan                 "NetStream.Play.PublishNotify",
6290ca99c20SHarlan                 "play publish notify.",
6307adc8486SHarlanC             )
6317adc8486SHarlanC             .await?;
6327adc8486SHarlanC 
6330ca99c20SHarlan         event_messages.write_stream_is_record(*stream_id).await?;
634f159b276SHarlan 
635f159b276SHarlan         let raw_stream_name = stream_name.unwrap();
636c8d4d932SHarlan 
637c8d4d932SHarlan         (self.stream_name, self.url_parameters) = RtmpUrlParser::default()
638c8d4d932SHarlan             .set_raw_stream_name(raw_stream_name.clone())
639c8d4d932SHarlan             .parse_raw_stream_name();
640f159b276SHarlan 
64162d3fba8SHarlanC         log::info!(
642f159b276SHarlan             "[ S->C ] [stream is record]  app_name: {}, stream_name: {}, url parameters: {}",
64362d3fba8SHarlanC             self.app_name,
644f159b276SHarlan             self.stream_name,
645f159b276SHarlan             self.url_parameters
64662d3fba8SHarlanC         );
647f159b276SHarlan 
648976f65a6SHarlan         /*Now it can update the request url*/
649f159b276SHarlan         self.common.request_url = self.get_request_url(raw_stream_name);
65092df423eSHarlanC         self.common
651f159b276SHarlan             .subscribe_from_channels(
652f159b276SHarlan                 self.app_name.clone(),
653f159b276SHarlan                 self.stream_name.clone(),
654f159b276SHarlan                 self.session_id,
655f159b276SHarlan             )
65692df423eSHarlanC             .await?;
65730c61c6eSHarlanC 
6581606f184SHarlanC         self.state = ServerSessionState::Play;
6591606f184SHarlanC 
6601606f184SHarlanC         Ok(())
6611606f184SHarlanC     }
6621606f184SHarlanC 
on_publish( &mut self, transaction_id: &f64, stream_id: &u32, other_values: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>66335844cc6SHarlanC     pub async fn on_publish(
6640bce4e4cSHarlanC         &mut self,
6650bce4e4cSHarlanC         transaction_id: &f64,
6660bce4e4cSHarlanC         stream_id: &u32,
6670bce4e4cSHarlanC         other_values: &mut Vec<Amf0ValueType>,
6680bce4e4cSHarlanC     ) -> Result<(), SessionError> {
6690bce4e4cSHarlanC         let length = other_values.len();
6700bce4e4cSHarlanC 
6710bce4e4cSHarlanC         if length < 2 {
6720bce4e4cSHarlanC             return Err(SessionError {
6730bce4e4cSHarlanC                 value: SessionErrorValue::Amf0ValueCountNotCorrect,
6740bce4e4cSHarlanC             });
6750bce4e4cSHarlanC         }
6760bce4e4cSHarlanC 
677f159b276SHarlan         let raw_stream_name = match other_values.remove(0) {
6780bce4e4cSHarlanC             Amf0ValueType::UTF8String(val) => val,
6790bce4e4cSHarlanC             _ => {
6800bce4e4cSHarlanC                 return Err(SessionError {
6810bce4e4cSHarlanC                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
6820bce4e4cSHarlanC                 });
6830bce4e4cSHarlanC             }
6840bce4e4cSHarlanC         };
6850bce4e4cSHarlanC 
686c8d4d932SHarlan         (self.stream_name, self.url_parameters) = RtmpUrlParser::default()
687c8d4d932SHarlan             .set_raw_stream_name(raw_stream_name.clone())
688c8d4d932SHarlan             .parse_raw_stream_name();
689c8d4d932SHarlan 
690976f65a6SHarlan         /*Now it can update the request url*/
691f159b276SHarlan         self.common.request_url = self.get_request_url(raw_stream_name);
692a4485a6eSHarlanC 
693f9029ceaSHarlanC         let _ = match other_values.remove(0) {
6940bce4e4cSHarlanC             Amf0ValueType::UTF8String(val) => val,
6950bce4e4cSHarlanC             _ => {
6960bce4e4cSHarlanC                 return Err(SessionError {
6970bce4e4cSHarlanC                     value: SessionErrorValue::Amf0ValueCountNotCorrect,
6980bce4e4cSHarlanC                 });
6990bce4e4cSHarlanC             }
7000bce4e4cSHarlanC         };
7010bce4e4cSHarlanC 
70262d3fba8SHarlanC         log::info!(
703f159b276SHarlan             "[ S<-C ] [publish]  app_name: {}, stream_name: {}, url parameters: {}",
70462d3fba8SHarlanC             self.app_name,
705f159b276SHarlan             self.stream_name,
706f159b276SHarlan             self.url_parameters
70762d3fba8SHarlanC         );
70862d3fba8SHarlanC 
70962d3fba8SHarlanC         log::info!(
710f159b276SHarlan             "[ S->C ] [stream begin]  app_name: {}, stream_name: {}, url parameters: {}",
71162d3fba8SHarlanC             self.app_name,
712f159b276SHarlan             self.stream_name,
713f159b276SHarlan             self.url_parameters
71462d3fba8SHarlanC         );
71562d3fba8SHarlanC 
71697f0b5afSHarlanC         let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone()));
71785c0af6aSLuca Barbato         event_messages.write_stream_begin(*stream_id).await?;
7180bce4e4cSHarlanC 
7198842de45SHarlanC         let mut netstream = NetStreamWriter::new(Arc::clone(&self.io));
720383722b2SHarlanC         netstream
7210ca99c20SHarlan             .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "")
722383722b2SHarlanC             .await?;
72362d3fba8SHarlanC         log::info!(
72462d3fba8SHarlanC             "[ S->C ] [NetStream.Publish.Start]  app_name: {}, stream_name: {}",
72562d3fba8SHarlanC             self.app_name,
72662d3fba8SHarlanC             self.stream_name
72762d3fba8SHarlanC         );
72835844cc6SHarlanC 
72992df423eSHarlanC         self.common
730976f65a6SHarlan             .publish_to_channels(
731976f65a6SHarlan                 self.app_name.clone(),
732976f65a6SHarlan                 self.stream_name.clone(),
733976f65a6SHarlan                 self.session_id,
7348e71d710SHarlan                 self.gop_num,
735976f65a6SHarlan             )
73692df423eSHarlanC             .await?;
7371606f184SHarlanC 
7381606f184SHarlanC         Ok(())
7391606f184SHarlanC     }
7400bce4e4cSHarlanC }
741