1 use crate::chunk::{errors::UnpackErrorValue, packetizer::ChunkPacketizer}; 2 3 use { 4 super::{ 5 common::Common, 6 define, 7 define::SessionType, 8 errors::{SessionError, SessionErrorValue}, 9 }, 10 crate::{ 11 amf0::Amf0ValueType, 12 chunk::{ 13 define::CHUNK_SIZE, 14 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 15 }, 16 config, handshake, 17 handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer}, 18 messages::{define::RtmpMessageData, parser::MessageParser}, 19 netconnection::writer::{ConnectProperties, NetConnection}, 20 netstream::writer::NetStreamWriter, 21 protocol_control_messages::writer::ProtocolControlMessagesWriter, 22 user_control_messages::writer::EventMessagesWriter, 23 utils::RtmpUrlParser, 24 }, 25 bytes::BytesMut, 26 bytesio::{ 27 bytes_writer::AsyncBytesWriter, 28 bytesio::{TNetIO, TcpIO}, 29 }, 30 indexmap::IndexMap, 31 std::{sync::Arc, time::Duration}, 32 streamhub::{ 33 define::StreamHubEventSender, 34 utils::{RandomDigitCount, Uuid}, 35 }, 36 tokio::{net::TcpStream, sync::Mutex}, 37 }; 38 39 enum ServerSessionState { 40 Handshake, 41 ReadChunk, 42 // OnConnect, 43 // OnCreateStream, 44 //Publish, 45 DeleteStream, 46 Play, 47 } 48 49 pub struct ServerSession { 50 pub app_name: String, 51 pub stream_name: String, 52 pub url_parameters: String, 53 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 54 handshaker: HandshakeServer, 55 unpacketizer: ChunkUnpacketizer, 56 state: ServerSessionState, 57 bytesio_data: BytesMut, 58 has_remaing_data: bool, 59 /* Used to mark the subscriber's the data producer 60 in channels and delete it from map when unsubscribe 61 is called. */ 62 pub session_id: Uuid, 63 connect_properties: ConnectProperties, 64 pub common: Common, 65 /*configure how many gops will be cached.*/ 66 gop_num: usize, 67 } 68 69 impl ServerSession { new(stream: TcpStream, event_producer: StreamHubEventSender, gop_num: usize) -> Self70 pub fn new(stream: TcpStream, event_producer: StreamHubEventSender, gop_num: usize) -> Self { 71 let remote_addr = if let Ok(addr) = stream.peer_addr() { 72 log::info!("server session: {}", addr.to_string()); 73 Some(addr) 74 } else { 75 None 76 }; 77 78 let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream)); 79 let net_io = Arc::new(Mutex::new(tcp_io)); 80 81 Self { 82 app_name: String::from(""), 83 stream_name: String::from(""), 84 url_parameters: String::from(""), 85 io: Arc::clone(&net_io), 86 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 87 unpacketizer: ChunkUnpacketizer::new(), 88 state: ServerSessionState::Handshake, 89 common: Common::new( 90 Some(ChunkPacketizer::new(Arc::clone(&net_io))), 91 event_producer, 92 SessionType::Server, 93 remote_addr, 94 ), 95 session_id: Uuid::new(RandomDigitCount::Four), 96 bytesio_data: BytesMut::new(), 97 has_remaing_data: false, 98 connect_properties: ConnectProperties::default(), 99 gop_num, 100 } 101 } 102 run(&mut self) -> Result<(), SessionError>103 pub async fn run(&mut self) -> Result<(), SessionError> { 104 loop { 105 match self.state { 106 ServerSessionState::Handshake => { 107 self.handshake().await?; 108 } 109 ServerSessionState::ReadChunk => { 110 self.read_parse_chunks().await?; 111 } 112 ServerSessionState::Play => { 113 self.play().await?; 114 } 115 ServerSessionState::DeleteStream => { 116 return Ok(()); 117 } 118 } 119 } 120 121 //Ok(()) 122 } 123 handshake(&mut self) -> Result<(), SessionError>124 async fn handshake(&mut self) -> Result<(), SessionError> { 125 let mut bytes_len = 0; 126 127 while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE { 128 self.bytesio_data = self.io.lock().await.read().await?; 129 bytes_len += self.bytesio_data.len(); 130 self.handshaker.extend_data(&self.bytesio_data[..]); 131 } 132 133 self.handshaker.handshake().await?; 134 135 if let ServerHandshakeState::Finish = self.handshaker.state() { 136 self.state = ServerSessionState::ReadChunk; 137 let left_bytes = self.handshaker.get_remaining_bytes(); 138 if !left_bytes.is_empty() { 139 self.unpacketizer.extend_data(&left_bytes[..]); 140 self.has_remaing_data = true; 141 } 142 log::info!("[ S->C ] [send_set_chunk_size] "); 143 self.send_set_chunk_size().await?; 144 return Ok(()); 145 } 146 147 Ok(()) 148 } 149 read_parse_chunks(&mut self) -> Result<(), SessionError>150 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 151 if !self.has_remaing_data { 152 match self 153 .io 154 .lock() 155 .await 156 .read_timeout(Duration::from_secs(2)) 157 .await 158 { 159 Ok(data) => { 160 self.bytesio_data = data; 161 } 162 Err(err) => { 163 self.common 164 .unpublish_to_channels( 165 self.app_name.clone(), 166 self.stream_name.clone(), 167 self.session_id, 168 ) 169 .await?; 170 171 return Err(SessionError { 172 value: SessionErrorValue::BytesIOError(err), 173 }); 174 } 175 } 176 177 self.unpacketizer.extend_data(&self.bytesio_data[..]); 178 } 179 180 self.has_remaing_data = false; 181 182 loop { 183 match self.unpacketizer.read_chunks() { 184 Ok(rv) => { 185 if let UnpackResult::Chunks(chunks) = rv { 186 for chunk_info in chunks { 187 let timestamp = chunk_info.message_header.timestamp; 188 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 189 190 if let Some(mut msg) = MessageParser::new(chunk_info).parse()? { 191 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 192 .await?; 193 } 194 } 195 } 196 } 197 Err(err) => { 198 if let UnpackErrorValue::CannotParse = err.value { 199 self.common 200 .unpublish_to_channels( 201 self.app_name.clone(), 202 self.stream_name.clone(), 203 self.session_id, 204 ) 205 .await?; 206 return Err(err)?; 207 } 208 break; 209 } 210 } 211 } 212 Ok(()) 213 } 214 play(&mut self) -> Result<(), SessionError>215 async fn play(&mut self) -> Result<(), SessionError> { 216 match self.common.send_channel_data().await { 217 Ok(_) => {} 218 Err(err) => { 219 self.common 220 .unsubscribe_from_channels( 221 self.app_name.clone(), 222 self.stream_name.clone(), 223 self.session_id, 224 ) 225 .await?; 226 return Err(err); 227 } 228 } 229 230 Ok(()) 231 } 232 send_set_chunk_size(&mut self) -> Result<(), SessionError>233 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 234 let mut controlmessage = 235 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 236 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 237 238 Ok(()) 239 } 240 process_messages( &mut self, rtmp_msg: &mut RtmpMessageData, msg_stream_id: &u32, timestamp: &u32, ) -> Result<(), SessionError>241 pub async fn process_messages( 242 &mut self, 243 rtmp_msg: &mut RtmpMessageData, 244 msg_stream_id: &u32, 245 timestamp: &u32, 246 ) -> Result<(), SessionError> { 247 match rtmp_msg { 248 RtmpMessageData::Amf0Command { 249 command_name, 250 transaction_id, 251 command_object, 252 others, 253 } => { 254 self.on_amf0_command_message( 255 msg_stream_id, 256 command_name, 257 transaction_id, 258 command_object, 259 others, 260 ) 261 .await? 262 } 263 RtmpMessageData::SetChunkSize { chunk_size } => { 264 self.on_set_chunk_size(*chunk_size as usize)?; 265 } 266 RtmpMessageData::AudioData { data } => { 267 self.common.on_audio_data(data, timestamp).await?; 268 } 269 RtmpMessageData::VideoData { data } => { 270 self.common.on_video_data(data, timestamp).await?; 271 } 272 RtmpMessageData::AmfData { raw_data } => { 273 self.common.on_meta_data(raw_data, timestamp).await?; 274 } 275 276 _ => {} 277 } 278 Ok(()) 279 } 280 on_amf0_command_message( &mut self, stream_id: &u32, command_name: &Amf0ValueType, transaction_id: &Amf0ValueType, command_object: &Amf0ValueType, others: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>281 pub async fn on_amf0_command_message( 282 &mut self, 283 stream_id: &u32, 284 command_name: &Amf0ValueType, 285 transaction_id: &Amf0ValueType, 286 command_object: &Amf0ValueType, 287 others: &mut Vec<Amf0ValueType>, 288 ) -> Result<(), SessionError> { 289 let empty_cmd_name = &String::new(); 290 let cmd_name = match command_name { 291 Amf0ValueType::UTF8String(str) => str, 292 _ => empty_cmd_name, 293 }; 294 295 let transaction_id = match transaction_id { 296 Amf0ValueType::Number(number) => number, 297 _ => &0.0, 298 }; 299 300 let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new(); 301 let obj = match command_object { 302 Amf0ValueType::Object(obj) => obj, 303 _ => &empty_cmd_obj, 304 }; 305 306 match cmd_name.as_str() { 307 "connect" => { 308 log::info!("[ S<-C ] [connect] "); 309 self.on_connect(transaction_id, obj).await?; 310 } 311 "createStream" => { 312 log::info!("[ S<-C ] [create stream] "); 313 self.on_create_stream(transaction_id).await?; 314 } 315 "deleteStream" => { 316 if !others.is_empty() { 317 let stream_id = match others.pop() { 318 Some(Amf0ValueType::Number(streamid)) => streamid, 319 _ => 0.0, 320 }; 321 322 log::info!( 323 "[ S<-C ] [delete stream] app_name: {}, stream_name: {}", 324 self.app_name, 325 self.stream_name 326 ); 327 328 self.on_delete_stream(transaction_id, &stream_id).await?; 329 self.state = ServerSessionState::DeleteStream; 330 } 331 } 332 "play" => { 333 log::info!( 334 "[ S<-C ] [play] app_name: {}, stream_name: {}", 335 self.app_name, 336 self.stream_name 337 ); 338 self.unpacketizer.session_type = config::SERVER_PULL; 339 self.on_play(transaction_id, stream_id, others).await?; 340 } 341 "publish" => { 342 self.unpacketizer.session_type = config::SERVER_PUSH; 343 self.on_publish(transaction_id, stream_id, others).await?; 344 } 345 _ => {} 346 } 347 348 Ok(()) 349 } 350 on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError>351 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 352 log::info!( 353 "[ S<-C ] [set chunk size] app_name: {}, stream_name: {}, chunk size: {}", 354 self.app_name, 355 self.stream_name, 356 chunk_size 357 ); 358 self.unpacketizer.update_max_chunk_size(chunk_size); 359 Ok(()) 360 } 361 parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>)362 fn parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>) { 363 for (property, value) in command_obj { 364 match property.as_str() { 365 "app" => { 366 if let Amf0ValueType::UTF8String(app) = value { 367 self.connect_properties.app = Some(app.clone()); 368 } 369 } 370 "flashVer" => { 371 if let Amf0ValueType::UTF8String(flash_ver) = value { 372 self.connect_properties.flash_ver = Some(flash_ver.clone()); 373 } 374 } 375 "swfUrl" => { 376 if let Amf0ValueType::UTF8String(swf_url) = value { 377 self.connect_properties.swf_url = Some(swf_url.clone()); 378 } 379 } 380 "tcUrl" => { 381 if let Amf0ValueType::UTF8String(tc_url) = value { 382 self.connect_properties.tc_url = Some(tc_url.clone()); 383 } 384 } 385 "fpad" => { 386 if let Amf0ValueType::Boolean(fpad) = value { 387 self.connect_properties.fpad = Some(*fpad); 388 } 389 } 390 "audioCodecs" => { 391 if let Amf0ValueType::Number(audio_codecs) = value { 392 self.connect_properties.audio_codecs = Some(*audio_codecs); 393 } 394 } 395 "videoCodecs" => { 396 if let Amf0ValueType::Number(video_codecs) = value { 397 self.connect_properties.video_codecs = Some(*video_codecs); 398 } 399 } 400 "videoFunction" => { 401 if let Amf0ValueType::Number(video_function) = value { 402 self.connect_properties.video_function = Some(*video_function); 403 } 404 } 405 "pageUrl" => { 406 if let Amf0ValueType::UTF8String(page_url) = value { 407 self.connect_properties.page_url = Some(page_url.clone()); 408 } 409 } 410 "objectEncoding" => { 411 if let Amf0ValueType::Number(object_encoding) = value { 412 self.connect_properties.object_encoding = Some(*object_encoding); 413 } 414 } 415 _ => { 416 log::warn!("unknown connect properties: {}:{:?}", property, value); 417 } 418 } 419 } 420 } 421 on_connect( &mut self, transaction_id: &f64, command_obj: &IndexMap<String, Amf0ValueType>, ) -> Result<(), SessionError>422 async fn on_connect( 423 &mut self, 424 transaction_id: &f64, 425 command_obj: &IndexMap<String, Amf0ValueType>, 426 ) -> Result<(), SessionError> { 427 self.parse_connect_properties(command_obj); 428 log::info!("connect properties: {:?}", self.connect_properties); 429 let mut control_message = 430 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 431 log::info!("[ S->C ] [set window_acknowledgement_size]"); 432 control_message 433 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 434 .await?; 435 436 log::info!("[ S->C ] [set set_peer_bandwidth]",); 437 control_message 438 .write_set_peer_bandwidth( 439 define::PEER_BANDWIDTH, 440 define::peer_bandwidth_limit_type::DYNAMIC, 441 ) 442 .await?; 443 444 let obj_encoding = command_obj.get("objectEncoding"); 445 let encoding = match obj_encoding { 446 Some(Amf0ValueType::Number(encoding)) => encoding, 447 _ => &define::OBJENCODING_AMF0, 448 }; 449 450 let app_name = command_obj.get("app"); 451 self.app_name = match app_name { 452 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 453 _ => { 454 return Err(SessionError { 455 value: SessionErrorValue::NoAppName, 456 }); 457 } 458 }; 459 460 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 461 log::info!("[ S->C ] [set connect_response]",); 462 netconnection 463 .write_connect_response( 464 transaction_id, 465 define::FMSVER, 466 &define::CAPABILITIES, 467 &String::from("NetConnection.Connect.Success"), 468 define::LEVEL, 469 &String::from("Connection Succeeded."), 470 encoding, 471 ) 472 .await?; 473 474 Ok(()) 475 } 476 on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError>477 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 478 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 479 netconnection 480 .write_create_stream_response(transaction_id, &define::STREAM_ID) 481 .await?; 482 483 log::info!( 484 "[ S->C ] [create_stream_response] app_name: {}", 485 self.app_name, 486 ); 487 488 Ok(()) 489 } 490 on_delete_stream( &mut self, transaction_id: &f64, stream_id: &f64, ) -> Result<(), SessionError>491 pub async fn on_delete_stream( 492 &mut self, 493 transaction_id: &f64, 494 stream_id: &f64, 495 ) -> Result<(), SessionError> { 496 self.common 497 .unpublish_to_channels( 498 self.app_name.clone(), 499 self.stream_name.clone(), 500 self.session_id, 501 ) 502 .await?; 503 504 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 505 netstream 506 .write_on_status( 507 transaction_id, 508 "status", 509 "NetStream.DeleteStream.Suceess", 510 "", 511 ) 512 .await?; 513 514 //self.unsubscribe_from_channels().await?; 515 log::info!( 516 "[ S->C ] [delete stream success] app_name: {}, stream_name: {}", 517 self.app_name, 518 self.stream_name 519 ); 520 log::trace!("{}", stream_id); 521 522 Ok(()) 523 } 524 get_request_url(&mut self, raw_stream_name: String) -> String525 fn get_request_url(&mut self, raw_stream_name: String) -> String { 526 if let Some(tc_url) = &self.connect_properties.tc_url { 527 format!("{tc_url}/{raw_stream_name}") 528 } else { 529 format!("{}/{}", self.app_name.clone(), raw_stream_name) 530 } 531 } 532 533 #[allow(clippy::never_loop)] on_play( &mut self, transaction_id: &f64, stream_id: &u32, other_values: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>534 pub async fn on_play( 535 &mut self, 536 transaction_id: &f64, 537 stream_id: &u32, 538 other_values: &mut Vec<Amf0ValueType>, 539 ) -> Result<(), SessionError> { 540 let length = other_values.len() as u8; 541 let mut index: u8 = 0; 542 543 let mut stream_name: Option<String> = None; 544 let mut start: Option<f64> = None; 545 let mut duration: Option<f64> = None; 546 let mut reset: Option<bool> = None; 547 548 loop { 549 if index >= length { 550 break; 551 } 552 index += 1; 553 stream_name = match other_values.remove(0) { 554 Amf0ValueType::UTF8String(val) => Some(val), 555 _ => None, 556 }; 557 558 if index >= length { 559 break; 560 } 561 index += 1; 562 start = match other_values.remove(0) { 563 Amf0ValueType::Number(val) => Some(val), 564 _ => None, 565 }; 566 567 if index >= length { 568 break; 569 } 570 index += 1; 571 duration = match other_values.remove(0) { 572 Amf0ValueType::Number(val) => Some(val), 573 _ => None, 574 }; 575 576 if index >= length { 577 break; 578 } 579 //index = index + 1; 580 reset = match other_values.remove(0) { 581 Amf0ValueType::Boolean(val) => Some(val), 582 _ => None, 583 }; 584 break; 585 } 586 587 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 588 event_messages.write_stream_begin(*stream_id).await?; 589 log::info!( 590 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 591 self.app_name, 592 self.stream_name 593 ); 594 log::trace!( 595 "{} {} {}", 596 start.is_some(), 597 duration.is_some(), 598 reset.is_some() 599 ); 600 601 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 602 netstream 603 .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset") 604 .await?; 605 606 netstream 607 .write_on_status( 608 transaction_id, 609 "status", 610 "NetStream.Play.Start", 611 "play start", 612 ) 613 .await?; 614 615 netstream 616 .write_on_status( 617 transaction_id, 618 "status", 619 "NetStream.Data.Start", 620 "data start.", 621 ) 622 .await?; 623 624 netstream 625 .write_on_status( 626 transaction_id, 627 "status", 628 "NetStream.Play.PublishNotify", 629 "play publish notify.", 630 ) 631 .await?; 632 633 event_messages.write_stream_is_record(*stream_id).await?; 634 635 let raw_stream_name = stream_name.unwrap(); 636 637 (self.stream_name, self.url_parameters) = RtmpUrlParser::default() 638 .set_raw_stream_name(raw_stream_name.clone()) 639 .parse_raw_stream_name(); 640 641 log::info!( 642 "[ S->C ] [stream is record] app_name: {}, stream_name: {}, url parameters: {}", 643 self.app_name, 644 self.stream_name, 645 self.url_parameters 646 ); 647 648 /*Now it can update the request url*/ 649 self.common.request_url = self.get_request_url(raw_stream_name); 650 self.common 651 .subscribe_from_channels( 652 self.app_name.clone(), 653 self.stream_name.clone(), 654 self.session_id, 655 ) 656 .await?; 657 658 self.state = ServerSessionState::Play; 659 660 Ok(()) 661 } 662 on_publish( &mut self, transaction_id: &f64, stream_id: &u32, other_values: &mut Vec<Amf0ValueType>, ) -> Result<(), SessionError>663 pub async fn on_publish( 664 &mut self, 665 transaction_id: &f64, 666 stream_id: &u32, 667 other_values: &mut Vec<Amf0ValueType>, 668 ) -> Result<(), SessionError> { 669 let length = other_values.len(); 670 671 if length < 2 { 672 return Err(SessionError { 673 value: SessionErrorValue::Amf0ValueCountNotCorrect, 674 }); 675 } 676 677 let raw_stream_name = match other_values.remove(0) { 678 Amf0ValueType::UTF8String(val) => val, 679 _ => { 680 return Err(SessionError { 681 value: SessionErrorValue::Amf0ValueCountNotCorrect, 682 }); 683 } 684 }; 685 686 (self.stream_name, self.url_parameters) = RtmpUrlParser::default() 687 .set_raw_stream_name(raw_stream_name.clone()) 688 .parse_raw_stream_name(); 689 690 /*Now it can update the request url*/ 691 self.common.request_url = self.get_request_url(raw_stream_name); 692 693 let _ = match other_values.remove(0) { 694 Amf0ValueType::UTF8String(val) => val, 695 _ => { 696 return Err(SessionError { 697 value: SessionErrorValue::Amf0ValueCountNotCorrect, 698 }); 699 } 700 }; 701 702 log::info!( 703 "[ S<-C ] [publish] app_name: {}, stream_name: {}, url parameters: {}", 704 self.app_name, 705 self.stream_name, 706 self.url_parameters 707 ); 708 709 log::info!( 710 "[ S->C ] [stream begin] app_name: {}, stream_name: {}, url parameters: {}", 711 self.app_name, 712 self.stream_name, 713 self.url_parameters 714 ); 715 716 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 717 event_messages.write_stream_begin(*stream_id).await?; 718 719 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 720 netstream 721 .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "") 722 .await?; 723 log::info!( 724 "[ S->C ] [NetStream.Publish.Start] app_name: {}, stream_name: {}", 725 self.app_name, 726 self.stream_name 727 ); 728 729 self.common 730 .publish_to_channels( 731 self.app_name.clone(), 732 self.stream_name.clone(), 733 self.session_id, 734 self.gop_num, 735 ) 736 .await?; 737 738 Ok(()) 739 } 740 } 741