1 use uuid::Uuid; 2 3 use { 4 super::{ 5 common::Common, 6 define, 7 define::SessionType, 8 errors::{SessionError, SessionErrorValue}, 9 }, 10 crate::utils::print::print, 11 crate::{ 12 amf0::Amf0ValueType, 13 channels::define::ChannelEventProducer, 14 chunk::{ 15 define::CHUNK_SIZE, 16 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 17 }, 18 handshake::handshake::{ClientHandshakeState, SimpleHandshakeClient}, 19 messages::{define::RtmpMessageData, parser::MessageParser}, 20 netconnection::writer::{ConnectProperties, NetConnection}, 21 netstream::writer::NetStreamWriter, 22 protocol_control_messages::writer::ProtocolControlMessagesWriter, 23 user_control_messages::writer::EventMessagesWriter, 24 }, 25 bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO}, 26 std::{collections::HashMap, sync::Arc}, 27 tokio::{net::TcpStream, sync::Mutex}, 28 }; 29 30 #[allow(dead_code)] 31 enum ClientSessionState { 32 Handshake, 33 Connect, 34 CreateStream, 35 Play, 36 PublishingContent, 37 StartPublish, 38 WaitStateChange, 39 } 40 41 #[allow(dead_code)] 42 enum ClientSessionPlayState { 43 Handshake, 44 Connect, 45 CreateStream, 46 Play, 47 } 48 49 #[allow(dead_code)] 50 enum ClientSessionPublishState { 51 Handshake, 52 Connect, 53 CreateStream, 54 PublishingContent, 55 } 56 #[allow(dead_code)] 57 pub enum ClientType { 58 Play, 59 Publish, 60 } 61 pub struct ClientSession { 62 io: Arc<Mutex<BytesIO>>, 63 common: Common, 64 65 handshaker: SimpleHandshakeClient, 66 67 unpacketizer: ChunkUnpacketizer, 68 69 app_name: String, 70 stream_name: String, 71 72 /* Used to mark the subscriber's the data producer 73 in channels and delete it from map when unsubscribe 74 is called. */ 75 subscriber_id: Uuid, 76 77 state: ClientSessionState, 78 client_type: ClientType, 79 } 80 81 impl ClientSession { 82 #[allow(dead_code)] 83 pub fn new( 84 stream: TcpStream, 85 client_type: ClientType, 86 app_name: String, 87 stream_name: String, 88 event_producer: ChannelEventProducer, 89 ) -> Self { 90 let net_io = Arc::new(Mutex::new(BytesIO::new(stream))); 91 let subscriber_id = Uuid::new_v4(); 92 93 Self { 94 io: Arc::clone(&net_io), 95 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Client), 96 97 handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)), 98 99 unpacketizer: ChunkUnpacketizer::new(), 100 101 app_name, 102 stream_name, 103 client_type, 104 105 state: ClientSessionState::Handshake, 106 subscriber_id, 107 } 108 } 109 110 pub async fn run(&mut self) -> Result<(), SessionError> { 111 loop { 112 match self.state { 113 ClientSessionState::Handshake => { 114 log::info!("[C -> S] handshake..."); 115 self.handshake().await?; 116 continue; 117 } 118 ClientSessionState::Connect => { 119 log::info!("[C -> S] connect..."); 120 self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64)) 121 .await?; 122 self.state = ClientSessionState::WaitStateChange; 123 } 124 ClientSessionState::CreateStream => { 125 log::info!("[C -> S] CreateStream..."); 126 self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64)) 127 .await?; 128 self.state = ClientSessionState::WaitStateChange; 129 } 130 ClientSessionState::Play => { 131 log::info!("[C -> S] Play..."); 132 self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false) 133 .await?; 134 self.state = ClientSessionState::WaitStateChange; 135 } 136 ClientSessionState::PublishingContent => { 137 log::info!("[C -> S] PublishingContent..."); 138 self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string()) 139 .await?; 140 self.state = ClientSessionState::WaitStateChange; 141 } 142 ClientSessionState::StartPublish => { 143 log::info!("[C -> S] StartPublish..."); 144 self.common.send_channel_data().await?; 145 } 146 ClientSessionState::WaitStateChange => {} 147 } 148 149 let data = self.io.lock().await.read().await?; 150 self.unpacketizer.extend_data(&data[..]); 151 let result = self.unpacketizer.read_chunk()?; 152 153 match result { 154 UnpackResult::ChunkInfo(chunk_info) => { 155 let mut message_parser = MessageParser::new(chunk_info.clone()); 156 let mut msg = message_parser.parse()?; 157 let timestamp = chunk_info.message_header.timestamp; 158 159 self.process_messages(&mut msg, ×tamp).await?; 160 } 161 _ => {} 162 } 163 } 164 165 // Ok(()) 166 } 167 168 async fn handshake(&mut self) -> Result<(), SessionError> { 169 loop { 170 self.handshaker.handshake().await?; 171 if self.handshaker.state == ClientHandshakeState::Finish { 172 log::info!("handshake finish"); 173 break; 174 } 175 176 let data = self.io.lock().await.read().await?; 177 print(data.clone()); 178 self.handshaker.extend_data(&data[..]); 179 } 180 181 self.state = ClientSessionState::Connect; 182 183 Ok(()) 184 } 185 186 pub async fn process_messages( 187 &mut self, 188 msg: &mut RtmpMessageData, 189 timestamp: &u32, 190 ) -> Result<(), SessionError> { 191 match msg { 192 RtmpMessageData::Amf0Command { 193 command_name, 194 transaction_id, 195 command_object, 196 others, 197 } => { 198 self.on_amf0_command_message(command_name, transaction_id, command_object, others) 199 .await? 200 } 201 RtmpMessageData::SetPeerBandwidth { properties } => { 202 log::trace!( 203 "process_messages SetPeerBandwidth windows size: {}", 204 properties.window_size 205 ); 206 self.on_set_peer_bandwidth().await? 207 } 208 RtmpMessageData::SetChunkSize { chunk_size } => self.on_set_chunk_size(chunk_size)?, 209 210 RtmpMessageData::StreamBegin { stream_id } => self.on_stream_begin(stream_id)?, 211 212 RtmpMessageData::StreamIsRecorded { stream_id } => { 213 self.on_stream_is_recorded(stream_id)? 214 } 215 216 RtmpMessageData::AudioData { data } => self.common.on_audio_data(data, timestamp)?, 217 218 RtmpMessageData::VideoData { data } => self.common.on_video_data(data, timestamp)?, 219 220 _ => {} 221 } 222 Ok(()) 223 } 224 225 pub async fn on_amf0_command_message( 226 &mut self, 227 command_name: &Amf0ValueType, 228 transaction_id: &Amf0ValueType, 229 command_object: &Amf0ValueType, 230 others: &mut Vec<Amf0ValueType>, 231 ) -> Result<(), SessionError> { 232 let empty_cmd_name = &String::new(); 233 let cmd_name = match command_name { 234 Amf0ValueType::UTF8String(str) => str, 235 _ => empty_cmd_name, 236 }; 237 238 let transaction_id = match transaction_id { 239 Amf0ValueType::Number(number) => number.clone() as u8, 240 _ => 0, 241 }; 242 243 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 244 let _ = match command_object { 245 Amf0ValueType::Object(obj) => obj, 246 // Amf0ValueType::Null => 247 _ => &empty_cmd_obj, 248 }; 249 250 match cmd_name.as_str() { 251 "_result" => match transaction_id { 252 define::TRANSACTION_ID_CONNECT => { 253 self.on_result_connect().await?; 254 } 255 define::TRANSACTION_ID_CREATE_STREAM => { 256 self.on_result_create_stream()?; 257 } 258 _ => {} 259 }, 260 "_error" => { 261 self.on_error()?; 262 } 263 "onStatus" => { 264 match others.remove(0) { 265 Amf0ValueType::Object(obj) => self.on_status(&obj).await?, 266 _ => { 267 return Err(SessionError { 268 value: SessionErrorValue::Amf0ValueCountNotCorrect, 269 }) 270 } 271 }; 272 } 273 274 _ => {} 275 } 276 277 Ok(()) 278 } 279 280 pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 281 self.send_set_chunk_size().await?; 282 283 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 284 285 let mut properties = ConnectProperties::new_none(); 286 287 let url = format!("rtmp://localhost:1935/{app_name}", app_name = self.app_name); 288 properties.app = Some(self.app_name.clone()); 289 properties.tc_url = Some(url.clone()); 290 291 match self.client_type { 292 ClientType::Play => { 293 properties.flash_ver = Some("flashVerFMLE/3.0 (compatible; FMSc/1.0)".to_string()); 294 properties.swf_url = Some(url.clone()); 295 } 296 ClientType::Publish => { 297 properties.fpad = Some(false); 298 properties.capabilities = Some(15_f64); 299 properties.audio_codecs = Some(3191_f64); 300 properties.video_codecs = Some(252_f64); 301 properties.video_function = Some(1_f64); 302 } 303 } 304 305 netconnection.write_connect(transaction_id, &properties).await?; 306 307 // let mut chunk_info = ChunkInfo::new( 308 // csid_type::COMMAND_AMF0_AMF3, 309 // chunk_type::TYPE_0, 310 // 0, 311 // data.len() as u32, 312 // msg_type_id::COMMAND_AMF0, 313 // 0, 314 // data, 315 // ); 316 317 // self.packetizer.write_chunk(&mut chunk_info).await?; 318 Ok(()) 319 } 320 321 pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 322 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 323 netconnection.write_create_stream(transaction_id).await?; 324 325 Ok(()) 326 } 327 328 pub async fn send_delete_stream( 329 &mut self, 330 transaction_id: &f64, 331 stream_id: &f64, 332 ) -> Result<(), SessionError> { 333 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 334 netstream 335 .write_delete_stream(transaction_id, stream_id) 336 .await?; 337 338 Ok(()) 339 } 340 341 pub async fn send_publish( 342 &mut self, 343 transaction_id: &f64, 344 stream_name: &String, 345 stream_type: &String, 346 ) -> Result<(), SessionError> { 347 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 348 netstream 349 .write_publish(transaction_id, stream_name, stream_type) 350 .await?; 351 352 Ok(()) 353 } 354 355 pub async fn send_play( 356 &mut self, 357 transaction_id: &f64, 358 stream_name: &String, 359 start: &f64, 360 duration: &f64, 361 reset: &bool, 362 ) -> Result<(), SessionError> { 363 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 364 netstream 365 .write_play(transaction_id, stream_name, start, duration, reset) 366 .await?; 367 368 Ok(()) 369 } 370 371 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 372 let mut controlmessage = 373 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 374 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 375 Ok(()) 376 } 377 378 pub async fn send_window_acknowledgement_size( 379 &mut self, 380 window_size: u32, 381 ) -> Result<(), SessionError> { 382 let mut controlmessage = 383 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 384 controlmessage 385 .write_window_acknowledgement_size(window_size) 386 .await?; 387 Ok(()) 388 } 389 390 pub async fn send_set_buffer_length( 391 &mut self, 392 stream_id: u32, 393 ms: u32, 394 ) -> Result<(), SessionError> { 395 let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 396 eventmessages.write_set_buffer_length(stream_id, ms).await?; 397 398 Ok(()) 399 } 400 401 pub async fn on_result_connect(&mut self) -> Result<(), SessionError> { 402 let mut controlmessage = 403 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 404 controlmessage.write_acknowledgement(3107).await?; 405 406 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 407 netstream 408 .write_release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 409 .await?; 410 netstream 411 .write_fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 412 .await?; 413 414 self.state = ClientSessionState::CreateStream; 415 416 Ok(()) 417 } 418 419 pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> { 420 match self.client_type { 421 ClientType::Play => { 422 self.state = ClientSessionState::Play; 423 } 424 ClientType::Publish => { 425 self.state = ClientSessionState::PublishingContent; 426 } 427 } 428 Ok(()) 429 } 430 431 pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> { 432 self.unpacketizer 433 .update_max_chunk_size(chunk_size.clone() as usize); 434 Ok(()) 435 } 436 437 pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 438 log::trace!("stream is recorded stream_id is {}", stream_id); 439 Ok(()) 440 } 441 442 pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 443 log::trace!("stream is begin stream_id is {}", stream_id); 444 Ok(()) 445 } 446 447 pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> { 448 self.send_window_acknowledgement_size(250000).await?; 449 Ok(()) 450 } 451 452 pub fn on_error(&mut self) -> Result<(), SessionError> { 453 Ok(()) 454 } 455 456 pub async fn on_status( 457 &mut self, 458 obj: &HashMap<String, Amf0ValueType>, 459 ) -> Result<(), SessionError> { 460 if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") { 461 match &code_info[..] { 462 "NetStream.Publish.Start" => { 463 self.state = ClientSessionState::StartPublish; 464 self.common 465 .subscribe_from_channels( 466 self.app_name.clone(), 467 self.stream_name.clone(), 468 self.subscriber_id, 469 ) 470 .await?; 471 } 472 "NetStream.Publish.Reset" => {} 473 474 "NetStream.Play.Start" => { 475 self.common 476 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 477 .await? 478 } 479 _ => {} 480 } 481 } 482 log::trace!("{}", obj.len()); 483 Ok(()) 484 } 485 } 486