1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 chunk::{ 11 define::CHUNK_SIZE, 12 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 13 }, 14 config, handshake, 15 handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer}, 16 messages::{define::RtmpMessageData, parser::MessageParser}, 17 netconnection::writer::{ConnectProperties, NetConnection}, 18 netstream::writer::NetStreamWriter, 19 protocol_control_messages::writer::ProtocolControlMessagesWriter, 20 user_control_messages::writer::EventMessagesWriter, 21 utils::RtmpUrlParser, 22 }, 23 bytes::BytesMut, 24 bytesio::{ 25 bytes_writer::AsyncBytesWriter, 26 bytesio::{TNetIO, TcpIO}, 27 }, 28 indexmap::IndexMap, 29 std::{sync::Arc, time::Duration}, 30 streamhub::{ 31 define::StreamHubEventSender, 32 utils::{RandomDigitCount, Uuid}, 33 }, 34 tokio::{net::TcpStream, sync::Mutex}, 35 }; 36 37 enum ServerSessionState { 38 Handshake, 39 ReadChunk, 40 // OnConnect, 41 // OnCreateStream, 42 //Publish, 43 DeleteStream, 44 Play, 45 } 46 47 pub struct ServerSession { 48 pub app_name: String, 49 pub stream_name: String, 50 pub url_parameters: String, 51 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 52 handshaker: HandshakeServer, 53 unpacketizer: ChunkUnpacketizer, 54 state: ServerSessionState, 55 bytesio_data: BytesMut, 56 has_remaing_data: bool, 57 /* Used to mark the subscriber's the data producer 58 in channels and delete it from map when unsubscribe 59 is called. */ 60 pub session_id: Uuid, 61 connect_properties: ConnectProperties, 62 pub common: Common, 63 /*configure how many gops will be cached.*/ 64 gop_num: usize, 65 } 66 67 impl ServerSession { 68 pub fn new(stream: TcpStream, event_producer: StreamHubEventSender, gop_num: usize) -> Self { 69 let remote_addr = if let Ok(addr) = stream.peer_addr() { 70 log::info!("server session: {}", addr.to_string()); 71 Some(addr) 72 } else { 73 None 74 }; 75 76 let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream)); 77 let net_io = Arc::new(Mutex::new(tcp_io)); 78 79 let subscriber_id = Uuid::new(RandomDigitCount::Four); 80 Self { 81 app_name: String::from(""), 82 stream_name: String::from(""), 83 url_parameters: String::from(""), 84 io: Arc::clone(&net_io), 85 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 86 unpacketizer: ChunkUnpacketizer::new(), 87 state: ServerSessionState::Handshake, 88 common: Common::new( 89 Arc::clone(&net_io), 90 event_producer, 91 SessionType::Server, 92 remote_addr, 93 ), 94 session_id: subscriber_id, 95 bytesio_data: BytesMut::new(), 96 has_remaing_data: false, 97 connect_properties: ConnectProperties::default(), 98 gop_num, 99 } 100 } 101 102 pub async fn run(&mut self) -> Result<(), SessionError> { 103 loop { 104 match self.state { 105 ServerSessionState::Handshake => { 106 self.handshake().await?; 107 } 108 ServerSessionState::ReadChunk => { 109 self.read_parse_chunks().await?; 110 } 111 ServerSessionState::Play => { 112 self.play().await?; 113 } 114 ServerSessionState::DeleteStream => { 115 return Ok(()); 116 } 117 } 118 } 119 120 //Ok(()) 121 } 122 123 async fn handshake(&mut self) -> Result<(), SessionError> { 124 let mut bytes_len = 0; 125 126 while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE { 127 self.bytesio_data = self.io.lock().await.read().await?; 128 bytes_len += self.bytesio_data.len(); 129 self.handshaker.extend_data(&self.bytesio_data[..]); 130 } 131 132 self.handshaker.handshake().await?; 133 134 if let ServerHandshakeState::Finish = self.handshaker.state() { 135 self.state = ServerSessionState::ReadChunk; 136 let left_bytes = self.handshaker.get_remaining_bytes(); 137 if !left_bytes.is_empty() { 138 self.unpacketizer.extend_data(&left_bytes[..]); 139 self.has_remaing_data = true; 140 } 141 log::info!("[ S->C ] [send_set_chunk_size] "); 142 self.send_set_chunk_size().await?; 143 return Ok(()); 144 } 145 146 Ok(()) 147 } 148 149 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 150 if !self.has_remaing_data { 151 match self 152 .io 153 .lock() 154 .await 155 .read_timeout(Duration::from_secs(2)) 156 .await 157 { 158 Ok(data) => { 159 self.bytesio_data = data; 160 } 161 Err(err) => { 162 self.common 163 .unpublish_to_channels( 164 self.app_name.clone(), 165 self.stream_name.clone(), 166 self.session_id, 167 ) 168 .await?; 169 170 return Err(SessionError { 171 value: SessionErrorValue::BytesIOError(err), 172 }); 173 } 174 } 175 176 self.unpacketizer.extend_data(&self.bytesio_data[..]); 177 } 178 179 self.has_remaing_data = false; 180 181 loop { 182 let result = self.unpacketizer.read_chunks(); 183 184 if let Ok(rv) = result { 185 if let UnpackResult::Chunks(chunks) = rv { 186 for chunk_info in chunks { 187 let timestamp = chunk_info.message_header.timestamp; 188 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 189 190 let mut msg = MessageParser::new(chunk_info).parse()?; 191 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 192 .await?; 193 } 194 } 195 } else { 196 break; 197 } 198 } 199 Ok(()) 200 } 201 202 async fn play(&mut self) -> Result<(), SessionError> { 203 match self.common.send_channel_data().await { 204 Ok(_) => {} 205 Err(err) => { 206 self.common 207 .unsubscribe_from_channels( 208 self.app_name.clone(), 209 self.stream_name.clone(), 210 self.session_id, 211 ) 212 .await?; 213 return Err(err); 214 } 215 } 216 217 Ok(()) 218 } 219 220 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 221 let mut controlmessage = 222 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 223 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 224 225 Ok(()) 226 } 227 228 pub async fn process_messages( 229 &mut self, 230 rtmp_msg: &mut RtmpMessageData, 231 msg_stream_id: &u32, 232 timestamp: &u32, 233 ) -> Result<(), SessionError> { 234 match rtmp_msg { 235 RtmpMessageData::Amf0Command { 236 command_name, 237 transaction_id, 238 command_object, 239 others, 240 } => { 241 self.on_amf0_command_message( 242 msg_stream_id, 243 command_name, 244 transaction_id, 245 command_object, 246 others, 247 ) 248 .await? 249 } 250 RtmpMessageData::SetChunkSize { chunk_size } => { 251 self.on_set_chunk_size(*chunk_size as usize)?; 252 } 253 RtmpMessageData::AudioData { data } => { 254 self.common.on_audio_data(data, timestamp).await?; 255 } 256 RtmpMessageData::VideoData { data } => { 257 self.common.on_video_data(data, timestamp).await?; 258 } 259 RtmpMessageData::AmfData { raw_data } => { 260 self.common.on_meta_data(raw_data, timestamp).await?; 261 } 262 263 _ => {} 264 } 265 Ok(()) 266 } 267 268 pub async fn on_amf0_command_message( 269 &mut self, 270 stream_id: &u32, 271 command_name: &Amf0ValueType, 272 transaction_id: &Amf0ValueType, 273 command_object: &Amf0ValueType, 274 others: &mut Vec<Amf0ValueType>, 275 ) -> Result<(), SessionError> { 276 let empty_cmd_name = &String::new(); 277 let cmd_name = match command_name { 278 Amf0ValueType::UTF8String(str) => str, 279 _ => empty_cmd_name, 280 }; 281 282 let transaction_id = match transaction_id { 283 Amf0ValueType::Number(number) => number, 284 _ => &0.0, 285 }; 286 287 let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new(); 288 let obj = match command_object { 289 Amf0ValueType::Object(obj) => obj, 290 _ => &empty_cmd_obj, 291 }; 292 293 match cmd_name.as_str() { 294 "connect" => { 295 log::info!("[ S<-C ] [connect] "); 296 self.on_connect(transaction_id, obj).await?; 297 } 298 "createStream" => { 299 log::info!("[ S<-C ] [create stream] "); 300 self.on_create_stream(transaction_id).await?; 301 } 302 "deleteStream" => { 303 if !others.is_empty() { 304 let stream_id = match others.pop() { 305 Some(Amf0ValueType::Number(streamid)) => streamid, 306 _ => 0.0, 307 }; 308 309 log::info!( 310 "[ S<-C ] [delete stream] app_name: {}, stream_name: {}", 311 self.app_name, 312 self.stream_name 313 ); 314 315 self.on_delete_stream(transaction_id, &stream_id).await?; 316 self.state = ServerSessionState::DeleteStream; 317 } 318 } 319 "play" => { 320 log::info!( 321 "[ S<-C ] [play] app_name: {}, stream_name: {}", 322 self.app_name, 323 self.stream_name 324 ); 325 self.unpacketizer.session_type = config::SERVER_PULL; 326 self.on_play(transaction_id, stream_id, others).await?; 327 } 328 "publish" => { 329 self.unpacketizer.session_type = config::SERVER_PUSH; 330 self.on_publish(transaction_id, stream_id, others).await?; 331 } 332 _ => {} 333 } 334 335 Ok(()) 336 } 337 338 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 339 log::info!( 340 "[ S<-C ] [set chunk size] app_name: {}, stream_name: {}, chunk size: {}", 341 self.app_name, 342 self.stream_name, 343 chunk_size 344 ); 345 self.unpacketizer.update_max_chunk_size(chunk_size); 346 Ok(()) 347 } 348 349 fn parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>) { 350 for (property, value) in command_obj { 351 match property.as_str() { 352 "app" => { 353 if let Amf0ValueType::UTF8String(app) = value { 354 self.connect_properties.app = Some(app.clone()); 355 } 356 } 357 "flashVer" => { 358 if let Amf0ValueType::UTF8String(flash_ver) = value { 359 self.connect_properties.flash_ver = Some(flash_ver.clone()); 360 } 361 } 362 "swfUrl" => { 363 if let Amf0ValueType::UTF8String(swf_url) = value { 364 self.connect_properties.swf_url = Some(swf_url.clone()); 365 } 366 } 367 "tcUrl" => { 368 if let Amf0ValueType::UTF8String(tc_url) = value { 369 self.connect_properties.tc_url = Some(tc_url.clone()); 370 } 371 } 372 "fpad" => { 373 if let Amf0ValueType::Boolean(fpad) = value { 374 self.connect_properties.fpad = Some(*fpad); 375 } 376 } 377 "audioCodecs" => { 378 if let Amf0ValueType::Number(audio_codecs) = value { 379 self.connect_properties.audio_codecs = Some(*audio_codecs); 380 } 381 } 382 "videoCodecs" => { 383 if let Amf0ValueType::Number(video_codecs) = value { 384 self.connect_properties.video_codecs = Some(*video_codecs); 385 } 386 } 387 "videoFunction" => { 388 if let Amf0ValueType::Number(video_function) = value { 389 self.connect_properties.video_function = Some(*video_function); 390 } 391 } 392 "pageUrl" => { 393 if let Amf0ValueType::UTF8String(page_url) = value { 394 self.connect_properties.page_url = Some(page_url.clone()); 395 } 396 } 397 "objectEncoding" => { 398 if let Amf0ValueType::Number(object_encoding) = value { 399 self.connect_properties.object_encoding = Some(*object_encoding); 400 } 401 } 402 _ => { 403 log::warn!("unknown connect properties: {}:{:?}", property, value); 404 } 405 } 406 } 407 } 408 409 async fn on_connect( 410 &mut self, 411 transaction_id: &f64, 412 command_obj: &IndexMap<String, Amf0ValueType>, 413 ) -> Result<(), SessionError> { 414 self.parse_connect_properties(command_obj); 415 log::info!("connect properties: {:?}", self.connect_properties); 416 let mut control_message = 417 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 418 log::info!("[ S->C ] [set window_acknowledgement_size]"); 419 control_message 420 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 421 .await?; 422 423 log::info!("[ S->C ] [set set_peer_bandwidth]",); 424 control_message 425 .write_set_peer_bandwidth( 426 define::PEER_BANDWIDTH, 427 define::peer_bandwidth_limit_type::DYNAMIC, 428 ) 429 .await?; 430 431 let obj_encoding = command_obj.get("objectEncoding"); 432 let encoding = match obj_encoding { 433 Some(Amf0ValueType::Number(encoding)) => encoding, 434 _ => &define::OBJENCODING_AMF0, 435 }; 436 437 let app_name = command_obj.get("app"); 438 self.app_name = match app_name { 439 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 440 _ => { 441 return Err(SessionError { 442 value: SessionErrorValue::NoAppName, 443 }); 444 } 445 }; 446 447 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 448 log::info!("[ S->C ] [set connect_response]",); 449 netconnection 450 .write_connect_response( 451 transaction_id, 452 define::FMSVER, 453 &define::CAPABILITIES, 454 &String::from("NetConnection.Connect.Success"), 455 define::LEVEL, 456 &String::from("Connection Succeeded."), 457 encoding, 458 ) 459 .await?; 460 461 Ok(()) 462 } 463 464 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 465 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 466 netconnection 467 .write_create_stream_response(transaction_id, &define::STREAM_ID) 468 .await?; 469 470 log::info!( 471 "[ S->C ] [create_stream_response] app_name: {}", 472 self.app_name, 473 ); 474 475 Ok(()) 476 } 477 478 pub async fn on_delete_stream( 479 &mut self, 480 transaction_id: &f64, 481 stream_id: &f64, 482 ) -> Result<(), SessionError> { 483 self.common 484 .unpublish_to_channels( 485 self.app_name.clone(), 486 self.stream_name.clone(), 487 self.session_id, 488 ) 489 .await?; 490 491 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 492 netstream 493 .write_on_status( 494 transaction_id, 495 "status", 496 "NetStream.DeleteStream.Suceess", 497 "", 498 ) 499 .await?; 500 501 //self.unsubscribe_from_channels().await?; 502 log::info!( 503 "[ S->C ] [delete stream success] app_name: {}, stream_name: {}", 504 self.app_name, 505 self.stream_name 506 ); 507 log::trace!("{}", stream_id); 508 509 Ok(()) 510 } 511 512 fn get_request_url(&mut self, raw_stream_name: String) -> String { 513 if let Some(tc_url) = &self.connect_properties.tc_url { 514 format!("{tc_url}/{raw_stream_name}") 515 } else { 516 format!("{}/{}", self.app_name.clone(), raw_stream_name) 517 } 518 } 519 520 #[allow(clippy::never_loop)] 521 pub async fn on_play( 522 &mut self, 523 transaction_id: &f64, 524 stream_id: &u32, 525 other_values: &mut Vec<Amf0ValueType>, 526 ) -> Result<(), SessionError> { 527 let length = other_values.len() as u8; 528 let mut index: u8 = 0; 529 530 let mut stream_name: Option<String> = None; 531 let mut start: Option<f64> = None; 532 let mut duration: Option<f64> = None; 533 let mut reset: Option<bool> = None; 534 535 loop { 536 if index >= length { 537 break; 538 } 539 index += 1; 540 stream_name = match other_values.remove(0) { 541 Amf0ValueType::UTF8String(val) => Some(val), 542 _ => None, 543 }; 544 545 if index >= length { 546 break; 547 } 548 index += 1; 549 start = match other_values.remove(0) { 550 Amf0ValueType::Number(val) => Some(val), 551 _ => None, 552 }; 553 554 if index >= length { 555 break; 556 } 557 index += 1; 558 duration = match other_values.remove(0) { 559 Amf0ValueType::Number(val) => Some(val), 560 _ => None, 561 }; 562 563 if index >= length { 564 break; 565 } 566 //index = index + 1; 567 reset = match other_values.remove(0) { 568 Amf0ValueType::Boolean(val) => Some(val), 569 _ => None, 570 }; 571 break; 572 } 573 574 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 575 event_messages.write_stream_begin(*stream_id).await?; 576 log::info!( 577 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 578 self.app_name, 579 self.stream_name 580 ); 581 log::trace!( 582 "{} {} {}", 583 start.is_some(), 584 duration.is_some(), 585 reset.is_some() 586 ); 587 588 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 589 netstream 590 .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset") 591 .await?; 592 593 netstream 594 .write_on_status( 595 transaction_id, 596 "status", 597 "NetStream.Play.Start", 598 "play start", 599 ) 600 .await?; 601 602 netstream 603 .write_on_status( 604 transaction_id, 605 "status", 606 "NetStream.Data.Start", 607 "data start.", 608 ) 609 .await?; 610 611 netstream 612 .write_on_status( 613 transaction_id, 614 "status", 615 "NetStream.Play.PublishNotify", 616 "play publish notify.", 617 ) 618 .await?; 619 620 event_messages.write_stream_is_record(*stream_id).await?; 621 622 let raw_stream_name = stream_name.unwrap(); 623 624 (self.stream_name, self.url_parameters) = RtmpUrlParser::default() 625 .set_raw_stream_name(raw_stream_name.clone()) 626 .parse_raw_stream_name(); 627 628 log::info!( 629 "[ S->C ] [stream is record] app_name: {}, stream_name: {}, url parameters: {}", 630 self.app_name, 631 self.stream_name, 632 self.url_parameters 633 ); 634 635 /*Now it can update the request url*/ 636 self.common.request_url = self.get_request_url(raw_stream_name); 637 self.common 638 .subscribe_from_channels( 639 self.app_name.clone(), 640 self.stream_name.clone(), 641 self.session_id, 642 ) 643 .await?; 644 645 self.state = ServerSessionState::Play; 646 647 Ok(()) 648 } 649 650 pub async fn on_publish( 651 &mut self, 652 transaction_id: &f64, 653 stream_id: &u32, 654 other_values: &mut Vec<Amf0ValueType>, 655 ) -> Result<(), SessionError> { 656 let length = other_values.len(); 657 658 if length < 2 { 659 return Err(SessionError { 660 value: SessionErrorValue::Amf0ValueCountNotCorrect, 661 }); 662 } 663 664 let raw_stream_name = match other_values.remove(0) { 665 Amf0ValueType::UTF8String(val) => val, 666 _ => { 667 return Err(SessionError { 668 value: SessionErrorValue::Amf0ValueCountNotCorrect, 669 }); 670 } 671 }; 672 673 (self.stream_name, self.url_parameters) = RtmpUrlParser::default() 674 .set_raw_stream_name(raw_stream_name.clone()) 675 .parse_raw_stream_name(); 676 677 /*Now it can update the request url*/ 678 self.common.request_url = self.get_request_url(raw_stream_name); 679 680 let _ = match other_values.remove(0) { 681 Amf0ValueType::UTF8String(val) => val, 682 _ => { 683 return Err(SessionError { 684 value: SessionErrorValue::Amf0ValueCountNotCorrect, 685 }); 686 } 687 }; 688 689 log::info!( 690 "[ S<-C ] [publish] app_name: {}, stream_name: {}, url parameters: {}", 691 self.app_name, 692 self.stream_name, 693 self.url_parameters 694 ); 695 696 log::info!( 697 "[ S->C ] [stream begin] app_name: {}, stream_name: {}, url parameters: {}", 698 self.app_name, 699 self.stream_name, 700 self.url_parameters 701 ); 702 703 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 704 event_messages.write_stream_begin(*stream_id).await?; 705 706 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 707 netstream 708 .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "") 709 .await?; 710 log::info!( 711 "[ S->C ] [NetStream.Publish.Start] app_name: {}, stream_name: {}", 712 self.app_name, 713 self.stream_name 714 ); 715 716 self.common 717 .publish_to_channels( 718 self.app_name.clone(), 719 self.stream_name.clone(), 720 self.session_id, 721 self.gop_num, 722 ) 723 .await?; 724 725 Ok(()) 726 } 727 } 728