1 use uuid::Uuid; 2 3 use { 4 super::{ 5 common::Common, 6 define, 7 define::SessionType, 8 errors::{SessionError, SessionErrorValue}, 9 }, 10 //crate::utils::print::print, 11 crate::{ 12 amf0::Amf0ValueType, 13 channels::define::ChannelEventProducer, 14 chunk::{ 15 define::CHUNK_SIZE, 16 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 17 }, 18 handshake::{define::ClientHandshakeState, handshake_client::SimpleHandshakeClient}, 19 messages::{define::RtmpMessageData, parser::MessageParser}, 20 netconnection::writer::{ConnectProperties, NetConnection}, 21 netstream::writer::NetStreamWriter, 22 protocol_control_messages::writer::ProtocolControlMessagesWriter, 23 user_control_messages::writer::EventMessagesWriter, 24 }, 25 bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO}, 26 std::{collections::HashMap, sync::Arc}, 27 tokio::{net::TcpStream, sync::Mutex}, 28 }; 29 30 #[allow(dead_code)] 31 enum ClientSessionState { 32 Handshake, 33 Connect, 34 CreateStream, 35 Play, 36 PublishingContent, 37 StartPublish, 38 WaitStateChange, 39 } 40 41 #[allow(dead_code)] 42 enum ClientSessionPlayState { 43 Handshake, 44 Connect, 45 CreateStream, 46 Play, 47 } 48 49 #[allow(dead_code)] 50 enum ClientSessionPublishState { 51 Handshake, 52 Connect, 53 CreateStream, 54 PublishingContent, 55 } 56 #[allow(dead_code)] 57 pub enum ClientType { 58 Play, 59 Publish, 60 } 61 pub struct ClientSession { 62 io: Arc<Mutex<BytesIO>>, 63 common: Common, 64 handshaker: SimpleHandshakeClient, 65 unpacketizer: ChunkUnpacketizer, 66 app_name: String, 67 stream_name: String, 68 /* Used to mark the subscriber's the data producer 69 in channels and delete it from map when unsubscribe 70 is called. */ 71 subscriber_id: Uuid, 72 state: ClientSessionState, 73 client_type: ClientType, 74 } 75 76 impl ClientSession { 77 #[allow(dead_code)] 78 pub fn new( 79 stream: TcpStream, 80 client_type: ClientType, 81 app_name: String, 82 stream_name: String, 83 event_producer: ChannelEventProducer, 84 ) -> Self { 85 let net_io = Arc::new(Mutex::new(BytesIO::new(stream))); 86 let subscriber_id = Uuid::new_v4(); 87 88 Self { 89 io: Arc::clone(&net_io), 90 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Client), 91 handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)), 92 unpacketizer: ChunkUnpacketizer::new(), 93 app_name, 94 stream_name, 95 client_type, 96 state: ClientSessionState::Handshake, 97 subscriber_id, 98 } 99 } 100 101 pub async fn run(&mut self) -> Result<(), SessionError> { 102 loop { 103 match self.state { 104 ClientSessionState::Handshake => { 105 log::info!("[C -> S] handshake..."); 106 self.handshake().await?; 107 continue; 108 } 109 ClientSessionState::Connect => { 110 log::info!("[C -> S] connect..."); 111 self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64)) 112 .await?; 113 self.state = ClientSessionState::WaitStateChange; 114 } 115 ClientSessionState::CreateStream => { 116 log::info!("[C -> S] CreateStream..."); 117 self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64)) 118 .await?; 119 self.state = ClientSessionState::WaitStateChange; 120 } 121 ClientSessionState::Play => { 122 log::info!("[C -> S] Play..."); 123 self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false) 124 .await?; 125 self.state = ClientSessionState::WaitStateChange; 126 } 127 ClientSessionState::PublishingContent => { 128 log::info!("[C -> S] PublishingContent..."); 129 self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string()) 130 .await?; 131 self.state = ClientSessionState::WaitStateChange; 132 } 133 ClientSessionState::StartPublish => { 134 log::info!("[C -> S] StartPublish..."); 135 self.common.send_channel_data().await?; 136 } 137 ClientSessionState::WaitStateChange => {} 138 } 139 140 let data = self.io.lock().await.read().await?; 141 self.unpacketizer.extend_data(&data[..]); 142 143 loop { 144 let result = self.unpacketizer.read_chunks(); 145 146 if let Ok(rv) = result { 147 if let UnpackResult::Chunks(chunks) = rv { 148 for chunk_info in chunks.iter() { 149 let mut msg = MessageParser::new(chunk_info.clone()).parse()?; 150 151 let timestamp = chunk_info.message_header.timestamp; 152 self.process_messages(&mut msg, ×tamp).await?; 153 } 154 } 155 } else { 156 break; 157 } 158 } 159 } 160 } 161 162 async fn handshake(&mut self) -> Result<(), SessionError> { 163 loop { 164 self.handshaker.handshake().await?; 165 if self.handshaker.state == ClientHandshakeState::Finish { 166 log::info!("handshake finish"); 167 break; 168 } 169 170 let data = self.io.lock().await.read().await?; 171 self.handshaker.extend_data(&data[..]); 172 } 173 174 self.state = ClientSessionState::Connect; 175 176 Ok(()) 177 } 178 179 pub async fn process_messages( 180 &mut self, 181 msg: &mut RtmpMessageData, 182 timestamp: &u32, 183 ) -> Result<(), SessionError> { 184 match msg { 185 RtmpMessageData::Amf0Command { 186 command_name, 187 transaction_id, 188 command_object, 189 others, 190 } => { 191 log::info!("[C <- S] on_amf0_command_message..."); 192 self.on_amf0_command_message(command_name, transaction_id, command_object, others) 193 .await? 194 } 195 RtmpMessageData::SetPeerBandwidth { .. } => { 196 log::info!("[C <- S] on_set_peer_bandwidth..."); 197 self.on_set_peer_bandwidth().await? 198 } 199 RtmpMessageData::WindowAcknowledgementSize { .. } => { 200 log::info!("[C <- S] on_windows_acknowledgement_size..."); 201 } 202 RtmpMessageData::SetChunkSize { chunk_size } => { 203 log::info!("[C <- S] on_set_chunk_size..."); 204 self.on_set_chunk_size(chunk_size)?; 205 } 206 RtmpMessageData::StreamBegin { stream_id } => { 207 log::info!("[C <- S] on_stream_begin..."); 208 self.on_stream_begin(stream_id)?; 209 } 210 RtmpMessageData::StreamIsRecorded { stream_id } => { 211 log::info!("[C <- S] on_stream_is_recorded..."); 212 self.on_stream_is_recorded(stream_id)?; 213 } 214 RtmpMessageData::AudioData { data } => self.common.on_audio_data(data, timestamp)?, 215 RtmpMessageData::VideoData { data } => self.common.on_video_data(data, timestamp)?, 216 217 _ => {} 218 } 219 Ok(()) 220 } 221 222 pub async fn on_amf0_command_message( 223 &mut self, 224 command_name: &Amf0ValueType, 225 transaction_id: &Amf0ValueType, 226 command_object: &Amf0ValueType, 227 others: &mut Vec<Amf0ValueType>, 228 ) -> Result<(), SessionError> { 229 let empty_cmd_name = &String::new(); 230 let cmd_name = match command_name { 231 Amf0ValueType::UTF8String(str) => str, 232 _ => empty_cmd_name, 233 }; 234 235 let transaction_id = match transaction_id { 236 Amf0ValueType::Number(number) => *number as u8, 237 _ => 0, 238 }; 239 240 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 241 let _ = match command_object { 242 Amf0ValueType::Object(obj) => obj, 243 // Amf0ValueType::Null => 244 _ => &empty_cmd_obj, 245 }; 246 247 match cmd_name.as_str() { 248 "_result" => match transaction_id { 249 define::TRANSACTION_ID_CONNECT => { 250 log::info!("[C <- S] on_result_connect..."); 251 self.on_result_connect().await?; 252 } 253 define::TRANSACTION_ID_CREATE_STREAM => { 254 log::info!("[C <- S] on_result_create_stream..."); 255 self.on_result_create_stream()?; 256 } 257 _ => {} 258 }, 259 "_error" => { 260 self.on_error()?; 261 } 262 "onStatus" => { 263 match others.remove(0) { 264 Amf0ValueType::Object(obj) => self.on_status(&obj).await?, 265 _ => { 266 return Err(SessionError { 267 value: SessionErrorValue::Amf0ValueCountNotCorrect, 268 }) 269 } 270 }; 271 } 272 273 _ => {} 274 } 275 276 Ok(()) 277 } 278 279 pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 280 self.send_set_chunk_size().await?; 281 282 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 283 let mut properties = ConnectProperties::new_none(); 284 285 let url = format!("rtmp://localhost:1935/{app_name}", app_name = self.app_name); 286 properties.app = Some(self.app_name.clone()); 287 properties.tc_url = Some(url.clone()); 288 289 match self.client_type { 290 ClientType::Play => { 291 properties.flash_ver = Some("flashVerFMLE/3.0 (compatible; FMSc/1.0)".to_string()); 292 properties.swf_url = Some(url.clone()); 293 } 294 ClientType::Publish => { 295 properties.fpad = Some(false); 296 properties.capabilities = Some(15_f64); 297 properties.audio_codecs = Some(3191_f64); 298 properties.video_codecs = Some(252_f64); 299 properties.video_function = Some(1_f64); 300 } 301 } 302 303 netconnection 304 .write_connect(transaction_id, &properties) 305 .await?; 306 307 Ok(()) 308 } 309 310 pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 311 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 312 netconnection.write_create_stream(transaction_id).await?; 313 314 Ok(()) 315 } 316 317 pub async fn send_delete_stream( 318 &mut self, 319 transaction_id: &f64, 320 stream_id: &f64, 321 ) -> Result<(), SessionError> { 322 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 323 netstream 324 .write_delete_stream(transaction_id, stream_id) 325 .await?; 326 327 Ok(()) 328 } 329 330 pub async fn send_publish( 331 &mut self, 332 transaction_id: &f64, 333 stream_name: &String, 334 stream_type: &String, 335 ) -> Result<(), SessionError> { 336 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 337 netstream 338 .write_publish(transaction_id, stream_name, stream_type) 339 .await?; 340 341 Ok(()) 342 } 343 344 pub async fn send_play( 345 &mut self, 346 transaction_id: &f64, 347 stream_name: &String, 348 start: &f64, 349 duration: &f64, 350 reset: &bool, 351 ) -> Result<(), SessionError> { 352 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 353 netstream 354 .write_play(transaction_id, stream_name, start, duration, reset) 355 .await?; 356 357 Ok(()) 358 } 359 360 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 361 let mut controlmessage = 362 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 363 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 364 Ok(()) 365 } 366 367 pub async fn send_window_acknowledgement_size( 368 &mut self, 369 window_size: u32, 370 ) -> Result<(), SessionError> { 371 let mut controlmessage = 372 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 373 controlmessage 374 .write_window_acknowledgement_size(window_size) 375 .await?; 376 Ok(()) 377 } 378 379 pub async fn send_set_buffer_length( 380 &mut self, 381 stream_id: u32, 382 ms: u32, 383 ) -> Result<(), SessionError> { 384 let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 385 eventmessages.write_set_buffer_length(stream_id, ms).await?; 386 387 Ok(()) 388 } 389 390 pub async fn on_result_connect(&mut self) -> Result<(), SessionError> { 391 let mut controlmessage = 392 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 393 controlmessage.write_acknowledgement(3107).await?; 394 395 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 396 netstream 397 .write_release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 398 .await?; 399 netstream 400 .write_fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 401 .await?; 402 403 self.state = ClientSessionState::CreateStream; 404 405 Ok(()) 406 } 407 408 pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> { 409 match self.client_type { 410 ClientType::Play => { 411 self.state = ClientSessionState::Play; 412 } 413 ClientType::Publish => { 414 self.state = ClientSessionState::PublishingContent; 415 } 416 } 417 Ok(()) 418 } 419 420 pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> { 421 self.unpacketizer 422 .update_max_chunk_size(*chunk_size as usize); 423 Ok(()) 424 } 425 426 pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 427 log::trace!("stream is recorded stream_id is {}", stream_id); 428 Ok(()) 429 } 430 431 pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 432 log::trace!("stream is begin stream_id is {}", stream_id); 433 Ok(()) 434 } 435 436 pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> { 437 self.send_window_acknowledgement_size(250000).await?; 438 Ok(()) 439 } 440 441 pub fn on_error(&mut self) -> Result<(), SessionError> { 442 Ok(()) 443 } 444 445 pub async fn on_status( 446 &mut self, 447 obj: &HashMap<String, Amf0ValueType>, 448 ) -> Result<(), SessionError> { 449 if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") { 450 match &code_info[..] { 451 "NetStream.Publish.Start" => { 452 self.state = ClientSessionState::StartPublish; 453 self.common 454 .subscribe_from_channels( 455 self.app_name.clone(), 456 self.stream_name.clone(), 457 self.subscriber_id, 458 ) 459 .await?; 460 } 461 "NetStream.Publish.Reset" => {} 462 463 "NetStream.Play.Start" => { 464 self.common 465 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 466 .await? 467 } 468 _ => {} 469 } 470 } 471 log::trace!("{}", obj.len()); 472 Ok(()) 473 } 474 } 475