1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::ChannelEventProducer, 11 chunk::{ 12 define::CHUNK_SIZE, 13 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 14 }, 15 config, handshake, 16 handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer}, 17 messages::{define::RtmpMessageData, parser::MessageParser}, 18 netconnection::writer::{ConnectProperties, NetConnection}, 19 netstream::writer::NetStreamWriter, 20 protocol_control_messages::writer::ProtocolControlMessagesWriter, 21 user_control_messages::writer::EventMessagesWriter, 22 }, 23 bytes::BytesMut, 24 bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO}, 25 std::{collections::HashMap, sync::Arc, time::Duration}, 26 tokio::{net::TcpStream, sync::Mutex}, 27 uuid::Uuid, 28 }; 29 30 enum ServerSessionState { 31 Handshake, 32 ReadChunk, 33 // OnConnect, 34 // OnCreateStream, 35 //Publish, 36 DeleteStream, 37 Play, 38 } 39 40 pub struct ServerSession { 41 pub app_name: String, 42 pub stream_name: String, 43 io: Arc<Mutex<BytesIO>>, 44 handshaker: HandshakeServer, 45 unpacketizer: ChunkUnpacketizer, 46 state: ServerSessionState, 47 pub common: Common, 48 bytesio_data: BytesMut, 49 has_remaing_data: bool, 50 /* Used to mark the subscriber's the data producer 51 in channels and delete it from map when unsubscribe 52 is called. */ 53 pub session_id: Uuid, 54 connect_properties: ConnectProperties, 55 } 56 57 impl ServerSession { 58 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self { 59 let remote_addr = if let Ok(addr) = stream.peer_addr() { 60 log::info!("server session: {}", addr.to_string()); 61 Some(addr) 62 } else { 63 None 64 }; 65 66 let net_io = Arc::new(Mutex::new(BytesIO::new(stream))); 67 let subscriber_id = Uuid::new_v4(); 68 Self { 69 app_name: String::from(""), 70 stream_name: String::from(""), 71 io: Arc::clone(&net_io), 72 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 73 unpacketizer: ChunkUnpacketizer::new(), 74 state: ServerSessionState::Handshake, 75 common: Common::new( 76 Arc::clone(&net_io), 77 event_producer, 78 SessionType::Server, 79 remote_addr, 80 ), 81 session_id: subscriber_id, 82 bytesio_data: BytesMut::new(), 83 has_remaing_data: false, 84 connect_properties: ConnectProperties::default(), 85 } 86 } 87 88 pub async fn run(&mut self) -> Result<(), SessionError> { 89 loop { 90 match self.state { 91 ServerSessionState::Handshake => { 92 self.handshake().await?; 93 } 94 ServerSessionState::ReadChunk => { 95 self.read_parse_chunks().await?; 96 } 97 ServerSessionState::Play => { 98 self.play().await?; 99 } 100 ServerSessionState::DeleteStream => { 101 return Ok(()); 102 } 103 } 104 } 105 106 //Ok(()) 107 } 108 109 async fn handshake(&mut self) -> Result<(), SessionError> { 110 let mut bytes_len = 0; 111 112 while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE { 113 self.bytesio_data = self.io.lock().await.read().await?; 114 bytes_len += self.bytesio_data.len(); 115 self.handshaker.extend_data(&self.bytesio_data[..]); 116 } 117 118 self.handshaker.handshake().await?; 119 120 if let ServerHandshakeState::Finish = self.handshaker.state() { 121 self.state = ServerSessionState::ReadChunk; 122 let left_bytes = self.handshaker.get_remaining_bytes(); 123 if !left_bytes.is_empty() { 124 self.unpacketizer.extend_data(&left_bytes[..]); 125 self.has_remaing_data = true; 126 } 127 log::info!("[ S->C ] [send_set_chunk_size] "); 128 self.send_set_chunk_size().await?; 129 return Ok(()); 130 } 131 132 Ok(()) 133 } 134 135 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 136 if !self.has_remaing_data { 137 match self 138 .io 139 .lock() 140 .await 141 .read_timeout(Duration::from_secs(2)) 142 .await 143 { 144 Ok(data) => { 145 self.bytesio_data = data; 146 } 147 Err(err) => { 148 self.common 149 .unpublish_to_channels( 150 self.app_name.clone(), 151 self.stream_name.clone(), 152 self.session_id, 153 ) 154 .await?; 155 156 return Err(SessionError { 157 value: SessionErrorValue::BytesIOError(err), 158 }); 159 } 160 } 161 162 self.unpacketizer.extend_data(&self.bytesio_data[..]); 163 } 164 165 self.has_remaing_data = false; 166 167 loop { 168 let result = self.unpacketizer.read_chunks(); 169 170 if let Ok(rv) = result { 171 if let UnpackResult::Chunks(chunks) = rv { 172 for chunk_info in chunks { 173 let timestamp = chunk_info.message_header.timestamp; 174 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 175 176 let mut msg = MessageParser::new(chunk_info).parse()?; 177 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 178 .await?; 179 } 180 } 181 } else { 182 break; 183 } 184 } 185 Ok(()) 186 } 187 188 async fn play(&mut self) -> Result<(), SessionError> { 189 match self.common.send_channel_data().await { 190 Ok(_) => {} 191 Err(err) => { 192 self.common 193 .unsubscribe_from_channels( 194 self.app_name.clone(), 195 self.stream_name.clone(), 196 self.session_id, 197 ) 198 .await?; 199 return Err(err); 200 } 201 } 202 203 Ok(()) 204 } 205 206 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 207 let mut controlmessage = 208 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 209 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 210 211 Ok(()) 212 } 213 214 pub async fn process_messages( 215 &mut self, 216 rtmp_msg: &mut RtmpMessageData, 217 msg_stream_id: &u32, 218 timestamp: &u32, 219 ) -> Result<(), SessionError> { 220 match rtmp_msg { 221 RtmpMessageData::Amf0Command { 222 command_name, 223 transaction_id, 224 command_object, 225 others, 226 } => { 227 self.on_amf0_command_message( 228 msg_stream_id, 229 command_name, 230 transaction_id, 231 command_object, 232 others, 233 ) 234 .await? 235 } 236 RtmpMessageData::SetChunkSize { chunk_size } => { 237 self.on_set_chunk_size(*chunk_size as usize)?; 238 } 239 RtmpMessageData::AudioData { data } => { 240 self.common.on_audio_data(data, timestamp)?; 241 } 242 RtmpMessageData::VideoData { data } => { 243 self.common.on_video_data(data, timestamp)?; 244 } 245 RtmpMessageData::AmfData { raw_data } => { 246 self.common.on_meta_data(raw_data, timestamp)?; 247 } 248 249 _ => {} 250 } 251 Ok(()) 252 } 253 254 pub async fn on_amf0_command_message( 255 &mut self, 256 stream_id: &u32, 257 command_name: &Amf0ValueType, 258 transaction_id: &Amf0ValueType, 259 command_object: &Amf0ValueType, 260 others: &mut Vec<Amf0ValueType>, 261 ) -> Result<(), SessionError> { 262 let empty_cmd_name = &String::new(); 263 let cmd_name = match command_name { 264 Amf0ValueType::UTF8String(str) => str, 265 _ => empty_cmd_name, 266 }; 267 268 let transaction_id = match transaction_id { 269 Amf0ValueType::Number(number) => number, 270 _ => &0.0, 271 }; 272 273 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 274 let obj = match command_object { 275 Amf0ValueType::Object(obj) => obj, 276 _ => &empty_cmd_obj, 277 }; 278 279 match cmd_name.as_str() { 280 "connect" => { 281 log::info!("[ S<-C ] [connect] "); 282 self.on_connect(transaction_id, obj).await?; 283 } 284 "createStream" => { 285 log::info!("[ S<-C ] [create stream] "); 286 self.on_create_stream(transaction_id).await?; 287 } 288 "deleteStream" => { 289 if !others.is_empty() { 290 let stream_id = match others.pop() { 291 Some(Amf0ValueType::Number(streamid)) => streamid, 292 _ => 0.0, 293 }; 294 295 log::info!( 296 "[ S<-C ] [delete stream] app_name: {}, stream_name: {}", 297 self.app_name, 298 self.stream_name 299 ); 300 301 self.on_delete_stream(transaction_id, &stream_id).await?; 302 self.state = ServerSessionState::DeleteStream; 303 } 304 } 305 "play" => { 306 log::info!( 307 "[ S<-C ] [play] app_name: {}, stream_name: {}", 308 self.app_name, 309 self.stream_name 310 ); 311 self.unpacketizer.session_type = config::SERVER_PULL; 312 self.on_play(transaction_id, stream_id, others).await?; 313 } 314 "publish" => { 315 self.unpacketizer.session_type = config::SERVER_PUSH; 316 self.on_publish(transaction_id, stream_id, others).await?; 317 } 318 _ => {} 319 } 320 321 Ok(()) 322 } 323 324 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 325 log::info!( 326 "[ S<-C ] [set chunk size] app_name: {}, stream_name: {}, chunk size: {}", 327 self.app_name, 328 self.stream_name, 329 chunk_size 330 ); 331 self.unpacketizer.update_max_chunk_size(chunk_size); 332 Ok(()) 333 } 334 335 fn parse_connect_properties(&mut self, command_obj: &HashMap<String, Amf0ValueType>) { 336 for (property, value) in command_obj { 337 match property.as_str() { 338 "app" => { 339 if let Amf0ValueType::UTF8String(app) = value { 340 self.connect_properties.app = Some(app.clone()); 341 } 342 } 343 "flashVer" => { 344 if let Amf0ValueType::UTF8String(flash_ver) = value { 345 self.connect_properties.flash_ver = Some(flash_ver.clone()); 346 } 347 } 348 "swfUrl" => { 349 if let Amf0ValueType::UTF8String(swf_url) = value { 350 self.connect_properties.swf_url = Some(swf_url.clone()); 351 } 352 } 353 "tcUrl" => { 354 if let Amf0ValueType::UTF8String(tc_url) = value { 355 self.connect_properties.tc_url = Some(tc_url.clone()); 356 } 357 } 358 "fpad" => { 359 if let Amf0ValueType::Boolean(fpad) = value { 360 self.connect_properties.fpad = Some(*fpad); 361 } 362 } 363 "audioCodecs" => { 364 if let Amf0ValueType::Number(audio_codecs) = value { 365 self.connect_properties.audio_codecs = Some(*audio_codecs); 366 } 367 } 368 "videoCodecs" => { 369 if let Amf0ValueType::Number(video_codecs) = value { 370 self.connect_properties.video_codecs = Some(*video_codecs); 371 } 372 } 373 "videoFunction" => { 374 if let Amf0ValueType::Number(video_function) = value { 375 self.connect_properties.video_function = Some(*video_function); 376 } 377 } 378 "pageUrl" => { 379 if let Amf0ValueType::UTF8String(page_url) = value { 380 self.connect_properties.page_url = Some(page_url.clone()); 381 } 382 } 383 "objectEncoding" => { 384 if let Amf0ValueType::Number(object_encoding) = value { 385 self.connect_properties.object_encoding = Some(*object_encoding); 386 } 387 } 388 _ => { 389 log::warn!("unknown connect properties: {}:{:?}", property, value); 390 } 391 } 392 } 393 } 394 395 async fn on_connect( 396 &mut self, 397 transaction_id: &f64, 398 command_obj: &HashMap<String, Amf0ValueType>, 399 ) -> Result<(), SessionError> { 400 self.parse_connect_properties(command_obj); 401 log::info!("connect properties: {:?}", self.connect_properties); 402 let mut control_message = 403 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 404 log::info!("[ S->C ] [set window_acknowledgement_size]"); 405 control_message 406 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 407 .await?; 408 409 log::info!("[ S->C ] [set set_peer_bandwidth]",); 410 control_message 411 .write_set_peer_bandwidth( 412 define::PEER_BANDWIDTH, 413 define::peer_bandwidth_limit_type::DYNAMIC, 414 ) 415 .await?; 416 417 let obj_encoding = command_obj.get("objectEncoding"); 418 let encoding = match obj_encoding { 419 Some(Amf0ValueType::Number(encoding)) => encoding, 420 _ => &define::OBJENCODING_AMF0, 421 }; 422 423 let app_name = command_obj.get("app"); 424 self.app_name = match app_name { 425 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 426 _ => { 427 return Err(SessionError { 428 value: SessionErrorValue::NoAppName, 429 }); 430 } 431 }; 432 433 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 434 log::info!("[ S->C ] [set connect_response]",); 435 netconnection 436 .write_connect_response( 437 transaction_id, 438 define::FMSVER, 439 &define::CAPABILITIES, 440 &String::from("NetConnection.Connect.Success"), 441 define::LEVEL, 442 &String::from("Connection Succeeded."), 443 encoding, 444 ) 445 .await?; 446 447 Ok(()) 448 } 449 450 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 451 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 452 netconnection 453 .write_create_stream_response(transaction_id, &define::STREAM_ID) 454 .await?; 455 456 log::info!( 457 "[ S->C ] [create_stream_response] app_name: {}", 458 self.app_name, 459 ); 460 461 Ok(()) 462 } 463 464 pub async fn on_delete_stream( 465 &mut self, 466 transaction_id: &f64, 467 stream_id: &f64, 468 ) -> Result<(), SessionError> { 469 self.common 470 .unpublish_to_channels( 471 self.app_name.clone(), 472 self.stream_name.clone(), 473 self.session_id, 474 ) 475 .await?; 476 477 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 478 netstream 479 .write_on_status( 480 transaction_id, 481 "status", 482 "NetStream.DeleteStream.Suceess", 483 "", 484 ) 485 .await?; 486 487 //self.unsubscribe_from_channels().await?; 488 log::info!( 489 "[ S->C ] [delete stream success] app_name: {}, stream_name: {}", 490 self.app_name, 491 self.stream_name 492 ); 493 log::trace!("{}", stream_id); 494 495 Ok(()) 496 } 497 498 fn get_request_url(&mut self) -> String { 499 if let Some(tc_url) = &self.connect_properties.tc_url { 500 format!("{}/{}", tc_url, self.stream_name.clone()) 501 } else { 502 format!("{}/{}", self.app_name.clone(), self.stream_name.clone()) 503 } 504 } 505 506 #[allow(clippy::never_loop)] 507 pub async fn on_play( 508 &mut self, 509 transaction_id: &f64, 510 stream_id: &u32, 511 other_values: &mut Vec<Amf0ValueType>, 512 ) -> Result<(), SessionError> { 513 let length = other_values.len() as u8; 514 let mut index: u8 = 0; 515 516 let mut stream_name: Option<String> = None; 517 let mut start: Option<f64> = None; 518 let mut duration: Option<f64> = None; 519 let mut reset: Option<bool> = None; 520 521 loop { 522 if index >= length { 523 break; 524 } 525 index += 1; 526 stream_name = match other_values.remove(0) { 527 Amf0ValueType::UTF8String(val) => Some(val), 528 _ => None, 529 }; 530 531 if index >= length { 532 break; 533 } 534 index += 1; 535 start = match other_values.remove(0) { 536 Amf0ValueType::Number(val) => Some(val), 537 _ => None, 538 }; 539 540 if index >= length { 541 break; 542 } 543 index += 1; 544 duration = match other_values.remove(0) { 545 Amf0ValueType::Number(val) => Some(val), 546 _ => None, 547 }; 548 549 if index >= length { 550 break; 551 } 552 //index = index + 1; 553 reset = match other_values.remove(0) { 554 Amf0ValueType::Boolean(val) => Some(val), 555 _ => None, 556 }; 557 break; 558 } 559 560 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 561 event_messages.write_stream_begin(*stream_id).await?; 562 log::info!( 563 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 564 self.app_name, 565 self.stream_name 566 ); 567 log::trace!( 568 "{} {} {}", 569 start.is_some(), 570 duration.is_some(), 571 reset.is_some() 572 ); 573 574 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 575 netstream 576 .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset") 577 .await?; 578 579 netstream 580 .write_on_status( 581 transaction_id, 582 "status", 583 "NetStream.Play.Start", 584 "play start", 585 ) 586 .await?; 587 588 netstream 589 .write_on_status( 590 transaction_id, 591 "status", 592 "NetStream.Data.Start", 593 "data start.", 594 ) 595 .await?; 596 597 netstream 598 .write_on_status( 599 transaction_id, 600 "status", 601 "NetStream.Play.PublishNotify", 602 "play publish notify.", 603 ) 604 .await?; 605 606 event_messages.write_stream_is_record(*stream_id).await?; 607 log::info!( 608 "[ S->C ] [stream is record] app_name: {}, stream_name: {}", 609 self.app_name, 610 self.stream_name 611 ); 612 self.stream_name = stream_name.clone().unwrap(); 613 /*Now it can update the request url*/ 614 self.common.request_url = self.get_request_url(); 615 self.common 616 .subscribe_from_channels(self.app_name.clone(), stream_name.unwrap(), self.session_id) 617 .await?; 618 619 self.state = ServerSessionState::Play; 620 621 Ok(()) 622 } 623 624 pub async fn on_publish( 625 &mut self, 626 transaction_id: &f64, 627 stream_id: &u32, 628 other_values: &mut Vec<Amf0ValueType>, 629 ) -> Result<(), SessionError> { 630 let length = other_values.len(); 631 632 if length < 2 { 633 return Err(SessionError { 634 value: SessionErrorValue::Amf0ValueCountNotCorrect, 635 }); 636 } 637 638 let stream_name = match other_values.remove(0) { 639 Amf0ValueType::UTF8String(val) => val, 640 _ => { 641 return Err(SessionError { 642 value: SessionErrorValue::Amf0ValueCountNotCorrect, 643 }); 644 } 645 }; 646 647 self.stream_name = stream_name; 648 /*Now it can update the request url*/ 649 self.common.request_url = self.get_request_url(); 650 651 let _ = match other_values.remove(0) { 652 Amf0ValueType::UTF8String(val) => val, 653 _ => { 654 return Err(SessionError { 655 value: SessionErrorValue::Amf0ValueCountNotCorrect, 656 }); 657 } 658 }; 659 660 log::info!( 661 "[ S<-C ] [publish] app_name: {}, stream_name: {}", 662 self.app_name, 663 self.stream_name 664 ); 665 666 log::info!( 667 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 668 self.app_name, 669 self.stream_name 670 ); 671 672 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 673 event_messages.write_stream_begin(*stream_id).await?; 674 675 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 676 netstream 677 .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "") 678 .await?; 679 log::info!( 680 "[ S->C ] [NetStream.Publish.Start] app_name: {}, stream_name: {}", 681 self.app_name, 682 self.stream_name 683 ); 684 685 self.common 686 .publish_to_channels( 687 self.app_name.clone(), 688 self.stream_name.clone(), 689 self.session_id, 690 ) 691 .await?; 692 693 Ok(()) 694 } 695 } 696