1 use crate::chunk::packetizer::ChunkPacketizer; 2 3 use { 4 super::{ 5 common::Common, 6 define, 7 define::SessionType, 8 errors::{SessionError, SessionErrorValue}, 9 }, 10 crate::{ 11 amf0::Amf0ValueType, 12 chunk::{ 13 define::CHUNK_SIZE, 14 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 15 }, 16 config, handshake, 17 handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer}, 18 messages::{define::RtmpMessageData, parser::MessageParser}, 19 netconnection::writer::{ConnectProperties, NetConnection}, 20 netstream::writer::NetStreamWriter, 21 protocol_control_messages::writer::ProtocolControlMessagesWriter, 22 user_control_messages::writer::EventMessagesWriter, 23 utils::RtmpUrlParser, 24 }, 25 bytes::BytesMut, 26 bytesio::{ 27 bytes_writer::AsyncBytesWriter, 28 bytesio::{TNetIO, TcpIO}, 29 }, 30 indexmap::IndexMap, 31 std::{sync::Arc, time::Duration}, 32 streamhub::{ 33 define::StreamHubEventSender, 34 utils::{RandomDigitCount, Uuid}, 35 }, 36 tokio::{net::TcpStream, sync::Mutex}, 37 }; 38 39 enum ServerSessionState { 40 Handshake, 41 ReadChunk, 42 // OnConnect, 43 // OnCreateStream, 44 //Publish, 45 DeleteStream, 46 Play, 47 } 48 49 pub struct ServerSession { 50 pub app_name: String, 51 pub stream_name: String, 52 pub url_parameters: String, 53 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 54 handshaker: HandshakeServer, 55 unpacketizer: ChunkUnpacketizer, 56 state: ServerSessionState, 57 bytesio_data: BytesMut, 58 has_remaing_data: bool, 59 /* Used to mark the subscriber's the data producer 60 in channels and delete it from map when unsubscribe 61 is called. */ 62 pub session_id: Uuid, 63 connect_properties: ConnectProperties, 64 pub common: Common, 65 /*configure how many gops will be cached.*/ 66 gop_num: usize, 67 } 68 69 impl ServerSession { 70 pub fn new(stream: TcpStream, event_producer: StreamHubEventSender, gop_num: usize) -> Self { 71 let remote_addr = if let Ok(addr) = stream.peer_addr() { 72 log::info!("server session: {}", addr.to_string()); 73 Some(addr) 74 } else { 75 None 76 }; 77 78 let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream)); 79 let net_io = Arc::new(Mutex::new(tcp_io)); 80 81 Self { 82 app_name: String::from(""), 83 stream_name: String::from(""), 84 url_parameters: String::from(""), 85 io: Arc::clone(&net_io), 86 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 87 unpacketizer: ChunkUnpacketizer::new(), 88 state: ServerSessionState::Handshake, 89 common: Common::new( 90 Some(ChunkPacketizer::new(Arc::clone(&net_io))), 91 event_producer, 92 SessionType::Server, 93 remote_addr, 94 ), 95 session_id: Uuid::new(RandomDigitCount::Four), 96 bytesio_data: BytesMut::new(), 97 has_remaing_data: false, 98 connect_properties: ConnectProperties::default(), 99 gop_num, 100 } 101 } 102 103 pub async fn run(&mut self) -> Result<(), SessionError> { 104 loop { 105 match self.state { 106 ServerSessionState::Handshake => { 107 self.handshake().await?; 108 } 109 ServerSessionState::ReadChunk => { 110 self.read_parse_chunks().await?; 111 } 112 ServerSessionState::Play => { 113 self.play().await?; 114 } 115 ServerSessionState::DeleteStream => { 116 return Ok(()); 117 } 118 } 119 } 120 121 //Ok(()) 122 } 123 124 async fn handshake(&mut self) -> Result<(), SessionError> { 125 let mut bytes_len = 0; 126 127 while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE { 128 self.bytesio_data = self.io.lock().await.read().await?; 129 bytes_len += self.bytesio_data.len(); 130 self.handshaker.extend_data(&self.bytesio_data[..]); 131 } 132 133 self.handshaker.handshake().await?; 134 135 if let ServerHandshakeState::Finish = self.handshaker.state() { 136 self.state = ServerSessionState::ReadChunk; 137 let left_bytes = self.handshaker.get_remaining_bytes(); 138 if !left_bytes.is_empty() { 139 self.unpacketizer.extend_data(&left_bytes[..]); 140 self.has_remaing_data = true; 141 } 142 log::info!("[ S->C ] [send_set_chunk_size] "); 143 self.send_set_chunk_size().await?; 144 return Ok(()); 145 } 146 147 Ok(()) 148 } 149 150 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 151 if !self.has_remaing_data { 152 match self 153 .io 154 .lock() 155 .await 156 .read_timeout(Duration::from_secs(2)) 157 .await 158 { 159 Ok(data) => { 160 self.bytesio_data = data; 161 } 162 Err(err) => { 163 self.common 164 .unpublish_to_channels( 165 self.app_name.clone(), 166 self.stream_name.clone(), 167 self.session_id, 168 ) 169 .await?; 170 171 return Err(SessionError { 172 value: SessionErrorValue::BytesIOError(err), 173 }); 174 } 175 } 176 177 self.unpacketizer.extend_data(&self.bytesio_data[..]); 178 } 179 180 self.has_remaing_data = false; 181 182 loop { 183 let result = self.unpacketizer.read_chunks(); 184 185 if let Ok(rv) = result { 186 if let UnpackResult::Chunks(chunks) = rv { 187 for chunk_info in chunks { 188 let timestamp = chunk_info.message_header.timestamp; 189 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 190 191 if let Some(mut msg) = MessageParser::new(chunk_info).parse()? { 192 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 193 .await?; 194 } 195 } 196 } 197 } else { 198 break; 199 } 200 } 201 Ok(()) 202 } 203 204 async fn play(&mut self) -> Result<(), SessionError> { 205 match self.common.send_channel_data().await { 206 Ok(_) => {} 207 Err(err) => { 208 self.common 209 .unsubscribe_from_channels( 210 self.app_name.clone(), 211 self.stream_name.clone(), 212 self.session_id, 213 ) 214 .await?; 215 return Err(err); 216 } 217 } 218 219 Ok(()) 220 } 221 222 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 223 let mut controlmessage = 224 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 225 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 226 227 Ok(()) 228 } 229 230 pub async fn process_messages( 231 &mut self, 232 rtmp_msg: &mut RtmpMessageData, 233 msg_stream_id: &u32, 234 timestamp: &u32, 235 ) -> Result<(), SessionError> { 236 match rtmp_msg { 237 RtmpMessageData::Amf0Command { 238 command_name, 239 transaction_id, 240 command_object, 241 others, 242 } => { 243 self.on_amf0_command_message( 244 msg_stream_id, 245 command_name, 246 transaction_id, 247 command_object, 248 others, 249 ) 250 .await? 251 } 252 RtmpMessageData::SetChunkSize { chunk_size } => { 253 self.on_set_chunk_size(*chunk_size as usize)?; 254 } 255 RtmpMessageData::AudioData { data } => { 256 self.common.on_audio_data(data, timestamp).await?; 257 } 258 RtmpMessageData::VideoData { data } => { 259 self.common.on_video_data(data, timestamp).await?; 260 } 261 RtmpMessageData::AmfData { raw_data } => { 262 self.common.on_meta_data(raw_data, timestamp).await?; 263 } 264 265 _ => {} 266 } 267 Ok(()) 268 } 269 270 pub async fn on_amf0_command_message( 271 &mut self, 272 stream_id: &u32, 273 command_name: &Amf0ValueType, 274 transaction_id: &Amf0ValueType, 275 command_object: &Amf0ValueType, 276 others: &mut Vec<Amf0ValueType>, 277 ) -> Result<(), SessionError> { 278 let empty_cmd_name = &String::new(); 279 let cmd_name = match command_name { 280 Amf0ValueType::UTF8String(str) => str, 281 _ => empty_cmd_name, 282 }; 283 284 let transaction_id = match transaction_id { 285 Amf0ValueType::Number(number) => number, 286 _ => &0.0, 287 }; 288 289 let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new(); 290 let obj = match command_object { 291 Amf0ValueType::Object(obj) => obj, 292 _ => &empty_cmd_obj, 293 }; 294 295 match cmd_name.as_str() { 296 "connect" => { 297 log::info!("[ S<-C ] [connect] "); 298 self.on_connect(transaction_id, obj).await?; 299 } 300 "createStream" => { 301 log::info!("[ S<-C ] [create stream] "); 302 self.on_create_stream(transaction_id).await?; 303 } 304 "deleteStream" => { 305 if !others.is_empty() { 306 let stream_id = match others.pop() { 307 Some(Amf0ValueType::Number(streamid)) => streamid, 308 _ => 0.0, 309 }; 310 311 log::info!( 312 "[ S<-C ] [delete stream] app_name: {}, stream_name: {}", 313 self.app_name, 314 self.stream_name 315 ); 316 317 self.on_delete_stream(transaction_id, &stream_id).await?; 318 self.state = ServerSessionState::DeleteStream; 319 } 320 } 321 "play" => { 322 log::info!( 323 "[ S<-C ] [play] app_name: {}, stream_name: {}", 324 self.app_name, 325 self.stream_name 326 ); 327 self.unpacketizer.session_type = config::SERVER_PULL; 328 self.on_play(transaction_id, stream_id, others).await?; 329 } 330 "publish" => { 331 self.unpacketizer.session_type = config::SERVER_PUSH; 332 self.on_publish(transaction_id, stream_id, others).await?; 333 } 334 _ => {} 335 } 336 337 Ok(()) 338 } 339 340 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 341 log::info!( 342 "[ S<-C ] [set chunk size] app_name: {}, stream_name: {}, chunk size: {}", 343 self.app_name, 344 self.stream_name, 345 chunk_size 346 ); 347 self.unpacketizer.update_max_chunk_size(chunk_size); 348 Ok(()) 349 } 350 351 fn parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>) { 352 for (property, value) in command_obj { 353 match property.as_str() { 354 "app" => { 355 if let Amf0ValueType::UTF8String(app) = value { 356 self.connect_properties.app = Some(app.clone()); 357 } 358 } 359 "flashVer" => { 360 if let Amf0ValueType::UTF8String(flash_ver) = value { 361 self.connect_properties.flash_ver = Some(flash_ver.clone()); 362 } 363 } 364 "swfUrl" => { 365 if let Amf0ValueType::UTF8String(swf_url) = value { 366 self.connect_properties.swf_url = Some(swf_url.clone()); 367 } 368 } 369 "tcUrl" => { 370 if let Amf0ValueType::UTF8String(tc_url) = value { 371 self.connect_properties.tc_url = Some(tc_url.clone()); 372 } 373 } 374 "fpad" => { 375 if let Amf0ValueType::Boolean(fpad) = value { 376 self.connect_properties.fpad = Some(*fpad); 377 } 378 } 379 "audioCodecs" => { 380 if let Amf0ValueType::Number(audio_codecs) = value { 381 self.connect_properties.audio_codecs = Some(*audio_codecs); 382 } 383 } 384 "videoCodecs" => { 385 if let Amf0ValueType::Number(video_codecs) = value { 386 self.connect_properties.video_codecs = Some(*video_codecs); 387 } 388 } 389 "videoFunction" => { 390 if let Amf0ValueType::Number(video_function) = value { 391 self.connect_properties.video_function = Some(*video_function); 392 } 393 } 394 "pageUrl" => { 395 if let Amf0ValueType::UTF8String(page_url) = value { 396 self.connect_properties.page_url = Some(page_url.clone()); 397 } 398 } 399 "objectEncoding" => { 400 if let Amf0ValueType::Number(object_encoding) = value { 401 self.connect_properties.object_encoding = Some(*object_encoding); 402 } 403 } 404 _ => { 405 log::warn!("unknown connect properties: {}:{:?}", property, value); 406 } 407 } 408 } 409 } 410 411 async fn on_connect( 412 &mut self, 413 transaction_id: &f64, 414 command_obj: &IndexMap<String, Amf0ValueType>, 415 ) -> Result<(), SessionError> { 416 self.parse_connect_properties(command_obj); 417 log::info!("connect properties: {:?}", self.connect_properties); 418 let mut control_message = 419 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 420 log::info!("[ S->C ] [set window_acknowledgement_size]"); 421 control_message 422 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 423 .await?; 424 425 log::info!("[ S->C ] [set set_peer_bandwidth]",); 426 control_message 427 .write_set_peer_bandwidth( 428 define::PEER_BANDWIDTH, 429 define::peer_bandwidth_limit_type::DYNAMIC, 430 ) 431 .await?; 432 433 let obj_encoding = command_obj.get("objectEncoding"); 434 let encoding = match obj_encoding { 435 Some(Amf0ValueType::Number(encoding)) => encoding, 436 _ => &define::OBJENCODING_AMF0, 437 }; 438 439 let app_name = command_obj.get("app"); 440 self.app_name = match app_name { 441 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 442 _ => { 443 return Err(SessionError { 444 value: SessionErrorValue::NoAppName, 445 }); 446 } 447 }; 448 449 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 450 log::info!("[ S->C ] [set connect_response]",); 451 netconnection 452 .write_connect_response( 453 transaction_id, 454 define::FMSVER, 455 &define::CAPABILITIES, 456 &String::from("NetConnection.Connect.Success"), 457 define::LEVEL, 458 &String::from("Connection Succeeded."), 459 encoding, 460 ) 461 .await?; 462 463 Ok(()) 464 } 465 466 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 467 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 468 netconnection 469 .write_create_stream_response(transaction_id, &define::STREAM_ID) 470 .await?; 471 472 log::info!( 473 "[ S->C ] [create_stream_response] app_name: {}", 474 self.app_name, 475 ); 476 477 Ok(()) 478 } 479 480 pub async fn on_delete_stream( 481 &mut self, 482 transaction_id: &f64, 483 stream_id: &f64, 484 ) -> Result<(), SessionError> { 485 self.common 486 .unpublish_to_channels( 487 self.app_name.clone(), 488 self.stream_name.clone(), 489 self.session_id, 490 ) 491 .await?; 492 493 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 494 netstream 495 .write_on_status( 496 transaction_id, 497 "status", 498 "NetStream.DeleteStream.Suceess", 499 "", 500 ) 501 .await?; 502 503 //self.unsubscribe_from_channels().await?; 504 log::info!( 505 "[ S->C ] [delete stream success] app_name: {}, stream_name: {}", 506 self.app_name, 507 self.stream_name 508 ); 509 log::trace!("{}", stream_id); 510 511 Ok(()) 512 } 513 514 fn get_request_url(&mut self, raw_stream_name: String) -> String { 515 if let Some(tc_url) = &self.connect_properties.tc_url { 516 format!("{tc_url}/{raw_stream_name}") 517 } else { 518 format!("{}/{}", self.app_name.clone(), raw_stream_name) 519 } 520 } 521 522 #[allow(clippy::never_loop)] 523 pub async fn on_play( 524 &mut self, 525 transaction_id: &f64, 526 stream_id: &u32, 527 other_values: &mut Vec<Amf0ValueType>, 528 ) -> Result<(), SessionError> { 529 let length = other_values.len() as u8; 530 let mut index: u8 = 0; 531 532 let mut stream_name: Option<String> = None; 533 let mut start: Option<f64> = None; 534 let mut duration: Option<f64> = None; 535 let mut reset: Option<bool> = None; 536 537 loop { 538 if index >= length { 539 break; 540 } 541 index += 1; 542 stream_name = match other_values.remove(0) { 543 Amf0ValueType::UTF8String(val) => Some(val), 544 _ => None, 545 }; 546 547 if index >= length { 548 break; 549 } 550 index += 1; 551 start = match other_values.remove(0) { 552 Amf0ValueType::Number(val) => Some(val), 553 _ => None, 554 }; 555 556 if index >= length { 557 break; 558 } 559 index += 1; 560 duration = match other_values.remove(0) { 561 Amf0ValueType::Number(val) => Some(val), 562 _ => None, 563 }; 564 565 if index >= length { 566 break; 567 } 568 //index = index + 1; 569 reset = match other_values.remove(0) { 570 Amf0ValueType::Boolean(val) => Some(val), 571 _ => None, 572 }; 573 break; 574 } 575 576 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 577 event_messages.write_stream_begin(*stream_id).await?; 578 log::info!( 579 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 580 self.app_name, 581 self.stream_name 582 ); 583 log::trace!( 584 "{} {} {}", 585 start.is_some(), 586 duration.is_some(), 587 reset.is_some() 588 ); 589 590 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 591 netstream 592 .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset") 593 .await?; 594 595 netstream 596 .write_on_status( 597 transaction_id, 598 "status", 599 "NetStream.Play.Start", 600 "play start", 601 ) 602 .await?; 603 604 netstream 605 .write_on_status( 606 transaction_id, 607 "status", 608 "NetStream.Data.Start", 609 "data start.", 610 ) 611 .await?; 612 613 netstream 614 .write_on_status( 615 transaction_id, 616 "status", 617 "NetStream.Play.PublishNotify", 618 "play publish notify.", 619 ) 620 .await?; 621 622 event_messages.write_stream_is_record(*stream_id).await?; 623 624 let raw_stream_name = stream_name.unwrap(); 625 626 (self.stream_name, self.url_parameters) = RtmpUrlParser::default() 627 .set_raw_stream_name(raw_stream_name.clone()) 628 .parse_raw_stream_name(); 629 630 log::info!( 631 "[ S->C ] [stream is record] app_name: {}, stream_name: {}, url parameters: {}", 632 self.app_name, 633 self.stream_name, 634 self.url_parameters 635 ); 636 637 /*Now it can update the request url*/ 638 self.common.request_url = self.get_request_url(raw_stream_name); 639 self.common 640 .subscribe_from_channels( 641 self.app_name.clone(), 642 self.stream_name.clone(), 643 self.session_id, 644 ) 645 .await?; 646 647 self.state = ServerSessionState::Play; 648 649 Ok(()) 650 } 651 652 pub async fn on_publish( 653 &mut self, 654 transaction_id: &f64, 655 stream_id: &u32, 656 other_values: &mut Vec<Amf0ValueType>, 657 ) -> Result<(), SessionError> { 658 let length = other_values.len(); 659 660 if length < 2 { 661 return Err(SessionError { 662 value: SessionErrorValue::Amf0ValueCountNotCorrect, 663 }); 664 } 665 666 let raw_stream_name = match other_values.remove(0) { 667 Amf0ValueType::UTF8String(val) => val, 668 _ => { 669 return Err(SessionError { 670 value: SessionErrorValue::Amf0ValueCountNotCorrect, 671 }); 672 } 673 }; 674 675 (self.stream_name, self.url_parameters) = RtmpUrlParser::default() 676 .set_raw_stream_name(raw_stream_name.clone()) 677 .parse_raw_stream_name(); 678 679 /*Now it can update the request url*/ 680 self.common.request_url = self.get_request_url(raw_stream_name); 681 682 let _ = match other_values.remove(0) { 683 Amf0ValueType::UTF8String(val) => val, 684 _ => { 685 return Err(SessionError { 686 value: SessionErrorValue::Amf0ValueCountNotCorrect, 687 }); 688 } 689 }; 690 691 log::info!( 692 "[ S<-C ] [publish] app_name: {}, stream_name: {}, url parameters: {}", 693 self.app_name, 694 self.stream_name, 695 self.url_parameters 696 ); 697 698 log::info!( 699 "[ S->C ] [stream begin] app_name: {}, stream_name: {}, url parameters: {}", 700 self.app_name, 701 self.stream_name, 702 self.url_parameters 703 ); 704 705 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 706 event_messages.write_stream_begin(*stream_id).await?; 707 708 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 709 netstream 710 .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "") 711 .await?; 712 log::info!( 713 "[ S->C ] [NetStream.Publish.Start] app_name: {}, stream_name: {}", 714 self.app_name, 715 self.stream_name 716 ); 717 718 self.common 719 .publish_to_channels( 720 self.app_name.clone(), 721 self.stream_name.clone(), 722 self.session_id, 723 self.gop_num, 724 ) 725 .await?; 726 727 Ok(()) 728 } 729 } 730