1 use crate::chunk::packetizer::ChunkPacketizer; 2 3 use { 4 super::{ 5 common::Common, 6 define, 7 define::SessionType, 8 errors::{SessionError, SessionErrorValue}, 9 }, 10 crate::{ 11 amf0::Amf0ValueType, 12 chunk::{ 13 define::CHUNK_SIZE, 14 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 15 }, 16 handshake, 17 handshake::{define::ClientHandshakeState, handshake_client::SimpleHandshakeClient}, 18 messages::{define::RtmpMessageData, parser::MessageParser}, 19 netconnection::writer::{ConnectProperties, NetConnection}, 20 netstream::writer::NetStreamWriter, 21 protocol_control_messages::writer::ProtocolControlMessagesWriter, 22 user_control_messages::writer::EventMessagesWriter, 23 utils::RtmpUrlParser, 24 }, 25 bytesio::{ 26 bytes_writer::AsyncBytesWriter, 27 bytesio::{TNetIO, TcpIO}, 28 }, 29 indexmap::IndexMap, 30 std::sync::Arc, 31 //crate::utils::print::print, 32 streamhub::define::StreamHubEventSender, 33 streamhub::utils::RandomDigitCount, 34 streamhub::utils::Uuid, 35 tokio::{net::TcpStream, sync::Mutex}, 36 }; 37 38 #[allow(dead_code)] 39 enum ClientSessionState { 40 Handshake, 41 Connect, 42 CreateStream, 43 Play, 44 PublishingContent, 45 StartPublish, 46 WaitStateChange, 47 } 48 49 #[allow(dead_code)] 50 enum ClientSessionPlayState { 51 Handshake, 52 Connect, 53 CreateStream, 54 Play, 55 } 56 57 #[allow(dead_code)] 58 enum ClientSessionPublishState { 59 Handshake, 60 Connect, 61 CreateStream, 62 PublishingContent, 63 } 64 #[allow(dead_code)] 65 #[derive(Clone, Debug, PartialEq)] 66 pub enum ClientType { 67 Play, 68 Publish, 69 } 70 pub struct ClientSession { 71 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 72 common: Common, 73 handshaker: SimpleHandshakeClient, 74 unpacketizer: ChunkUnpacketizer, 75 //domain name with port 76 raw_domain_name: String, 77 app_name: String, 78 //stream name with parameters 79 raw_stream_name: String, 80 stream_name: String, 81 /* Used to mark the subscriber's the data producer 82 in channels and delete it from map when unsubscribe 83 is called. */ 84 session_id: Uuid, 85 state: ClientSessionState, 86 client_type: ClientType, 87 sub_app_name: Option<String>, 88 sub_stream_name: Option<String>, 89 /*configure how many gops will be cached.*/ 90 gop_num: usize, 91 } 92 93 impl ClientSession { new( stream: TcpStream, client_type: ClientType, raw_domain_name: String, app_name: String, raw_stream_name: String, event_producer: StreamHubEventSender, gop_num: usize, ) -> Self94 pub fn new( 95 stream: TcpStream, 96 client_type: ClientType, 97 raw_domain_name: String, 98 app_name: String, 99 raw_stream_name: String, 100 event_producer: StreamHubEventSender, 101 gop_num: usize, 102 ) -> Self { 103 let remote_addr = if let Ok(addr) = stream.peer_addr() { 104 log::info!("server session: {}", addr.to_string()); 105 Some(addr) 106 } else { 107 None 108 }; 109 110 let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream)); 111 let net_io = Arc::new(Mutex::new(tcp_io)); 112 113 let subscriber_id = Uuid::new(RandomDigitCount::Four); 114 115 let packetizer = if client_type == ClientType::Publish { 116 Some(ChunkPacketizer::new(Arc::clone(&net_io))) 117 } else { 118 None 119 }; 120 121 let common = Common::new(packetizer, event_producer, SessionType::Client, remote_addr); 122 123 let (stream_name, _) = RtmpUrlParser::default() 124 .set_raw_stream_name(raw_stream_name.clone()) 125 .parse_raw_stream_name(); 126 127 Self { 128 io: Arc::clone(&net_io), 129 common, 130 handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)), 131 unpacketizer: ChunkUnpacketizer::new(), 132 raw_domain_name, 133 app_name, 134 raw_stream_name, 135 stream_name, 136 session_id: subscriber_id, 137 state: ClientSessionState::Handshake, 138 client_type, 139 sub_app_name: None, 140 sub_stream_name: None, 141 gop_num, 142 } 143 } 144 run(&mut self) -> Result<(), SessionError>145 pub async fn run(&mut self) -> Result<(), SessionError> { 146 loop { 147 match self.state { 148 ClientSessionState::Handshake => { 149 log::info!("[C -> S] handshake..."); 150 self.handshake().await?; 151 continue; 152 } 153 ClientSessionState::Connect => { 154 log::info!("[C -> S] connect..."); 155 self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64)) 156 .await?; 157 self.state = ClientSessionState::WaitStateChange; 158 } 159 ClientSessionState::CreateStream => { 160 log::info!("[C -> S] CreateStream..."); 161 self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64)) 162 .await?; 163 self.state = ClientSessionState::WaitStateChange; 164 } 165 ClientSessionState::Play => { 166 log::info!("[C -> S] Play..."); 167 self.send_play(&0.0, &self.raw_stream_name.clone(), &0.0, &0.0, &false) 168 .await?; 169 self.state = ClientSessionState::WaitStateChange; 170 } 171 ClientSessionState::PublishingContent => { 172 log::info!("[C -> S] PublishingContent..."); 173 self.send_publish(&3.0, &self.raw_stream_name.clone(), &"live".to_string()) 174 .await?; 175 self.state = ClientSessionState::WaitStateChange; 176 } 177 ClientSessionState::StartPublish => { 178 log::info!("[C -> S] StartPublish..."); 179 self.common.send_channel_data().await?; 180 } 181 ClientSessionState::WaitStateChange => {} 182 } 183 184 let data = self.io.lock().await.read().await?; 185 self.unpacketizer.extend_data(&data[..]); 186 187 loop { 188 match self.unpacketizer.read_chunks() { 189 Ok(rv) => { 190 if let UnpackResult::Chunks(chunks) = rv { 191 for chunk_info in chunks.iter() { 192 if let Some(mut msg) = 193 MessageParser::new(chunk_info.clone()).parse()? 194 { 195 let timestamp = chunk_info.message_header.timestamp; 196 self.process_messages(&mut msg, ×tamp).await?; 197 } 198 } 199 } 200 } 201 Err(err) => { 202 log::trace!("read trunks error: {}", err); 203 break; 204 } 205 } 206 } 207 } 208 } 209 handshake(&mut self) -> Result<(), SessionError>210 async fn handshake(&mut self) -> Result<(), SessionError> { 211 loop { 212 self.handshaker.handshake().await?; 213 if self.handshaker.state == ClientHandshakeState::Finish { 214 log::info!("handshake finish"); 215 break; 216 } 217 218 let mut bytes_len = 0; 219 while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE * 2 { 220 let data = self.io.lock().await.read().await?; 221 bytes_len += data.len(); 222 self.handshaker.extend_data(&data[..]); 223 } 224 } 225 226 self.state = ClientSessionState::Connect; 227 228 Ok(()) 229 } 230 process_messages( &mut self, msg: &mut RtmpMessageData, timestamp: &u32, ) -> Result<(), SessionError>231 pub async fn process_messages( 232 &mut self, 233 msg: &mut RtmpMessageData, 234 timestamp: &u32, 235 ) -> Result<(), SessionError> { 236 match msg { 237 RtmpMessageData::Amf0Command { 238 command_name, 239 transaction_id, 240 command_object, 241 others, 242 } => { 243 log::info!("[C <- S] on_amf0_command_message..."); 244 self.on_amf0_command_message(command_name, transaction_id, command_object, others) 245 .await? 246 } 247 RtmpMessageData::SetPeerBandwidth { .. } => { 248 log::info!("[C <- S] on_set_peer_bandwidth..."); 249 self.on_set_peer_bandwidth().await? 250 } 251 RtmpMessageData::WindowAcknowledgementSize { .. } => { 252 log::info!("[C <- S] on_windows_acknowledgement_size..."); 253 } 254 RtmpMessageData::SetChunkSize { chunk_size } => { 255 log::info!("[C <- S] on_set_chunk_size..."); 256 self.on_set_chunk_size(chunk_size)?; 257 } 258 RtmpMessageData::StreamBegin { stream_id } => { 259 log::info!("[C <- S] on_stream_begin..."); 260 self.on_stream_begin(stream_id)?; 261 } 262 RtmpMessageData::StreamIsRecorded { stream_id } => { 263 log::info!("[C <- S] on_stream_is_recorded..."); 264 self.on_stream_is_recorded(stream_id)?; 265 } 266 RtmpMessageData::AudioData { data } => { 267 self.common.on_audio_data(data, timestamp).await? 268 } 269 RtmpMessageData::VideoData { data } => { 270 self.common.on_video_data(data, timestamp).await? 271 } 272 RtmpMessageData::AmfData { raw_data } => { 273 self.common.on_meta_data(raw_data, timestamp).await?; 274 } 275 276 _ => {} 277 } 278 Ok(()) 279 } 280 on_amf0_command_message( &mut self, command_name: &Amf0ValueType, transaction_id: &Amf0ValueType, command_object: &Amf0ValueType, others: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>281 pub async fn on_amf0_command_message( 282 &mut self, 283 command_name: &Amf0ValueType, 284 transaction_id: &Amf0ValueType, 285 command_object: &Amf0ValueType, 286 others: &mut Vec<Amf0ValueType>, 287 ) -> Result<(), SessionError> { 288 log::info!("[C <- S] on_amf0_command_message..."); 289 let empty_cmd_name = &String::new(); 290 let cmd_name = match command_name { 291 Amf0ValueType::UTF8String(str) => str, 292 _ => empty_cmd_name, 293 }; 294 295 let transaction_id = match transaction_id { 296 Amf0ValueType::Number(number) => *number as u8, 297 _ => 0, 298 }; 299 300 let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new(); 301 let _ = match command_object { 302 Amf0ValueType::Object(obj) => obj, 303 // Amf0ValueType::Null => 304 _ => &empty_cmd_obj, 305 }; 306 307 match cmd_name.as_str() { 308 "_result" => match transaction_id { 309 define::TRANSACTION_ID_CONNECT => { 310 log::info!("[C <- S] on_result_connect..."); 311 self.on_result_connect().await?; 312 } 313 define::TRANSACTION_ID_CREATE_STREAM => { 314 log::info!("[C <- S] on_result_create_stream..."); 315 self.on_result_create_stream()?; 316 } 317 _ => {} 318 }, 319 "_error" => { 320 self.on_error()?; 321 } 322 "onStatus" => { 323 match others.remove(0) { 324 Amf0ValueType::Object(obj) => self.on_status(&obj).await?, 325 _ => { 326 return Err(SessionError { 327 value: SessionErrorValue::Amf0ValueCountNotCorrect, 328 }) 329 } 330 }; 331 } 332 333 _ => {} 334 } 335 336 Ok(()) 337 } 338 send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError>339 pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 340 self.send_set_chunk_size().await?; 341 342 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 343 let mut properties = ConnectProperties::new_none(); 344 345 let url = format!( 346 "rtmp://{domain_name}/{app_name}", 347 domain_name = self.raw_domain_name, 348 app_name = self.app_name 349 ); 350 properties.app = Some(self.app_name.clone()); 351 352 match self.client_type { 353 ClientType::Play => { 354 properties.flash_ver = Some("LNX 9,0,124,2".to_string()); 355 properties.tc_url = Some(url.clone()); 356 properties.fpad = Some(false); 357 properties.capabilities = Some(15_f64); 358 properties.audio_codecs = Some(4071_f64); 359 properties.video_codecs = Some(252_f64); 360 properties.video_function = Some(1_f64); 361 } 362 ClientType::Publish => { 363 properties.pub_type = Some("nonprivate".to_string()); 364 properties.flash_ver = Some("FMLE/3.0 (compatible; xiu)".to_string()); 365 properties.fpad = Some(false); 366 properties.tc_url = Some(url.clone()); 367 } 368 } 369 370 netconnection 371 .write_connect(transaction_id, &properties) 372 .await?; 373 374 Ok(()) 375 } 376 send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError>377 pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 378 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 379 netconnection.write_create_stream(transaction_id).await?; 380 381 Ok(()) 382 } 383 send_delete_stream( &mut self, transaction_id: &f64, stream_id: &f64, ) -> Result<(), SessionError>384 pub async fn send_delete_stream( 385 &mut self, 386 transaction_id: &f64, 387 stream_id: &f64, 388 ) -> Result<(), SessionError> { 389 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 390 netstream 391 .write_delete_stream(transaction_id, stream_id) 392 .await?; 393 394 Ok(()) 395 } 396 send_publish( &mut self, transaction_id: &f64, stream_name: &String, stream_type: &String, ) -> Result<(), SessionError>397 pub async fn send_publish( 398 &mut self, 399 transaction_id: &f64, 400 stream_name: &String, 401 stream_type: &String, 402 ) -> Result<(), SessionError> { 403 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 404 netstream 405 .write_publish(transaction_id, stream_name, stream_type) 406 .await?; 407 408 Ok(()) 409 } 410 send_play( &mut self, transaction_id: &f64, stream_name: &String, start: &f64, duration: &f64, reset: &bool, ) -> Result<(), SessionError>411 pub async fn send_play( 412 &mut self, 413 transaction_id: &f64, 414 stream_name: &String, 415 start: &f64, 416 duration: &f64, 417 reset: &bool, 418 ) -> Result<(), SessionError> { 419 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 420 netstream 421 .write_play(transaction_id, stream_name, start, duration, reset) 422 .await?; 423 424 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 425 netconnection 426 .write_get_stream_length(transaction_id, stream_name) 427 .await?; 428 429 self.send_set_buffer_length(1, 1300).await?; 430 431 Ok(()) 432 } 433 send_set_chunk_size(&mut self) -> Result<(), SessionError>434 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 435 let mut controlmessage = 436 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 437 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 438 Ok(()) 439 } 440 send_window_acknowledgement_size( &mut self, window_size: u32, ) -> Result<(), SessionError>441 pub async fn send_window_acknowledgement_size( 442 &mut self, 443 window_size: u32, 444 ) -> Result<(), SessionError> { 445 let mut controlmessage = 446 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 447 controlmessage 448 .write_window_acknowledgement_size(window_size) 449 .await?; 450 Ok(()) 451 } 452 send_set_buffer_length( &mut self, stream_id: u32, ms: u32, ) -> Result<(), SessionError>453 pub async fn send_set_buffer_length( 454 &mut self, 455 stream_id: u32, 456 ms: u32, 457 ) -> Result<(), SessionError> { 458 let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 459 eventmessages.write_set_buffer_length(stream_id, ms).await?; 460 461 Ok(()) 462 } 463 on_result_connect(&mut self) -> Result<(), SessionError>464 pub async fn on_result_connect(&mut self) -> Result<(), SessionError> { 465 let mut controlmessage = 466 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 467 controlmessage.write_acknowledgement(3107).await?; 468 469 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 470 netstream 471 .write_release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 472 .await?; 473 netstream 474 .write_fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 475 .await?; 476 477 self.state = ClientSessionState::CreateStream; 478 479 Ok(()) 480 } 481 on_result_create_stream(&mut self) -> Result<(), SessionError>482 pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> { 483 match self.client_type { 484 ClientType::Play => { 485 self.state = ClientSessionState::Play; 486 } 487 ClientType::Publish => { 488 self.state = ClientSessionState::PublishingContent; 489 } 490 } 491 Ok(()) 492 } 493 on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError>494 pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> { 495 self.unpacketizer 496 .update_max_chunk_size(*chunk_size as usize); 497 Ok(()) 498 } 499 on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError>500 pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 501 log::trace!("stream is recorded stream_id is {}", stream_id); 502 Ok(()) 503 } 504 on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError>505 pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 506 log::trace!("stream is begin stream_id is {}", stream_id); 507 Ok(()) 508 } 509 on_set_peer_bandwidth(&mut self) -> Result<(), SessionError>510 pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> { 511 self.send_window_acknowledgement_size(5000000).await?; 512 513 Ok(()) 514 } 515 on_error(&mut self) -> Result<(), SessionError>516 pub fn on_error(&mut self) -> Result<(), SessionError> { 517 Ok(()) 518 } 519 on_status( &mut self, obj: &IndexMap<String, Amf0ValueType>, ) -> Result<(), SessionError>520 pub async fn on_status( 521 &mut self, 522 obj: &IndexMap<String, Amf0ValueType>, 523 ) -> Result<(), SessionError> { 524 if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") { 525 match &code_info[..] { 526 "NetStream.Publish.Start" => { 527 self.state = ClientSessionState::StartPublish; 528 //subscribe from local session and publish to remote rtmp server 529 if let (Some(app_name), Some(stream_name)) = 530 (&self.sub_app_name, &self.sub_stream_name) 531 { 532 self.common 533 .subscribe_from_channels( 534 app_name.clone(), 535 stream_name.clone(), 536 self.session_id, 537 ) 538 .await?; 539 } else { 540 self.common 541 .subscribe_from_channels( 542 self.app_name.clone(), 543 self.stream_name.clone(), 544 self.session_id, 545 ) 546 .await?; 547 } 548 } 549 "NetStream.Publish.Reset" => {} 550 "NetStream.Play.Start" => { 551 //pull from remote rtmp server and publish to local session 552 self.common 553 .publish_to_channels( 554 self.app_name.clone(), 555 self.stream_name.clone(), 556 self.session_id, 557 self.gop_num, 558 ) 559 .await? 560 } 561 _ => {} 562 } 563 } 564 log::trace!("{}", obj.len()); 565 Ok(()) 566 } 567 subscribe(&mut self, app_name: String, stream_name: String)568 pub fn subscribe(&mut self, app_name: String, stream_name: String) { 569 self.sub_app_name = Some(app_name); 570 self.sub_stream_name = Some(stream_name); 571 } 572 } 573