1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::ChannelEventProducer, 11 chunk::{ 12 define::CHUNK_SIZE, 13 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 14 }, 15 config, handshake, 16 handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer}, 17 messages::{define::RtmpMessageData, parser::MessageParser}, 18 netconnection::writer::{ConnectProperties, NetConnection}, 19 netstream::writer::NetStreamWriter, 20 protocol_control_messages::writer::ProtocolControlMessagesWriter, 21 user_control_messages::writer::EventMessagesWriter, 22 utils::RtmpUrlParser, 23 }, 24 bytes::BytesMut, 25 bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO}, 26 indexmap::IndexMap, 27 std::{sync::Arc, time::Duration}, 28 tokio::{net::TcpStream, sync::Mutex}, 29 uuid::Uuid, 30 }; 31 32 enum ServerSessionState { 33 Handshake, 34 ReadChunk, 35 // OnConnect, 36 // OnCreateStream, 37 //Publish, 38 DeleteStream, 39 Play, 40 } 41 42 pub struct ServerSession { 43 pub app_name: String, 44 pub stream_name: String, 45 pub url_parameters: String, 46 io: Arc<Mutex<BytesIO>>, 47 handshaker: HandshakeServer, 48 unpacketizer: ChunkUnpacketizer, 49 state: ServerSessionState, 50 pub common: Common, 51 bytesio_data: BytesMut, 52 has_remaing_data: bool, 53 /* Used to mark the subscriber's the data producer 54 in channels and delete it from map when unsubscribe 55 is called. */ 56 pub session_id: Uuid, 57 connect_properties: ConnectProperties, 58 } 59 60 impl ServerSession { 61 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self { 62 let remote_addr = if let Ok(addr) = stream.peer_addr() { 63 log::info!("server session: {}", addr.to_string()); 64 Some(addr) 65 } else { 66 None 67 }; 68 69 let net_io = Arc::new(Mutex::new(BytesIO::new(stream))); 70 let subscriber_id = Uuid::new_v4(); 71 Self { 72 app_name: String::from(""), 73 stream_name: String::from(""), 74 url_parameters: String::from(""), 75 io: Arc::clone(&net_io), 76 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 77 unpacketizer: ChunkUnpacketizer::new(), 78 state: ServerSessionState::Handshake, 79 common: Common::new( 80 Arc::clone(&net_io), 81 event_producer, 82 SessionType::Server, 83 remote_addr, 84 ), 85 session_id: subscriber_id, 86 bytesio_data: BytesMut::new(), 87 has_remaing_data: false, 88 connect_properties: ConnectProperties::default(), 89 } 90 } 91 92 pub async fn run(&mut self) -> Result<(), SessionError> { 93 loop { 94 match self.state { 95 ServerSessionState::Handshake => { 96 self.handshake().await?; 97 } 98 ServerSessionState::ReadChunk => { 99 self.read_parse_chunks().await?; 100 } 101 ServerSessionState::Play => { 102 self.play().await?; 103 } 104 ServerSessionState::DeleteStream => { 105 return Ok(()); 106 } 107 } 108 } 109 110 //Ok(()) 111 } 112 113 async fn handshake(&mut self) -> Result<(), SessionError> { 114 let mut bytes_len = 0; 115 116 while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE { 117 self.bytesio_data = self.io.lock().await.read().await?; 118 bytes_len += self.bytesio_data.len(); 119 self.handshaker.extend_data(&self.bytesio_data[..]); 120 } 121 122 self.handshaker.handshake().await?; 123 124 if let ServerHandshakeState::Finish = self.handshaker.state() { 125 self.state = ServerSessionState::ReadChunk; 126 let left_bytes = self.handshaker.get_remaining_bytes(); 127 if !left_bytes.is_empty() { 128 self.unpacketizer.extend_data(&left_bytes[..]); 129 self.has_remaing_data = true; 130 } 131 log::info!("[ S->C ] [send_set_chunk_size] "); 132 self.send_set_chunk_size().await?; 133 return Ok(()); 134 } 135 136 Ok(()) 137 } 138 139 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 140 if !self.has_remaing_data { 141 match self 142 .io 143 .lock() 144 .await 145 .read_timeout(Duration::from_secs(2)) 146 .await 147 { 148 Ok(data) => { 149 self.bytesio_data = data; 150 } 151 Err(err) => { 152 self.common 153 .unpublish_to_channels( 154 self.app_name.clone(), 155 self.stream_name.clone(), 156 self.session_id, 157 ) 158 .await?; 159 160 return Err(SessionError { 161 value: SessionErrorValue::BytesIOError(err), 162 }); 163 } 164 } 165 166 self.unpacketizer.extend_data(&self.bytesio_data[..]); 167 } 168 169 self.has_remaing_data = false; 170 171 loop { 172 let result = self.unpacketizer.read_chunks(); 173 174 if let Ok(rv) = result { 175 if let UnpackResult::Chunks(chunks) = rv { 176 for chunk_info in chunks { 177 let timestamp = chunk_info.message_header.timestamp; 178 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 179 180 let mut msg = MessageParser::new(chunk_info).parse()?; 181 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 182 .await?; 183 } 184 } 185 } else { 186 break; 187 } 188 } 189 Ok(()) 190 } 191 192 async fn play(&mut self) -> Result<(), SessionError> { 193 match self.common.send_channel_data().await { 194 Ok(_) => {} 195 Err(err) => { 196 self.common 197 .unsubscribe_from_channels( 198 self.app_name.clone(), 199 self.stream_name.clone(), 200 self.session_id, 201 ) 202 .await?; 203 return Err(err); 204 } 205 } 206 207 Ok(()) 208 } 209 210 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 211 let mut controlmessage = 212 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 213 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 214 215 Ok(()) 216 } 217 218 pub async fn process_messages( 219 &mut self, 220 rtmp_msg: &mut RtmpMessageData, 221 msg_stream_id: &u32, 222 timestamp: &u32, 223 ) -> Result<(), SessionError> { 224 match rtmp_msg { 225 RtmpMessageData::Amf0Command { 226 command_name, 227 transaction_id, 228 command_object, 229 others, 230 } => { 231 self.on_amf0_command_message( 232 msg_stream_id, 233 command_name, 234 transaction_id, 235 command_object, 236 others, 237 ) 238 .await? 239 } 240 RtmpMessageData::SetChunkSize { chunk_size } => { 241 self.on_set_chunk_size(*chunk_size as usize)?; 242 } 243 RtmpMessageData::AudioData { data } => { 244 self.common.on_audio_data(data, timestamp)?; 245 } 246 RtmpMessageData::VideoData { data } => { 247 self.common.on_video_data(data, timestamp)?; 248 } 249 RtmpMessageData::AmfData { raw_data } => { 250 self.common.on_meta_data(raw_data, timestamp)?; 251 } 252 253 _ => {} 254 } 255 Ok(()) 256 } 257 258 pub async fn on_amf0_command_message( 259 &mut self, 260 stream_id: &u32, 261 command_name: &Amf0ValueType, 262 transaction_id: &Amf0ValueType, 263 command_object: &Amf0ValueType, 264 others: &mut Vec<Amf0ValueType>, 265 ) -> Result<(), SessionError> { 266 let empty_cmd_name = &String::new(); 267 let cmd_name = match command_name { 268 Amf0ValueType::UTF8String(str) => str, 269 _ => empty_cmd_name, 270 }; 271 272 let transaction_id = match transaction_id { 273 Amf0ValueType::Number(number) => number, 274 _ => &0.0, 275 }; 276 277 let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new(); 278 let obj = match command_object { 279 Amf0ValueType::Object(obj) => obj, 280 _ => &empty_cmd_obj, 281 }; 282 283 match cmd_name.as_str() { 284 "connect" => { 285 log::info!("[ S<-C ] [connect] "); 286 self.on_connect(transaction_id, obj).await?; 287 } 288 "createStream" => { 289 log::info!("[ S<-C ] [create stream] "); 290 self.on_create_stream(transaction_id).await?; 291 } 292 "deleteStream" => { 293 if !others.is_empty() { 294 let stream_id = match others.pop() { 295 Some(Amf0ValueType::Number(streamid)) => streamid, 296 _ => 0.0, 297 }; 298 299 log::info!( 300 "[ S<-C ] [delete stream] app_name: {}, stream_name: {}", 301 self.app_name, 302 self.stream_name 303 ); 304 305 self.on_delete_stream(transaction_id, &stream_id).await?; 306 self.state = ServerSessionState::DeleteStream; 307 } 308 } 309 "play" => { 310 log::info!( 311 "[ S<-C ] [play] app_name: {}, stream_name: {}", 312 self.app_name, 313 self.stream_name 314 ); 315 self.unpacketizer.session_type = config::SERVER_PULL; 316 self.on_play(transaction_id, stream_id, others).await?; 317 } 318 "publish" => { 319 self.unpacketizer.session_type = config::SERVER_PUSH; 320 self.on_publish(transaction_id, stream_id, others).await?; 321 } 322 _ => {} 323 } 324 325 Ok(()) 326 } 327 328 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 329 log::info!( 330 "[ S<-C ] [set chunk size] app_name: {}, stream_name: {}, chunk size: {}", 331 self.app_name, 332 self.stream_name, 333 chunk_size 334 ); 335 self.unpacketizer.update_max_chunk_size(chunk_size); 336 Ok(()) 337 } 338 339 fn parse_connect_properties(&mut self, command_obj: &IndexMap<String, Amf0ValueType>) { 340 for (property, value) in command_obj { 341 match property.as_str() { 342 "app" => { 343 if let Amf0ValueType::UTF8String(app) = value { 344 self.connect_properties.app = Some(app.clone()); 345 } 346 } 347 "flashVer" => { 348 if let Amf0ValueType::UTF8String(flash_ver) = value { 349 self.connect_properties.flash_ver = Some(flash_ver.clone()); 350 } 351 } 352 "swfUrl" => { 353 if let Amf0ValueType::UTF8String(swf_url) = value { 354 self.connect_properties.swf_url = Some(swf_url.clone()); 355 } 356 } 357 "tcUrl" => { 358 if let Amf0ValueType::UTF8String(tc_url) = value { 359 self.connect_properties.tc_url = Some(tc_url.clone()); 360 } 361 } 362 "fpad" => { 363 if let Amf0ValueType::Boolean(fpad) = value { 364 self.connect_properties.fpad = Some(*fpad); 365 } 366 } 367 "audioCodecs" => { 368 if let Amf0ValueType::Number(audio_codecs) = value { 369 self.connect_properties.audio_codecs = Some(*audio_codecs); 370 } 371 } 372 "videoCodecs" => { 373 if let Amf0ValueType::Number(video_codecs) = value { 374 self.connect_properties.video_codecs = Some(*video_codecs); 375 } 376 } 377 "videoFunction" => { 378 if let Amf0ValueType::Number(video_function) = value { 379 self.connect_properties.video_function = Some(*video_function); 380 } 381 } 382 "pageUrl" => { 383 if let Amf0ValueType::UTF8String(page_url) = value { 384 self.connect_properties.page_url = Some(page_url.clone()); 385 } 386 } 387 "objectEncoding" => { 388 if let Amf0ValueType::Number(object_encoding) = value { 389 self.connect_properties.object_encoding = Some(*object_encoding); 390 } 391 } 392 _ => { 393 log::warn!("unknown connect properties: {}:{:?}", property, value); 394 } 395 } 396 } 397 } 398 399 async fn on_connect( 400 &mut self, 401 transaction_id: &f64, 402 command_obj: &IndexMap<String, Amf0ValueType>, 403 ) -> Result<(), SessionError> { 404 self.parse_connect_properties(command_obj); 405 log::info!("connect properties: {:?}", self.connect_properties); 406 let mut control_message = 407 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 408 log::info!("[ S->C ] [set window_acknowledgement_size]"); 409 control_message 410 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 411 .await?; 412 413 log::info!("[ S->C ] [set set_peer_bandwidth]",); 414 control_message 415 .write_set_peer_bandwidth( 416 define::PEER_BANDWIDTH, 417 define::peer_bandwidth_limit_type::DYNAMIC, 418 ) 419 .await?; 420 421 let obj_encoding = command_obj.get("objectEncoding"); 422 let encoding = match obj_encoding { 423 Some(Amf0ValueType::Number(encoding)) => encoding, 424 _ => &define::OBJENCODING_AMF0, 425 }; 426 427 let app_name = command_obj.get("app"); 428 self.app_name = match app_name { 429 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 430 _ => { 431 return Err(SessionError { 432 value: SessionErrorValue::NoAppName, 433 }); 434 } 435 }; 436 437 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 438 log::info!("[ S->C ] [set connect_response]",); 439 netconnection 440 .write_connect_response( 441 transaction_id, 442 define::FMSVER, 443 &define::CAPABILITIES, 444 &String::from("NetConnection.Connect.Success"), 445 define::LEVEL, 446 &String::from("Connection Succeeded."), 447 encoding, 448 ) 449 .await?; 450 451 Ok(()) 452 } 453 454 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 455 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 456 netconnection 457 .write_create_stream_response(transaction_id, &define::STREAM_ID) 458 .await?; 459 460 log::info!( 461 "[ S->C ] [create_stream_response] app_name: {}", 462 self.app_name, 463 ); 464 465 Ok(()) 466 } 467 468 pub async fn on_delete_stream( 469 &mut self, 470 transaction_id: &f64, 471 stream_id: &f64, 472 ) -> Result<(), SessionError> { 473 self.common 474 .unpublish_to_channels( 475 self.app_name.clone(), 476 self.stream_name.clone(), 477 self.session_id, 478 ) 479 .await?; 480 481 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 482 netstream 483 .write_on_status( 484 transaction_id, 485 "status", 486 "NetStream.DeleteStream.Suceess", 487 "", 488 ) 489 .await?; 490 491 //self.unsubscribe_from_channels().await?; 492 log::info!( 493 "[ S->C ] [delete stream success] app_name: {}, stream_name: {}", 494 self.app_name, 495 self.stream_name 496 ); 497 log::trace!("{}", stream_id); 498 499 Ok(()) 500 } 501 502 fn get_request_url(&mut self, raw_stream_name: String) -> String { 503 if let Some(tc_url) = &self.connect_properties.tc_url { 504 format!("{tc_url}/{raw_stream_name}") 505 } else { 506 format!("{}/{}", self.app_name.clone(), raw_stream_name) 507 } 508 } 509 510 #[allow(clippy::never_loop)] 511 pub async fn on_play( 512 &mut self, 513 transaction_id: &f64, 514 stream_id: &u32, 515 other_values: &mut Vec<Amf0ValueType>, 516 ) -> Result<(), SessionError> { 517 let length = other_values.len() as u8; 518 let mut index: u8 = 0; 519 520 let mut stream_name: Option<String> = None; 521 let mut start: Option<f64> = None; 522 let mut duration: Option<f64> = None; 523 let mut reset: Option<bool> = None; 524 525 loop { 526 if index >= length { 527 break; 528 } 529 index += 1; 530 stream_name = match other_values.remove(0) { 531 Amf0ValueType::UTF8String(val) => Some(val), 532 _ => None, 533 }; 534 535 if index >= length { 536 break; 537 } 538 index += 1; 539 start = match other_values.remove(0) { 540 Amf0ValueType::Number(val) => Some(val), 541 _ => None, 542 }; 543 544 if index >= length { 545 break; 546 } 547 index += 1; 548 duration = match other_values.remove(0) { 549 Amf0ValueType::Number(val) => Some(val), 550 _ => None, 551 }; 552 553 if index >= length { 554 break; 555 } 556 //index = index + 1; 557 reset = match other_values.remove(0) { 558 Amf0ValueType::Boolean(val) => Some(val), 559 _ => None, 560 }; 561 break; 562 } 563 564 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 565 event_messages.write_stream_begin(*stream_id).await?; 566 log::info!( 567 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 568 self.app_name, 569 self.stream_name 570 ); 571 log::trace!( 572 "{} {} {}", 573 start.is_some(), 574 duration.is_some(), 575 reset.is_some() 576 ); 577 578 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 579 netstream 580 .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset") 581 .await?; 582 583 netstream 584 .write_on_status( 585 transaction_id, 586 "status", 587 "NetStream.Play.Start", 588 "play start", 589 ) 590 .await?; 591 592 netstream 593 .write_on_status( 594 transaction_id, 595 "status", 596 "NetStream.Data.Start", 597 "data start.", 598 ) 599 .await?; 600 601 netstream 602 .write_on_status( 603 transaction_id, 604 "status", 605 "NetStream.Play.PublishNotify", 606 "play publish notify.", 607 ) 608 .await?; 609 610 event_messages.write_stream_is_record(*stream_id).await?; 611 612 let raw_stream_name = stream_name.unwrap(); 613 614 (self.stream_name, self.url_parameters) = RtmpUrlParser::default() 615 .set_raw_stream_name(raw_stream_name.clone()) 616 .parse_raw_stream_name(); 617 618 log::info!( 619 "[ S->C ] [stream is record] app_name: {}, stream_name: {}, url parameters: {}", 620 self.app_name, 621 self.stream_name, 622 self.url_parameters 623 ); 624 625 /*Now it can update the request url*/ 626 self.common.request_url = self.get_request_url(raw_stream_name); 627 self.common 628 .subscribe_from_channels( 629 self.app_name.clone(), 630 self.stream_name.clone(), 631 self.session_id, 632 ) 633 .await?; 634 635 self.state = ServerSessionState::Play; 636 637 Ok(()) 638 } 639 640 pub async fn on_publish( 641 &mut self, 642 transaction_id: &f64, 643 stream_id: &u32, 644 other_values: &mut Vec<Amf0ValueType>, 645 ) -> Result<(), SessionError> { 646 let length = other_values.len(); 647 648 if length < 2 { 649 return Err(SessionError { 650 value: SessionErrorValue::Amf0ValueCountNotCorrect, 651 }); 652 } 653 654 let raw_stream_name = match other_values.remove(0) { 655 Amf0ValueType::UTF8String(val) => val, 656 _ => { 657 return Err(SessionError { 658 value: SessionErrorValue::Amf0ValueCountNotCorrect, 659 }); 660 } 661 }; 662 663 (self.stream_name, self.url_parameters) = RtmpUrlParser::default() 664 .set_raw_stream_name(raw_stream_name.clone()) 665 .parse_raw_stream_name(); 666 667 /*Now it can update the request url*/ 668 self.common.request_url = self.get_request_url(raw_stream_name); 669 670 let _ = match other_values.remove(0) { 671 Amf0ValueType::UTF8String(val) => val, 672 _ => { 673 return Err(SessionError { 674 value: SessionErrorValue::Amf0ValueCountNotCorrect, 675 }); 676 } 677 }; 678 679 log::info!( 680 "[ S<-C ] [publish] app_name: {}, stream_name: {}, url parameters: {}", 681 self.app_name, 682 self.stream_name, 683 self.url_parameters 684 ); 685 686 log::info!( 687 "[ S->C ] [stream begin] app_name: {}, stream_name: {}, url parameters: {}", 688 self.app_name, 689 self.stream_name, 690 self.url_parameters 691 ); 692 693 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 694 event_messages.write_stream_begin(*stream_id).await?; 695 696 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 697 netstream 698 .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "") 699 .await?; 700 log::info!( 701 "[ S->C ] [NetStream.Publish.Start] app_name: {}, stream_name: {}", 702 self.app_name, 703 self.stream_name 704 ); 705 706 self.common 707 .publish_to_channels( 708 self.app_name.clone(), 709 self.stream_name.clone(), 710 self.session_id, 711 ) 712 .await?; 713 714 Ok(()) 715 } 716 } 717