1 use crate::chunk::packetizer::ChunkPacketizer; 2 3 use { 4 super::{ 5 common::Common, 6 define, 7 define::SessionType, 8 errors::{SessionError, SessionErrorValue}, 9 }, 10 crate::{ 11 amf0::Amf0ValueType, 12 chunk::{ 13 define::CHUNK_SIZE, 14 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 15 }, 16 config, handshake, 17 handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer}, 18 messages::{define::RtmpMessageData, parser::MessageParser}, 19 netconnection::writer::{ConnectProperties, NetConnection}, 20 netstream::writer::NetStreamWriter, 21 protocol_control_messages::writer::ProtocolControlMessagesWriter, 22 user_control_messages::writer::EventMessagesWriter, 23 utils::RtmpUrlParser, 24 }, 25 bytes::BytesMut, 26 bytesio::{ 27 bytes_writer::AsyncBytesWriter, 28 bytesio::{TNetIO, TcpIO}, 29 }, 30 indexmap::IndexMap, 31 std::{sync::Arc, time::Duration}, 32 streamhub::{ 33 define::StreamHubEventSender, 34 utils::{RandomDigitCount, Uuid}, 35 }, 36 tokio::{net::TcpStream, sync::Mutex}, 37 }; 38 39 enum ServerSessionState { 40 Handshake, 41 ReadChunk, 42 // OnConnect, 43 // OnCreateStream, 44 //Publish, 45 DeleteStream, 46 Play, 47 } 48 49 pub struct ServerSession { 50 pub app_name: String, 51 pub stream_name: String, 52 pub url_parameters: String, 53 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 54 handshaker: HandshakeServer, 55 unpacketizer: ChunkUnpacketizer, 56 state: ServerSessionState, 57 bytesio_data: BytesMut, 58 has_remaing_data: bool, 59 /* Used to mark the subscriber's the data producer 60 in channels and delete it from map when unsubscribe 61 is called. */ 62 pub session_id: Uuid, 63 connect_properties: ConnectProperties, 64 pub common: Common, 65 /*configure how many gops will be cached.*/ 66 gop_num: usize, 67 } 68 69 impl ServerSession { 70 pub fn new(stream: TcpStream, event_producer: StreamHubEventSender, gop_num: usize) -> Self { 71 let remote_addr = if let Ok(addr) = stream.peer_addr() { 72 log::info!("server session: {}", addr.to_string()); 73 Some(addr) 74 } else { 75 None 76 }; 77 78 let tcp_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream)); 79 let net_io = Arc::new(Mutex::new(tcp_io)); 80 81 Self { 82 app_name: String::from(""), 83 stream_name: String::from(""), 84 url_parameters: String::from(""), 85 io: Arc::clone(&net_io), 86 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 87 unpacketizer: ChunkUnpacketizer::new(), 88 state: ServerSessionState::Handshake, 89 common: Common::new( 90 Some(ChunkPacketizer::new(Arc::clone(&net_io))), 91 event_producer, 92 SessionType::Server, 93 remote_addr, 94 ), 95 session_id: Uuid::new(RandomDigitCount::Four), 96 bytesio_data: BytesMut::new(), 97 has_remaing_data: false, 98 connect_properties: ConnectProperties::default(), 99 gop_num, 100 } 101 } 102 103 pub async fn run(&mut self) -> Result<(), SessionError> { 104 loop { 105 match self.state { 106 ServerSessionState::Handshake => { 107 self.handshake().await?; 108 } 109 ServerSessionState::ReadChunk => { 110 self.read_parse_chunks().await?; 111 } 112 ServerSessionState::Play => { 113 self.play().await?; 114 } 115 ServerSessionState::DeleteStream => { 116 return Ok(()); 117 } 118 } 119 } 120 121 //Ok(()) 122 } 123 124 async fn handshake(&mut self) -> Result<(), SessionError> { 125 let mut bytes_len = 0; 126 127 while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE { 128 self.bytesio_data = self.io.lock().await.read().await?; 129 bytes_len += self.bytesio_data.len(); 130 self.handshaker.extend_data(&self.bytesio_data[..]); 131 } 132 133 self.handshaker.handshake().await?; 134 135 if let ServerHandshakeState::Finish = self.handshaker.state() { 136 self.state = ServerSessionState::ReadChunk; 137 let left_bytes = self.handshaker.get_remaining_bytes(); 138 if !left_bytes.is_empty() { 139 self.unpacketizer.extend_data(&left_bytes[..]); 140 self.has_remaing_data = true; 141 } 142 log::info!("[ S->C ] [send_set_chunk_size] "); 143 self.send_set_chunk_size().await?; 144 return Ok(()); 145 } 146 147 Ok(()) 148 } 149 150 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 151 if !self.has_remaing_data { 152 match self 153 .io 154 .lock() 155 .await 156 .read_timeout(Duration::from_secs(2)) 157 .await 158 { 159 Ok(data) => { 160 self.bytesio_data = data; 161 } 162 Err(err) => { 163 self.common 164 .unpublish_to_channels( 165 self.app_name.clone(), 166 self.stream_name.clone(), 167 self.session_id, 168 ) 169 .await?; 170 171 return Err(SessionError { 172 value: SessionErrorValue::BytesIOError(err), 173 }); 174 } 175 } 176 177 self.unpacketizer.extend_data(&self.bytesio_data[..]); 178 } 179 180 self.has_remaing_data = false; 181 182 loop { 183 let result = self.unpacketizer.read_chunks(); 184 185 if let Ok(rv) = result { 186 if let UnpackResult::Chunks(chunks) = rv { 187 for chunk_info in chunks { 188 let timestamp = chunk_info.message_header.timestamp; 189 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 190 191 let mut msg = MessageParser::new(chunk_info).parse()?; 192 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 193 .await?; 194 } 195 } 196 } else { 197 break; 198 } 199 } 200 Ok(()) 201 } 202 203 async fn play(&mut self) -> Result<(), SessionError> { 204 match self.common.send_channel_data().await { 205 Ok(_) => {} 206 Err(err) => { 207 self.common 208 .unsubscribe_from_channels( 209 self.app_name.clone(), 210 self.stream_name.clone(), 211 self.session_id, 212 ) 213 .await?; 214 return Err(err); 215 } 216 } 217 218 Ok(()) 219 } 220 221 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 222 let mut controlmessage = 223 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 224 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 225 226 Ok(()) 227 } 228 229 pub async fn process_messages( 230 &mut self, 231 rtmp_msg: &mut RtmpMessageData, 232 msg_stream_id: &u32, 233 timestamp: &u32, 234 ) -> Result<(), SessionError> { 235 match rtmp_msg { 236 RtmpMessageData::Amf0Command { 237 command_name, 238 transaction_id, 239 command_object, 240 others, 241 } => { 242 self.on_amf0_command_message( 243 msg_stream_id, 244 command_name, 245 transaction_id, 246 command_object, 247 others, 248 ) 249 .await? 250 } 251 RtmpMessageData::SetChunkSize { chunk_size } => { 252 self.on_set_chunk_size(*chunk_size as usize)?; 253 } 254 RtmpMessageData::AudioData { data } => { 255 self.common.on_audio_data(data, timestamp).await?; 256 } 257 RtmpMessageData::VideoData { data } => { 258 self.common.on_video_data(data, timestamp).await?; 259 } 260 RtmpMessageData::AmfData { raw_data } => { 261 self.common.on_meta_data(raw_data, timestamp).await?; 262 } 263 264 _ => {} 265 } 266 Ok(()) 267 } 268 269 pub async fn on_amf0_command_message( 270 &mut self, 271 stream_id: &u32, 272 command_name: &Amf0ValueType, 273 transaction_id: &Amf0ValueType, 274 command_object: &Amf0ValueType, 275 others: &mut Vec<Amf0ValueType>, 276 ) -> Result<(), SessionError> { 277 let empty_cmd_name = &String::new(); 278 let cmd_name = match command_name { 279 Amf0ValueType::UTF8String(str) => str, 280 _ => empty_cmd_name, 281 }; 282 283 let transaction_id = match transaction_id { 284 Amf0ValueType::Number(number) => number, 285 _ => &0.0, 286 }; 287 288 let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new(); 289 let obj = match command_object { 290 Amf0ValueType::Object(obj) => obj, 291 _ => &empty_cmd_obj, 292 }; 293 294 match cmd_name.as_str() { 295 "connect" => { 296 log::info!("[ S<-C ] [connect] "); 297 self.on_connect(transaction_id, obj).await?; 298 } 299 "createStream" => { 300 log::info!("[ S<-C ] [create stream] "); 301 self.on_create_stream(transaction_id).await?; 302 } 303 "deleteStream" => { 304 if !others.is_empty() { 305 let stream_id = match others.pop() { 306 Some(Amf0ValueType::Number(streamid)) => streamid, 307 _ => 0.0, 308 }; 309 310 log::info!( 311 "[ S<-C ] [delete stream] app_name: {}, stream_name: {}", 312 self.app_name, 313 self.stream_name 314 ); 315 316 self.on_delete_stream(transaction_id, &stream_id).await?; 317 self.state = ServerSessionState::DeleteStream; 318 } 319 } 320 "play" => { 321 log::info!( 322 "[ S<-C ] [play] app_name: {}, stream_name: {}", 323 self.app_name, 324 self.stream_name 325 ); 326 self.unpacketizer.session_type = config::SERVER_PULL; 327 self.on_play(transaction_id, stream_id, others).await?; 328 } 329 "publish" => { 330 self.unpacketizer.session_type = config::SERVER_PUSH; 331 self.on_publish(transaction_id, stream_id, others).await?; 332 } 333 _ => {} 334 } 335 336 Ok(()) 337 } 338 339 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 340 log::info!( 341 "[ S<-C ] [set chunk size] app_name: {}, stream_name: {}, chunk size: {}", 342 self.app_name, 343 self.stream_name, 344 chunk_size 345 ); 346 self.unpacketizer.update_max_chunk_size(chunk_size); 347 Ok(()) 348 } 349 350 fn parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>) { 351 for (property, value) in command_obj { 352 match property.as_str() { 353 "app" => { 354 if let Amf0ValueType::UTF8String(app) = value { 355 self.connect_properties.app = Some(app.clone()); 356 } 357 } 358 "flashVer" => { 359 if let Amf0ValueType::UTF8String(flash_ver) = value { 360 self.connect_properties.flash_ver = Some(flash_ver.clone()); 361 } 362 } 363 "swfUrl" => { 364 if let Amf0ValueType::UTF8String(swf_url) = value { 365 self.connect_properties.swf_url = Some(swf_url.clone()); 366 } 367 } 368 "tcUrl" => { 369 if let Amf0ValueType::UTF8String(tc_url) = value { 370 self.connect_properties.tc_url = Some(tc_url.clone()); 371 } 372 } 373 "fpad" => { 374 if let Amf0ValueType::Boolean(fpad) = value { 375 self.connect_properties.fpad = Some(*fpad); 376 } 377 } 378 "audioCodecs" => { 379 if let Amf0ValueType::Number(audio_codecs) = value { 380 self.connect_properties.audio_codecs = Some(*audio_codecs); 381 } 382 } 383 "videoCodecs" => { 384 if let Amf0ValueType::Number(video_codecs) = value { 385 self.connect_properties.video_codecs = Some(*video_codecs); 386 } 387 } 388 "videoFunction" => { 389 if let Amf0ValueType::Number(video_function) = value { 390 self.connect_properties.video_function = Some(*video_function); 391 } 392 } 393 "pageUrl" => { 394 if let Amf0ValueType::UTF8String(page_url) = value { 395 self.connect_properties.page_url = Some(page_url.clone()); 396 } 397 } 398 "objectEncoding" => { 399 if let Amf0ValueType::Number(object_encoding) = value { 400 self.connect_properties.object_encoding = Some(*object_encoding); 401 } 402 } 403 _ => { 404 log::warn!("unknown connect properties: {}:{:?}", property, value); 405 } 406 } 407 } 408 } 409 410 async fn on_connect( 411 &mut self, 412 transaction_id: &f64, 413 command_obj: &IndexMap<String, Amf0ValueType>, 414 ) -> Result<(), SessionError> { 415 self.parse_connect_properties(command_obj); 416 log::info!("connect properties: {:?}", self.connect_properties); 417 let mut control_message = 418 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 419 log::info!("[ S->C ] [set window_acknowledgement_size]"); 420 control_message 421 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 422 .await?; 423 424 log::info!("[ S->C ] [set set_peer_bandwidth]",); 425 control_message 426 .write_set_peer_bandwidth( 427 define::PEER_BANDWIDTH, 428 define::peer_bandwidth_limit_type::DYNAMIC, 429 ) 430 .await?; 431 432 let obj_encoding = command_obj.get("objectEncoding"); 433 let encoding = match obj_encoding { 434 Some(Amf0ValueType::Number(encoding)) => encoding, 435 _ => &define::OBJENCODING_AMF0, 436 }; 437 438 let app_name = command_obj.get("app"); 439 self.app_name = match app_name { 440 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 441 _ => { 442 return Err(SessionError { 443 value: SessionErrorValue::NoAppName, 444 }); 445 } 446 }; 447 448 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 449 log::info!("[ S->C ] [set connect_response]",); 450 netconnection 451 .write_connect_response( 452 transaction_id, 453 define::FMSVER, 454 &define::CAPABILITIES, 455 &String::from("NetConnection.Connect.Success"), 456 define::LEVEL, 457 &String::from("Connection Succeeded."), 458 encoding, 459 ) 460 .await?; 461 462 Ok(()) 463 } 464 465 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 466 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 467 netconnection 468 .write_create_stream_response(transaction_id, &define::STREAM_ID) 469 .await?; 470 471 log::info!( 472 "[ S->C ] [create_stream_response] app_name: {}", 473 self.app_name, 474 ); 475 476 Ok(()) 477 } 478 479 pub async fn on_delete_stream( 480 &mut self, 481 transaction_id: &f64, 482 stream_id: &f64, 483 ) -> Result<(), SessionError> { 484 self.common 485 .unpublish_to_channels( 486 self.app_name.clone(), 487 self.stream_name.clone(), 488 self.session_id, 489 ) 490 .await?; 491 492 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 493 netstream 494 .write_on_status( 495 transaction_id, 496 "status", 497 "NetStream.DeleteStream.Suceess", 498 "", 499 ) 500 .await?; 501 502 //self.unsubscribe_from_channels().await?; 503 log::info!( 504 "[ S->C ] [delete stream success] app_name: {}, stream_name: {}", 505 self.app_name, 506 self.stream_name 507 ); 508 log::trace!("{}", stream_id); 509 510 Ok(()) 511 } 512 513 fn get_request_url(&mut self, raw_stream_name: String) -> String { 514 if let Some(tc_url) = &self.connect_properties.tc_url { 515 format!("{tc_url}/{raw_stream_name}") 516 } else { 517 format!("{}/{}", self.app_name.clone(), raw_stream_name) 518 } 519 } 520 521 #[allow(clippy::never_loop)] 522 pub async fn on_play( 523 &mut self, 524 transaction_id: &f64, 525 stream_id: &u32, 526 other_values: &mut Vec<Amf0ValueType>, 527 ) -> Result<(), SessionError> { 528 let length = other_values.len() as u8; 529 let mut index: u8 = 0; 530 531 let mut stream_name: Option<String> = None; 532 let mut start: Option<f64> = None; 533 let mut duration: Option<f64> = None; 534 let mut reset: Option<bool> = None; 535 536 loop { 537 if index >= length { 538 break; 539 } 540 index += 1; 541 stream_name = match other_values.remove(0) { 542 Amf0ValueType::UTF8String(val) => Some(val), 543 _ => None, 544 }; 545 546 if index >= length { 547 break; 548 } 549 index += 1; 550 start = match other_values.remove(0) { 551 Amf0ValueType::Number(val) => Some(val), 552 _ => None, 553 }; 554 555 if index >= length { 556 break; 557 } 558 index += 1; 559 duration = match other_values.remove(0) { 560 Amf0ValueType::Number(val) => Some(val), 561 _ => None, 562 }; 563 564 if index >= length { 565 break; 566 } 567 //index = index + 1; 568 reset = match other_values.remove(0) { 569 Amf0ValueType::Boolean(val) => Some(val), 570 _ => None, 571 }; 572 break; 573 } 574 575 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 576 event_messages.write_stream_begin(*stream_id).await?; 577 log::info!( 578 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 579 self.app_name, 580 self.stream_name 581 ); 582 log::trace!( 583 "{} {} {}", 584 start.is_some(), 585 duration.is_some(), 586 reset.is_some() 587 ); 588 589 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 590 netstream 591 .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset") 592 .await?; 593 594 netstream 595 .write_on_status( 596 transaction_id, 597 "status", 598 "NetStream.Play.Start", 599 "play start", 600 ) 601 .await?; 602 603 netstream 604 .write_on_status( 605 transaction_id, 606 "status", 607 "NetStream.Data.Start", 608 "data start.", 609 ) 610 .await?; 611 612 netstream 613 .write_on_status( 614 transaction_id, 615 "status", 616 "NetStream.Play.PublishNotify", 617 "play publish notify.", 618 ) 619 .await?; 620 621 event_messages.write_stream_is_record(*stream_id).await?; 622 623 let raw_stream_name = stream_name.unwrap(); 624 625 (self.stream_name, self.url_parameters) = RtmpUrlParser::default() 626 .set_raw_stream_name(raw_stream_name.clone()) 627 .parse_raw_stream_name(); 628 629 log::info!( 630 "[ S->C ] [stream is record] app_name: {}, stream_name: {}, url parameters: {}", 631 self.app_name, 632 self.stream_name, 633 self.url_parameters 634 ); 635 636 /*Now it can update the request url*/ 637 self.common.request_url = self.get_request_url(raw_stream_name); 638 self.common 639 .subscribe_from_channels( 640 self.app_name.clone(), 641 self.stream_name.clone(), 642 self.session_id, 643 ) 644 .await?; 645 646 self.state = ServerSessionState::Play; 647 648 Ok(()) 649 } 650 651 pub async fn on_publish( 652 &mut self, 653 transaction_id: &f64, 654 stream_id: &u32, 655 other_values: &mut Vec<Amf0ValueType>, 656 ) -> Result<(), SessionError> { 657 let length = other_values.len(); 658 659 if length < 2 { 660 return Err(SessionError { 661 value: SessionErrorValue::Amf0ValueCountNotCorrect, 662 }); 663 } 664 665 let raw_stream_name = match other_values.remove(0) { 666 Amf0ValueType::UTF8String(val) => val, 667 _ => { 668 return Err(SessionError { 669 value: SessionErrorValue::Amf0ValueCountNotCorrect, 670 }); 671 } 672 }; 673 674 (self.stream_name, self.url_parameters) = RtmpUrlParser::default() 675 .set_raw_stream_name(raw_stream_name.clone()) 676 .parse_raw_stream_name(); 677 678 /*Now it can update the request url*/ 679 self.common.request_url = self.get_request_url(raw_stream_name); 680 681 let _ = match other_values.remove(0) { 682 Amf0ValueType::UTF8String(val) => val, 683 _ => { 684 return Err(SessionError { 685 value: SessionErrorValue::Amf0ValueCountNotCorrect, 686 }); 687 } 688 }; 689 690 log::info!( 691 "[ S<-C ] [publish] app_name: {}, stream_name: {}, url parameters: {}", 692 self.app_name, 693 self.stream_name, 694 self.url_parameters 695 ); 696 697 log::info!( 698 "[ S->C ] [stream begin] app_name: {}, stream_name: {}, url parameters: {}", 699 self.app_name, 700 self.stream_name, 701 self.url_parameters 702 ); 703 704 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 705 event_messages.write_stream_begin(*stream_id).await?; 706 707 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 708 netstream 709 .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "") 710 .await?; 711 log::info!( 712 "[ S->C ] [NetStream.Publish.Start] app_name: {}, stream_name: {}", 713 self.app_name, 714 self.stream_name 715 ); 716 717 self.common 718 .publish_to_channels( 719 self.app_name.clone(), 720 self.stream_name.clone(), 721 self.session_id, 722 self.gop_num, 723 ) 724 .await?; 725 726 Ok(()) 727 } 728 } 729