1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::ChannelEventProducer, 11 chunk::{ 12 define::CHUNK_SIZE, 13 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 14 }, 15 config, handshake, 16 handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer}, 17 messages::{define::RtmpMessageData, parser::MessageParser}, 18 netconnection::writer::{ConnectProperties, NetConnection}, 19 netstream::writer::NetStreamWriter, 20 protocol_control_messages::writer::ProtocolControlMessagesWriter, 21 user_control_messages::writer::EventMessagesWriter, 22 }, 23 bytes::BytesMut, 24 bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO}, 25 std::{collections::HashMap, sync::Arc, time::Duration}, 26 tokio::{net::TcpStream, sync::Mutex}, 27 uuid::Uuid, 28 }; 29 30 enum ServerSessionState { 31 Handshake, 32 ReadChunk, 33 // OnConnect, 34 // OnCreateStream, 35 //Publish, 36 DeleteStream, 37 Play, 38 } 39 40 pub struct ServerSession { 41 pub app_name: String, 42 pub stream_name: String, 43 pub url_parameters: String, 44 io: Arc<Mutex<BytesIO>>, 45 handshaker: HandshakeServer, 46 unpacketizer: ChunkUnpacketizer, 47 state: ServerSessionState, 48 pub common: Common, 49 bytesio_data: BytesMut, 50 has_remaing_data: bool, 51 /* Used to mark the subscriber's the data producer 52 in channels and delete it from map when unsubscribe 53 is called. */ 54 pub session_id: Uuid, 55 connect_properties: ConnectProperties, 56 } 57 58 impl ServerSession { 59 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self { 60 let remote_addr = if let Ok(addr) = stream.peer_addr() { 61 log::info!("server session: {}", addr.to_string()); 62 Some(addr) 63 } else { 64 None 65 }; 66 67 let net_io = Arc::new(Mutex::new(BytesIO::new(stream))); 68 let subscriber_id = Uuid::new_v4(); 69 Self { 70 app_name: String::from(""), 71 stream_name: String::from(""), 72 url_parameters: String::from(""), 73 io: Arc::clone(&net_io), 74 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 75 unpacketizer: ChunkUnpacketizer::new(), 76 state: ServerSessionState::Handshake, 77 common: Common::new( 78 Arc::clone(&net_io), 79 event_producer, 80 SessionType::Server, 81 remote_addr, 82 ), 83 session_id: subscriber_id, 84 bytesio_data: BytesMut::new(), 85 has_remaing_data: false, 86 connect_properties: ConnectProperties::default(), 87 } 88 } 89 90 pub async fn run(&mut self) -> Result<(), SessionError> { 91 loop { 92 match self.state { 93 ServerSessionState::Handshake => { 94 self.handshake().await?; 95 } 96 ServerSessionState::ReadChunk => { 97 self.read_parse_chunks().await?; 98 } 99 ServerSessionState::Play => { 100 self.play().await?; 101 } 102 ServerSessionState::DeleteStream => { 103 return Ok(()); 104 } 105 } 106 } 107 108 //Ok(()) 109 } 110 111 async fn handshake(&mut self) -> Result<(), SessionError> { 112 let mut bytes_len = 0; 113 114 while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE { 115 self.bytesio_data = self.io.lock().await.read().await?; 116 bytes_len += self.bytesio_data.len(); 117 self.handshaker.extend_data(&self.bytesio_data[..]); 118 } 119 120 self.handshaker.handshake().await?; 121 122 if let ServerHandshakeState::Finish = self.handshaker.state() { 123 self.state = ServerSessionState::ReadChunk; 124 let left_bytes = self.handshaker.get_remaining_bytes(); 125 if !left_bytes.is_empty() { 126 self.unpacketizer.extend_data(&left_bytes[..]); 127 self.has_remaing_data = true; 128 } 129 log::info!("[ S->C ] [send_set_chunk_size] "); 130 self.send_set_chunk_size().await?; 131 return Ok(()); 132 } 133 134 Ok(()) 135 } 136 137 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 138 if !self.has_remaing_data { 139 match self 140 .io 141 .lock() 142 .await 143 .read_timeout(Duration::from_secs(2)) 144 .await 145 { 146 Ok(data) => { 147 self.bytesio_data = data; 148 } 149 Err(err) => { 150 self.common 151 .unpublish_to_channels( 152 self.app_name.clone(), 153 self.stream_name.clone(), 154 self.session_id, 155 ) 156 .await?; 157 158 return Err(SessionError { 159 value: SessionErrorValue::BytesIOError(err), 160 }); 161 } 162 } 163 164 self.unpacketizer.extend_data(&self.bytesio_data[..]); 165 } 166 167 self.has_remaing_data = false; 168 169 loop { 170 let result = self.unpacketizer.read_chunks(); 171 172 if let Ok(rv) = result { 173 if let UnpackResult::Chunks(chunks) = rv { 174 for chunk_info in chunks { 175 let timestamp = chunk_info.message_header.timestamp; 176 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 177 178 let mut msg = MessageParser::new(chunk_info).parse()?; 179 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 180 .await?; 181 } 182 } 183 } else { 184 break; 185 } 186 } 187 Ok(()) 188 } 189 190 async fn play(&mut self) -> Result<(), SessionError> { 191 match self.common.send_channel_data().await { 192 Ok(_) => {} 193 Err(err) => { 194 self.common 195 .unsubscribe_from_channels( 196 self.app_name.clone(), 197 self.stream_name.clone(), 198 self.session_id, 199 ) 200 .await?; 201 return Err(err); 202 } 203 } 204 205 Ok(()) 206 } 207 208 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 209 let mut controlmessage = 210 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 211 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 212 213 Ok(()) 214 } 215 216 pub async fn process_messages( 217 &mut self, 218 rtmp_msg: &mut RtmpMessageData, 219 msg_stream_id: &u32, 220 timestamp: &u32, 221 ) -> Result<(), SessionError> { 222 match rtmp_msg { 223 RtmpMessageData::Amf0Command { 224 command_name, 225 transaction_id, 226 command_object, 227 others, 228 } => { 229 self.on_amf0_command_message( 230 msg_stream_id, 231 command_name, 232 transaction_id, 233 command_object, 234 others, 235 ) 236 .await? 237 } 238 RtmpMessageData::SetChunkSize { chunk_size } => { 239 self.on_set_chunk_size(*chunk_size as usize)?; 240 } 241 RtmpMessageData::AudioData { data } => { 242 self.common.on_audio_data(data, timestamp)?; 243 } 244 RtmpMessageData::VideoData { data } => { 245 self.common.on_video_data(data, timestamp)?; 246 } 247 RtmpMessageData::AmfData { raw_data } => { 248 self.common.on_meta_data(raw_data, timestamp)?; 249 } 250 251 _ => {} 252 } 253 Ok(()) 254 } 255 256 pub async fn on_amf0_command_message( 257 &mut self, 258 stream_id: &u32, 259 command_name: &Amf0ValueType, 260 transaction_id: &Amf0ValueType, 261 command_object: &Amf0ValueType, 262 others: &mut Vec<Amf0ValueType>, 263 ) -> Result<(), SessionError> { 264 let empty_cmd_name = &String::new(); 265 let cmd_name = match command_name { 266 Amf0ValueType::UTF8String(str) => str, 267 _ => empty_cmd_name, 268 }; 269 270 let transaction_id = match transaction_id { 271 Amf0ValueType::Number(number) => number, 272 _ => &0.0, 273 }; 274 275 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 276 let obj = match command_object { 277 Amf0ValueType::Object(obj) => obj, 278 _ => &empty_cmd_obj, 279 }; 280 281 match cmd_name.as_str() { 282 "connect" => { 283 log::info!("[ S<-C ] [connect] "); 284 self.on_connect(transaction_id, obj).await?; 285 } 286 "createStream" => { 287 log::info!("[ S<-C ] [create stream] "); 288 self.on_create_stream(transaction_id).await?; 289 } 290 "deleteStream" => { 291 if !others.is_empty() { 292 let stream_id = match others.pop() { 293 Some(Amf0ValueType::Number(streamid)) => streamid, 294 _ => 0.0, 295 }; 296 297 log::info!( 298 "[ S<-C ] [delete stream] app_name: {}, stream_name: {}", 299 self.app_name, 300 self.stream_name 301 ); 302 303 self.on_delete_stream(transaction_id, &stream_id).await?; 304 self.state = ServerSessionState::DeleteStream; 305 } 306 } 307 "play" => { 308 log::info!( 309 "[ S<-C ] [play] app_name: {}, stream_name: {}", 310 self.app_name, 311 self.stream_name 312 ); 313 self.unpacketizer.session_type = config::SERVER_PULL; 314 self.on_play(transaction_id, stream_id, others).await?; 315 } 316 "publish" => { 317 self.unpacketizer.session_type = config::SERVER_PUSH; 318 self.on_publish(transaction_id, stream_id, others).await?; 319 } 320 _ => {} 321 } 322 323 Ok(()) 324 } 325 326 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 327 log::info!( 328 "[ S<-C ] [set chunk size] app_name: {}, stream_name: {}, chunk size: {}", 329 self.app_name, 330 self.stream_name, 331 chunk_size 332 ); 333 self.unpacketizer.update_max_chunk_size(chunk_size); 334 Ok(()) 335 } 336 337 fn parse_connect_properties(&mut self, command_obj: &HashMap<String, Amf0ValueType>) { 338 for (property, value) in command_obj { 339 match property.as_str() { 340 "app" => { 341 if let Amf0ValueType::UTF8String(app) = value { 342 self.connect_properties.app = Some(app.clone()); 343 } 344 } 345 "flashVer" => { 346 if let Amf0ValueType::UTF8String(flash_ver) = value { 347 self.connect_properties.flash_ver = Some(flash_ver.clone()); 348 } 349 } 350 "swfUrl" => { 351 if let Amf0ValueType::UTF8String(swf_url) = value { 352 self.connect_properties.swf_url = Some(swf_url.clone()); 353 } 354 } 355 "tcUrl" => { 356 if let Amf0ValueType::UTF8String(tc_url) = value { 357 self.connect_properties.tc_url = Some(tc_url.clone()); 358 } 359 } 360 "fpad" => { 361 if let Amf0ValueType::Boolean(fpad) = value { 362 self.connect_properties.fpad = Some(*fpad); 363 } 364 } 365 "audioCodecs" => { 366 if let Amf0ValueType::Number(audio_codecs) = value { 367 self.connect_properties.audio_codecs = Some(*audio_codecs); 368 } 369 } 370 "videoCodecs" => { 371 if let Amf0ValueType::Number(video_codecs) = value { 372 self.connect_properties.video_codecs = Some(*video_codecs); 373 } 374 } 375 "videoFunction" => { 376 if let Amf0ValueType::Number(video_function) = value { 377 self.connect_properties.video_function = Some(*video_function); 378 } 379 } 380 "pageUrl" => { 381 if let Amf0ValueType::UTF8String(page_url) = value { 382 self.connect_properties.page_url = Some(page_url.clone()); 383 } 384 } 385 "objectEncoding" => { 386 if let Amf0ValueType::Number(object_encoding) = value { 387 self.connect_properties.object_encoding = Some(*object_encoding); 388 } 389 } 390 _ => { 391 log::warn!("unknown connect properties: {}:{:?}", property, value); 392 } 393 } 394 } 395 } 396 397 async fn on_connect( 398 &mut self, 399 transaction_id: &f64, 400 command_obj: &HashMap<String, Amf0ValueType>, 401 ) -> Result<(), SessionError> { 402 self.parse_connect_properties(command_obj); 403 log::info!("connect properties: {:?}", self.connect_properties); 404 let mut control_message = 405 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 406 log::info!("[ S->C ] [set window_acknowledgement_size]"); 407 control_message 408 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 409 .await?; 410 411 log::info!("[ S->C ] [set set_peer_bandwidth]",); 412 control_message 413 .write_set_peer_bandwidth( 414 define::PEER_BANDWIDTH, 415 define::peer_bandwidth_limit_type::DYNAMIC, 416 ) 417 .await?; 418 419 let obj_encoding = command_obj.get("objectEncoding"); 420 let encoding = match obj_encoding { 421 Some(Amf0ValueType::Number(encoding)) => encoding, 422 _ => &define::OBJENCODING_AMF0, 423 }; 424 425 let app_name = command_obj.get("app"); 426 self.app_name = match app_name { 427 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 428 _ => { 429 return Err(SessionError { 430 value: SessionErrorValue::NoAppName, 431 }); 432 } 433 }; 434 435 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 436 log::info!("[ S->C ] [set connect_response]",); 437 netconnection 438 .write_connect_response( 439 transaction_id, 440 define::FMSVER, 441 &define::CAPABILITIES, 442 &String::from("NetConnection.Connect.Success"), 443 define::LEVEL, 444 &String::from("Connection Succeeded."), 445 encoding, 446 ) 447 .await?; 448 449 Ok(()) 450 } 451 452 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 453 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 454 netconnection 455 .write_create_stream_response(transaction_id, &define::STREAM_ID) 456 .await?; 457 458 log::info!( 459 "[ S->C ] [create_stream_response] app_name: {}", 460 self.app_name, 461 ); 462 463 Ok(()) 464 } 465 466 pub async fn on_delete_stream( 467 &mut self, 468 transaction_id: &f64, 469 stream_id: &f64, 470 ) -> Result<(), SessionError> { 471 self.common 472 .unpublish_to_channels( 473 self.app_name.clone(), 474 self.stream_name.clone(), 475 self.session_id, 476 ) 477 .await?; 478 479 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 480 netstream 481 .write_on_status( 482 transaction_id, 483 "status", 484 "NetStream.DeleteStream.Suceess", 485 "", 486 ) 487 .await?; 488 489 //self.unsubscribe_from_channels().await?; 490 log::info!( 491 "[ S->C ] [delete stream success] app_name: {}, stream_name: {}", 492 self.app_name, 493 self.stream_name 494 ); 495 log::trace!("{}", stream_id); 496 497 Ok(()) 498 } 499 500 fn get_request_url(&mut self, raw_stream_name: String) -> String { 501 if let Some(tc_url) = &self.connect_properties.tc_url { 502 format!("{tc_url}/{raw_stream_name}") 503 } else { 504 format!("{}/{}", self.app_name.clone(), raw_stream_name) 505 } 506 } 507 /*parse the raw stream name to get real stream name and the URL parameters*/ 508 fn parse_raw_stream_name(&mut self, raw_stream_name: String) { 509 let data: Vec<&str> = raw_stream_name.split('?').collect(); 510 self.stream_name = data[0].to_string(); 511 if data.len() > 1 { 512 self.url_parameters = data[1].to_string(); 513 } 514 } 515 516 #[allow(clippy::never_loop)] 517 pub async fn on_play( 518 &mut self, 519 transaction_id: &f64, 520 stream_id: &u32, 521 other_values: &mut Vec<Amf0ValueType>, 522 ) -> Result<(), SessionError> { 523 let length = other_values.len() as u8; 524 let mut index: u8 = 0; 525 526 let mut stream_name: Option<String> = None; 527 let mut start: Option<f64> = None; 528 let mut duration: Option<f64> = None; 529 let mut reset: Option<bool> = None; 530 531 loop { 532 if index >= length { 533 break; 534 } 535 index += 1; 536 stream_name = match other_values.remove(0) { 537 Amf0ValueType::UTF8String(val) => Some(val), 538 _ => None, 539 }; 540 541 if index >= length { 542 break; 543 } 544 index += 1; 545 start = match other_values.remove(0) { 546 Amf0ValueType::Number(val) => Some(val), 547 _ => None, 548 }; 549 550 if index >= length { 551 break; 552 } 553 index += 1; 554 duration = match other_values.remove(0) { 555 Amf0ValueType::Number(val) => Some(val), 556 _ => None, 557 }; 558 559 if index >= length { 560 break; 561 } 562 //index = index + 1; 563 reset = match other_values.remove(0) { 564 Amf0ValueType::Boolean(val) => Some(val), 565 _ => None, 566 }; 567 break; 568 } 569 570 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 571 event_messages.write_stream_begin(*stream_id).await?; 572 log::info!( 573 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 574 self.app_name, 575 self.stream_name 576 ); 577 log::trace!( 578 "{} {} {}", 579 start.is_some(), 580 duration.is_some(), 581 reset.is_some() 582 ); 583 584 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 585 netstream 586 .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset") 587 .await?; 588 589 netstream 590 .write_on_status( 591 transaction_id, 592 "status", 593 "NetStream.Play.Start", 594 "play start", 595 ) 596 .await?; 597 598 netstream 599 .write_on_status( 600 transaction_id, 601 "status", 602 "NetStream.Data.Start", 603 "data start.", 604 ) 605 .await?; 606 607 netstream 608 .write_on_status( 609 transaction_id, 610 "status", 611 "NetStream.Play.PublishNotify", 612 "play publish notify.", 613 ) 614 .await?; 615 616 event_messages.write_stream_is_record(*stream_id).await?; 617 618 let raw_stream_name = stream_name.unwrap(); 619 self.parse_raw_stream_name(raw_stream_name.clone()); 620 621 log::info!( 622 "[ S->C ] [stream is record] app_name: {}, stream_name: {}, url parameters: {}", 623 self.app_name, 624 self.stream_name, 625 self.url_parameters 626 ); 627 628 /*Now it can update the request url*/ 629 self.common.request_url = self.get_request_url(raw_stream_name); 630 self.common 631 .subscribe_from_channels( 632 self.app_name.clone(), 633 self.stream_name.clone(), 634 self.session_id, 635 ) 636 .await?; 637 638 self.state = ServerSessionState::Play; 639 640 Ok(()) 641 } 642 643 pub async fn on_publish( 644 &mut self, 645 transaction_id: &f64, 646 stream_id: &u32, 647 other_values: &mut Vec<Amf0ValueType>, 648 ) -> Result<(), SessionError> { 649 let length = other_values.len(); 650 651 if length < 2 { 652 return Err(SessionError { 653 value: SessionErrorValue::Amf0ValueCountNotCorrect, 654 }); 655 } 656 657 let raw_stream_name = match other_values.remove(0) { 658 Amf0ValueType::UTF8String(val) => val, 659 _ => { 660 return Err(SessionError { 661 value: SessionErrorValue::Amf0ValueCountNotCorrect, 662 }); 663 } 664 }; 665 666 self.parse_raw_stream_name(raw_stream_name.clone()); 667 /*Now it can update the request url*/ 668 self.common.request_url = self.get_request_url(raw_stream_name); 669 670 let _ = match other_values.remove(0) { 671 Amf0ValueType::UTF8String(val) => val, 672 _ => { 673 return Err(SessionError { 674 value: SessionErrorValue::Amf0ValueCountNotCorrect, 675 }); 676 } 677 }; 678 679 log::info!( 680 "[ S<-C ] [publish] app_name: {}, stream_name: {}, url parameters: {}", 681 self.app_name, 682 self.stream_name, 683 self.url_parameters 684 ); 685 686 log::info!( 687 "[ S->C ] [stream begin] app_name: {}, stream_name: {}, url parameters: {}", 688 self.app_name, 689 self.stream_name, 690 self.url_parameters 691 ); 692 693 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 694 event_messages.write_stream_begin(*stream_id).await?; 695 696 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 697 netstream 698 .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "") 699 .await?; 700 log::info!( 701 "[ S->C ] [NetStream.Publish.Start] app_name: {}, stream_name: {}", 702 self.app_name, 703 self.stream_name 704 ); 705 706 self.common 707 .publish_to_channels( 708 self.app_name.clone(), 709 self.stream_name.clone(), 710 self.session_id, 711 ) 712 .await?; 713 714 Ok(()) 715 } 716 } 717