1 use crate::chunk::packetizer::ChunkPacketizer; 2 3 use { 4 super::{ 5 common::Common, 6 define, 7 define::SessionType, 8 errors::{SessionError, SessionErrorValue}, 9 }, 10 crate::{ 11 amf0::Amf0ValueType, 12 chunk::{ 13 define::CHUNK_SIZE, 14 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 15 }, 16 handshake, 17 handshake::{define::ClientHandshakeState, handshake_client::SimpleHandshakeClient}, 18 messages::{define::RtmpMessageData, parser::MessageParser}, 19 netconnection::writer::{ConnectProperties, NetConnection}, 20 netstream::writer::NetStreamWriter, 21 protocol_control_messages::writer::ProtocolControlMessagesWriter, 22 user_control_messages::writer::EventMessagesWriter, 23 utils::RtmpUrlParser, 24 }, 25 bytesio::{ 26 bytes_writer::AsyncBytesWriter, 27 bytesio::{TNetIO, TcpIO}, 28 }, 29 indexmap::IndexMap, 30 std::sync::Arc, 31 //crate::utils::print::print, 32 streamhub::define::StreamHubEventSender, 33 streamhub::utils::RandomDigitCount, 34 streamhub::utils::Uuid, 35 tokio::{net::TcpStream, sync::Mutex}, 36 }; 37 38 #[allow(dead_code)] 39 enum ClientSessionState { 40 Handshake, 41 Connect, 42 CreateStream, 43 Play, 44 PublishingContent, 45 StartPublish, 46 WaitStateChange, 47 } 48 49 #[allow(dead_code)] 50 enum ClientSessionPlayState { 51 Handshake, 52 Connect, 53 CreateStream, 54 Play, 55 } 56 57 #[allow(dead_code)] 58 enum ClientSessionPublishState { 59 Handshake, 60 Connect, 61 CreateStream, 62 PublishingContent, 63 } 64 #[allow(dead_code)] 65 #[derive(Clone, Debug, PartialEq)] 66 pub enum ClientType { 67 Play, 68 Publish, 69 } 70 pub struct ClientSession { 71 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 72 common: Common, 73 handshaker: SimpleHandshakeClient, 74 unpacketizer: ChunkUnpacketizer, 75 //domain name with port 76 raw_domain_name: String, 77 app_name: String, 78 //stream name with parameters 79 raw_stream_name: String, 80 stream_name: String, 81 /* Used to mark the subscriber's the data producer 82 in channels and delete it from map when unsubscribe 83 is called. */ 84 session_id: Uuid, 85 state: ClientSessionState, 86 client_type: ClientType, 87 sub_app_name: Option<String>, 88 sub_stream_name: Option<String>, 89 /*configure how many gops will be cached.*/ 90 gop_num: usize, 91 } 92 93 impl ClientSession { 94 pub fn new( 95 stream: TcpStream, 96 client_type: ClientType, 97 raw_domain_name: String, 98 app_name: String, 99 raw_stream_name: String, 100 event_producer: StreamHubEventSender, 101 gop_num: usize, 102 ) -> Self { 103 let remote_addr = if let Ok(addr) = stream.peer_addr() { 104 log::info!("server session: {}", addr.to_string()); 105 Some(addr) 106 } else { 107 None 108 }; 109 110 let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream)); 111 let net_io = Arc::new(Mutex::new(tcp_io)); 112 113 let subscriber_id = Uuid::new(RandomDigitCount::Four); 114 115 let packetizer = if client_type == ClientType::Publish { 116 Some(ChunkPacketizer::new(Arc::clone(&net_io))) 117 } else { 118 None 119 }; 120 121 let common = Common::new(packetizer, event_producer, SessionType::Client, remote_addr); 122 123 let (stream_name, _) = RtmpUrlParser::default() 124 .set_raw_stream_name(raw_stream_name.clone()) 125 .parse_raw_stream_name(); 126 127 Self { 128 io: Arc::clone(&net_io), 129 common, 130 handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)), 131 unpacketizer: ChunkUnpacketizer::new(), 132 raw_domain_name, 133 app_name, 134 raw_stream_name, 135 stream_name, 136 session_id: subscriber_id, 137 state: ClientSessionState::Handshake, 138 client_type, 139 sub_app_name: None, 140 sub_stream_name: None, 141 gop_num, 142 } 143 } 144 145 pub async fn run(&mut self) -> Result<(), SessionError> { 146 loop { 147 match self.state { 148 ClientSessionState::Handshake => { 149 log::info!("[C -> S] handshake..."); 150 self.handshake().await?; 151 continue; 152 } 153 ClientSessionState::Connect => { 154 log::info!("[C -> S] connect..."); 155 self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64)) 156 .await?; 157 self.state = ClientSessionState::WaitStateChange; 158 } 159 ClientSessionState::CreateStream => { 160 log::info!("[C -> S] CreateStream..."); 161 self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64)) 162 .await?; 163 self.state = ClientSessionState::WaitStateChange; 164 } 165 ClientSessionState::Play => { 166 log::info!("[C -> S] Play..."); 167 self.send_play(&0.0, &self.raw_stream_name.clone(), &0.0, &0.0, &false) 168 .await?; 169 self.state = ClientSessionState::WaitStateChange; 170 } 171 ClientSessionState::PublishingContent => { 172 log::info!("[C -> S] PublishingContent..."); 173 self.send_publish(&3.0, &self.raw_stream_name.clone(), &"live".to_string()) 174 .await?; 175 self.state = ClientSessionState::WaitStateChange; 176 } 177 ClientSessionState::StartPublish => { 178 log::info!("[C -> S] StartPublish..."); 179 self.common.send_channel_data().await?; 180 } 181 ClientSessionState::WaitStateChange => {} 182 } 183 184 let data = self.io.lock().await.read().await?; 185 self.unpacketizer.extend_data(&data[..]); 186 187 loop { 188 match self.unpacketizer.read_chunks() { 189 Ok(rv) => { 190 if let UnpackResult::Chunks(chunks) = rv { 191 for chunk_info in chunks.iter() { 192 let mut msg = MessageParser::new(chunk_info.clone()).parse()?; 193 let timestamp = chunk_info.message_header.timestamp; 194 self.process_messages(&mut msg, ×tamp).await?; 195 } 196 } 197 } 198 Err(err) => { 199 log::trace!("read trunks error: {}", err); 200 break; 201 } 202 } 203 } 204 } 205 } 206 207 async fn handshake(&mut self) -> Result<(), SessionError> { 208 loop { 209 self.handshaker.handshake().await?; 210 if self.handshaker.state == ClientHandshakeState::Finish { 211 log::info!("handshake finish"); 212 break; 213 } 214 215 let mut bytes_len = 0; 216 while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE * 2 { 217 let data = self.io.lock().await.read().await?; 218 bytes_len += data.len(); 219 self.handshaker.extend_data(&data[..]); 220 } 221 } 222 223 self.state = ClientSessionState::Connect; 224 225 Ok(()) 226 } 227 228 pub async fn process_messages( 229 &mut self, 230 msg: &mut RtmpMessageData, 231 timestamp: &u32, 232 ) -> Result<(), SessionError> { 233 match msg { 234 RtmpMessageData::Amf0Command { 235 command_name, 236 transaction_id, 237 command_object, 238 others, 239 } => { 240 log::info!("[C <- S] on_amf0_command_message..."); 241 self.on_amf0_command_message(command_name, transaction_id, command_object, others) 242 .await? 243 } 244 RtmpMessageData::SetPeerBandwidth { .. } => { 245 log::info!("[C <- S] on_set_peer_bandwidth..."); 246 self.on_set_peer_bandwidth().await? 247 } 248 RtmpMessageData::WindowAcknowledgementSize { .. } => { 249 log::info!("[C <- S] on_windows_acknowledgement_size..."); 250 } 251 RtmpMessageData::SetChunkSize { chunk_size } => { 252 log::info!("[C <- S] on_set_chunk_size..."); 253 self.on_set_chunk_size(chunk_size)?; 254 } 255 RtmpMessageData::StreamBegin { stream_id } => { 256 log::info!("[C <- S] on_stream_begin..."); 257 self.on_stream_begin(stream_id)?; 258 } 259 RtmpMessageData::StreamIsRecorded { stream_id } => { 260 log::info!("[C <- S] on_stream_is_recorded..."); 261 self.on_stream_is_recorded(stream_id)?; 262 } 263 RtmpMessageData::AudioData { data } => { 264 self.common.on_audio_data(data, timestamp).await? 265 } 266 RtmpMessageData::VideoData { data } => { 267 self.common.on_video_data(data, timestamp).await? 268 } 269 RtmpMessageData::AmfData { raw_data } => { 270 self.common.on_meta_data(raw_data, timestamp).await?; 271 } 272 273 _ => {} 274 } 275 Ok(()) 276 } 277 278 pub async fn on_amf0_command_message( 279 &mut self, 280 command_name: &Amf0ValueType, 281 transaction_id: &Amf0ValueType, 282 command_object: &Amf0ValueType, 283 others: &mut Vec<Amf0ValueType>, 284 ) -> Result<(), SessionError> { 285 log::info!("[C <- S] on_amf0_command_message..."); 286 let empty_cmd_name = &String::new(); 287 let cmd_name = match command_name { 288 Amf0ValueType::UTF8String(str) => str, 289 _ => empty_cmd_name, 290 }; 291 292 let transaction_id = match transaction_id { 293 Amf0ValueType::Number(number) => *number as u8, 294 _ => 0, 295 }; 296 297 let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new(); 298 let _ = match command_object { 299 Amf0ValueType::Object(obj) => obj, 300 // Amf0ValueType::Null => 301 _ => &empty_cmd_obj, 302 }; 303 304 match cmd_name.as_str() { 305 "_result" => match transaction_id { 306 define::TRANSACTION_ID_CONNECT => { 307 log::info!("[C <- S] on_result_connect..."); 308 self.on_result_connect().await?; 309 } 310 define::TRANSACTION_ID_CREATE_STREAM => { 311 log::info!("[C <- S] on_result_create_stream..."); 312 self.on_result_create_stream()?; 313 } 314 _ => {} 315 }, 316 "_error" => { 317 self.on_error()?; 318 } 319 "onStatus" => { 320 match others.remove(0) { 321 Amf0ValueType::Object(obj) => self.on_status(&obj).await?, 322 _ => { 323 return Err(SessionError { 324 value: SessionErrorValue::Amf0ValueCountNotCorrect, 325 }) 326 } 327 }; 328 } 329 330 _ => {} 331 } 332 333 Ok(()) 334 } 335 336 pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 337 self.send_set_chunk_size().await?; 338 339 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 340 let mut properties = ConnectProperties::new_none(); 341 342 let url = format!( 343 "rtmp://{domain_name}/{app_name}", 344 domain_name = self.raw_domain_name, 345 app_name = self.app_name 346 ); 347 properties.app = Some(self.app_name.clone()); 348 349 match self.client_type { 350 ClientType::Play => { 351 properties.flash_ver = Some("LNX 9,0,124,2".to_string()); 352 properties.tc_url = Some(url.clone()); 353 properties.fpad = Some(false); 354 properties.capabilities = Some(15_f64); 355 properties.audio_codecs = Some(4071_f64); 356 properties.video_codecs = Some(252_f64); 357 properties.video_function = Some(1_f64); 358 } 359 ClientType::Publish => { 360 properties.pub_type = Some("nonprivate".to_string()); 361 properties.flash_ver = Some("FMLE/3.0 (compatible; xiu)".to_string()); 362 properties.fpad = Some(false); 363 properties.tc_url = Some(url.clone()); 364 } 365 } 366 367 netconnection 368 .write_connect(transaction_id, &properties) 369 .await?; 370 371 Ok(()) 372 } 373 374 pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 375 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 376 netconnection.write_create_stream(transaction_id).await?; 377 378 Ok(()) 379 } 380 381 pub async fn send_delete_stream( 382 &mut self, 383 transaction_id: &f64, 384 stream_id: &f64, 385 ) -> Result<(), SessionError> { 386 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 387 netstream 388 .write_delete_stream(transaction_id, stream_id) 389 .await?; 390 391 Ok(()) 392 } 393 394 pub async fn send_publish( 395 &mut self, 396 transaction_id: &f64, 397 stream_name: &String, 398 stream_type: &String, 399 ) -> Result<(), SessionError> { 400 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 401 netstream 402 .write_publish(transaction_id, stream_name, stream_type) 403 .await?; 404 405 Ok(()) 406 } 407 408 pub async fn send_play( 409 &mut self, 410 transaction_id: &f64, 411 stream_name: &String, 412 start: &f64, 413 duration: &f64, 414 reset: &bool, 415 ) -> Result<(), SessionError> { 416 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 417 netstream 418 .write_play(transaction_id, stream_name, start, duration, reset) 419 .await?; 420 421 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 422 netconnection 423 .write_get_stream_length(transaction_id, stream_name) 424 .await?; 425 426 self.send_set_buffer_length(1, 1300).await?; 427 428 Ok(()) 429 } 430 431 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 432 let mut controlmessage = 433 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 434 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 435 Ok(()) 436 } 437 438 pub async fn send_window_acknowledgement_size( 439 &mut self, 440 window_size: u32, 441 ) -> Result<(), SessionError> { 442 let mut controlmessage = 443 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 444 controlmessage 445 .write_window_acknowledgement_size(window_size) 446 .await?; 447 Ok(()) 448 } 449 450 pub async fn send_set_buffer_length( 451 &mut self, 452 stream_id: u32, 453 ms: u32, 454 ) -> Result<(), SessionError> { 455 let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 456 eventmessages.write_set_buffer_length(stream_id, ms).await?; 457 458 Ok(()) 459 } 460 461 pub async fn on_result_connect(&mut self) -> Result<(), SessionError> { 462 let mut controlmessage = 463 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 464 controlmessage.write_acknowledgement(3107).await?; 465 466 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 467 netstream 468 .write_release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 469 .await?; 470 netstream 471 .write_fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 472 .await?; 473 474 self.state = ClientSessionState::CreateStream; 475 476 Ok(()) 477 } 478 479 pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> { 480 match self.client_type { 481 ClientType::Play => { 482 self.state = ClientSessionState::Play; 483 } 484 ClientType::Publish => { 485 self.state = ClientSessionState::PublishingContent; 486 } 487 } 488 Ok(()) 489 } 490 491 pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> { 492 self.unpacketizer 493 .update_max_chunk_size(*chunk_size as usize); 494 Ok(()) 495 } 496 497 pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 498 log::trace!("stream is recorded stream_id is {}", stream_id); 499 Ok(()) 500 } 501 502 pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 503 log::trace!("stream is begin stream_id is {}", stream_id); 504 Ok(()) 505 } 506 507 pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> { 508 self.send_window_acknowledgement_size(5000000).await?; 509 510 Ok(()) 511 } 512 513 pub fn on_error(&mut self) -> Result<(), SessionError> { 514 Ok(()) 515 } 516 517 pub async fn on_status( 518 &mut self, 519 obj: &IndexMap<String, Amf0ValueType>, 520 ) -> Result<(), SessionError> { 521 if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") { 522 match &code_info[..] { 523 "NetStream.Publish.Start" => { 524 self.state = ClientSessionState::StartPublish; 525 //subscribe from local session and publish to remote rtmp server 526 if let (Some(app_name), Some(stream_name)) = 527 (&self.sub_app_name, &self.sub_stream_name) 528 { 529 self.common 530 .subscribe_from_channels( 531 app_name.clone(), 532 stream_name.clone(), 533 self.session_id, 534 ) 535 .await?; 536 } else { 537 self.common 538 .subscribe_from_channels( 539 self.app_name.clone(), 540 self.stream_name.clone(), 541 self.session_id, 542 ) 543 .await?; 544 } 545 } 546 "NetStream.Publish.Reset" => {} 547 "NetStream.Play.Start" => { 548 //pull from remote rtmp server and publish to local session 549 self.common 550 .publish_to_channels( 551 self.app_name.clone(), 552 self.stream_name.clone(), 553 self.session_id, 554 self.gop_num, 555 ) 556 .await? 557 } 558 _ => {} 559 } 560 } 561 log::trace!("{}", obj.len()); 562 Ok(()) 563 } 564 565 pub fn subscribe(&mut self, app_name: String, stream_name: String) { 566 self.sub_app_name = Some(app_name); 567 self.sub_stream_name = Some(stream_name); 568 } 569 } 570