1 use { 2 super::{ 3 define, 4 errors::{SessionError, SessionErrorValue}, 5 }, 6 crate::{ 7 amf0::Amf0ValueType, 8 channels::define::{ 9 ChannelData, ChannelDataConsumer, ChannelDataPublisher, ChannelEvent, 10 ChannelEventPublisher, 11 }, 12 chunk::{ 13 define::{chunk_type, csid_type, CHUNK_SIZE}, 14 packetizer::ChunkPacketizer, 15 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 16 ChunkInfo, 17 }, 18 config, 19 handshake::handshake::{ServerHandshakeState, SimpleHandshakeServer}, 20 messages::{ 21 define::{msg_type_id, RtmpMessageData}, 22 parser::MessageParser, 23 }, 24 netconnection::commands::NetConnection, 25 netstream::writer::NetStreamWriter, 26 protocol_control_messages::writer::ProtocolControlMessagesWriter, 27 user_control_messages::writer::EventMessagesWriter, 28 }, 29 bytes::BytesMut, 30 netio::{ 31 bytes_writer::{AsyncBytesWriter, BytesWriter}, 32 netio::NetworkIO, 33 }, 34 std::{collections::HashMap, sync::Arc}, 35 tokio::{ 36 net::TcpStream, 37 sync::{mpsc, oneshot, Mutex}, 38 }, 39 }; 40 41 enum ServerSessionState { 42 Handshake, 43 ReadChunk, 44 // OnConnect, 45 // OnCreateStream, 46 //Publish, 47 Play, 48 } 49 50 pub struct ServerSession { 51 app_name: String, 52 stream_name: String, 53 54 io: Arc<Mutex<NetworkIO>>, 55 simple_handshaker: SimpleHandshakeServer, 56 //complex_handshaker: ComplexHandshakeServer, 57 packetizer: ChunkPacketizer, 58 unpacketizer: ChunkUnpacketizer, 59 60 state: ServerSessionState, 61 62 event_producer: ChannelEventPublisher, 63 64 //send video, audio or metadata from publish server session to player server sessions 65 data_producer: ChannelDataPublisher, 66 //receive video, audio or metadata from publish server session and send out to player 67 data_consumer: ChannelDataConsumer, 68 69 netio_data: BytesMut, 70 need_process: bool, 71 72 pub session_id: u64, 73 pub session_type: u8, 74 } 75 76 impl ServerSession { 77 pub fn new(stream: TcpStream, event_producer: ChannelEventPublisher, session_id: u64) -> Self { 78 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream))); 79 //only used for init,since I don't found a better way to deal with this. 80 let (init_producer, init_consumer) = mpsc::unbounded_channel(); 81 82 Self { 83 app_name: String::from(""), 84 stream_name: String::from(""), 85 86 io: Arc::clone(&net_io), 87 simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)), 88 //complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)), 89 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 90 unpacketizer: ChunkUnpacketizer::new(), 91 92 state: ServerSessionState::Handshake, 93 94 event_producer, 95 data_producer: init_producer, 96 data_consumer: init_consumer, 97 session_id: session_id, 98 netio_data: BytesMut::new(), 99 need_process: false, 100 session_type: 0, 101 } 102 } 103 104 pub async fn run(&mut self) -> Result<(), SessionError> { 105 loop { 106 match self.state { 107 ServerSessionState::Handshake => { 108 self.handshake().await?; 109 } 110 ServerSessionState::ReadChunk => { 111 self.read_parse_chunks().await?; 112 } 113 ServerSessionState::Play => { 114 self.play().await?; 115 } 116 } 117 } 118 119 //Ok(()) 120 } 121 122 async fn handshake(&mut self) -> Result<(), SessionError> { 123 self.netio_data = self.io.lock().await.read().await?; 124 self.simple_handshaker.extend_data(&self.netio_data[..]); 125 self.simple_handshaker.handshake().await?; 126 127 match self.simple_handshaker.state { 128 ServerHandshakeState::Finish => { 129 self.state = ServerSessionState::ReadChunk; 130 131 let left_bytes = self.simple_handshaker.get_remaining_bytes(); 132 if left_bytes.len() > 0 { 133 self.unpacketizer.extend_data(&left_bytes[..]); 134 self.need_process = true; 135 } 136 137 return Ok(()); 138 } 139 _ => {} 140 } 141 142 Ok(()) 143 } 144 145 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 146 if !self.need_process { 147 self.netio_data = self.io.lock().await.read().await?; 148 self.unpacketizer.extend_data(&self.netio_data[..]); 149 } 150 151 self.need_process = false; 152 153 loop { 154 let result = self.unpacketizer.read_chunks(); 155 156 if let Ok(rv) = result { 157 match rv { 158 UnpackResult::Chunks(chunks) => { 159 for chunk_info in chunks.iter() { 160 let mut msg = MessageParser::new(chunk_info.clone(), self.session_type) 161 .parse()?; 162 163 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 164 let timestamp = chunk_info.message_header.timestamp; 165 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 166 .await?; 167 } 168 } 169 _ => {} 170 } 171 } else { 172 break; 173 } 174 } 175 Ok(()) 176 } 177 178 async fn play(&mut self) -> Result<(), SessionError> { 179 match self.send_media_data().await { 180 Ok(_) => {} 181 Err(err) => { 182 // let len = self.unpacketizer.reader.get_remaining_bytes().len(); 183 // print!("send meidi data err len:{}\n", len); 184 185 // utils::print::print(self.unpacketizer.reader.get_remaining_bytes()); 186 187 // if len > 0 { 188 // self.need_process = true; 189 // } 190 191 // self.state = ServerSessionState::ReadChunk; 192 193 self.unsubscribe_from_channels().await?; 194 195 return Err(err); 196 } 197 } 198 199 Ok(()) 200 } 201 202 async fn send_media_data(&mut self) -> Result<(), SessionError> { 203 loop { 204 if let Some(data) = self.data_consumer.recv().await { 205 match data { 206 ChannelData::Audio { timestamp, data } => { 207 //print!("send audio data\n"); 208 self.send_audio(data, timestamp).await?; 209 } 210 ChannelData::Video { timestamp, data } => { 211 //print!("send video data\n"); 212 self.send_video(data, timestamp).await?; 213 } 214 ChannelData::MetaData { body } => { 215 print!("send meta data\n"); 216 self.send_metadata(body).await?; 217 } 218 } 219 } 220 } 221 } 222 223 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 224 let mut controlmessage = 225 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 226 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 227 228 Ok(()) 229 } 230 pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 231 let mut chunk_info = ChunkInfo::new( 232 csid_type::AUDIO, 233 chunk_type::TYPE_0, 234 timestamp, 235 data.len() as u32, 236 msg_type_id::AUDIO, 237 0, 238 data, 239 ); 240 241 self.packetizer.write_chunk(&mut chunk_info).await?; 242 243 Ok(()) 244 } 245 246 pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 247 let mut chunk_info = ChunkInfo::new( 248 csid_type::VIDEO, 249 chunk_type::TYPE_0, 250 timestamp, 251 data.len() as u32, 252 msg_type_id::VIDEO, 253 0, 254 data, 255 ); 256 257 self.packetizer.write_chunk(&mut chunk_info).await?; 258 259 Ok(()) 260 } 261 262 async fn send_metadata(&mut self, data: BytesMut) -> Result<(), SessionError> { 263 let mut chunk_info = ChunkInfo::new( 264 csid_type::DATA_AMF0_AMF3, 265 chunk_type::TYPE_0, 266 0, 267 data.len() as u32, 268 msg_type_id::DATA_AMF0, 269 0, 270 data, 271 ); 272 273 self.packetizer.write_chunk(&mut chunk_info).await?; 274 Ok(()) 275 } 276 pub async fn process_messages( 277 &mut self, 278 rtmp_msg: &mut RtmpMessageData, 279 msg_stream_id: &u32, 280 timestamp: &u32, 281 ) -> Result<(), SessionError> { 282 match rtmp_msg { 283 RtmpMessageData::Amf0Command { 284 command_name, 285 transaction_id, 286 command_object, 287 others, 288 } => { 289 self.on_amf0_command_message( 290 msg_stream_id, 291 command_name, 292 transaction_id, 293 command_object, 294 others, 295 ) 296 .await? 297 } 298 RtmpMessageData::SetChunkSize { chunk_size } => { 299 self.on_set_chunk_size(chunk_size.clone() as usize)?; 300 } 301 RtmpMessageData::AudioData { data } => { 302 self.on_audio_data(data, timestamp)?; 303 } 304 RtmpMessageData::VideoData { data } => { 305 self.on_video_data(data, timestamp)?; 306 } 307 RtmpMessageData::AmfData { raw_data } => { 308 self.on_amf_data(raw_data)?; 309 } 310 311 _ => {} 312 } 313 Ok(()) 314 } 315 316 pub async fn on_amf0_command_message( 317 &mut self, 318 stream_id: &u32, 319 command_name: &Amf0ValueType, 320 transaction_id: &Amf0ValueType, 321 command_object: &Amf0ValueType, 322 others: &mut Vec<Amf0ValueType>, 323 ) -> Result<(), SessionError> { 324 let empty_cmd_name = &String::new(); 325 let cmd_name = match command_name { 326 Amf0ValueType::UTF8String(str) => str, 327 _ => empty_cmd_name, 328 }; 329 330 let transaction_id = match transaction_id { 331 Amf0ValueType::Number(number) => number, 332 _ => &0.0, 333 }; 334 335 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 336 let obj = match command_object { 337 Amf0ValueType::Object(obj) => obj, 338 _ => &empty_cmd_obj, 339 }; 340 341 match cmd_name.as_str() { 342 "connect" => { 343 print!("connect ......."); 344 self.on_connect(&transaction_id, &obj).await?; 345 } 346 "createStream" => { 347 self.on_create_stream(transaction_id).await?; 348 } 349 "deleteStream" => { 350 print!("deletestream....\n"); 351 if others.len() > 0 { 352 let stream_id = match others.pop() { 353 Some(val) => match val { 354 Amf0ValueType::Number(streamid) => streamid, 355 _ => 0.0, 356 }, 357 _ => 0.0, 358 }; 359 360 print!("deletestream....{}\n", stream_id); 361 362 self.on_delete_stream(transaction_id, &stream_id).await?; 363 } 364 } 365 "play" => { 366 self.session_type = config::SERVER_PULL; 367 self.unpacketizer.session_type = config::SERVER_PULL; 368 self.on_play(transaction_id, stream_id, others).await?; 369 } 370 "publish" => { 371 self.session_type = config::SERVER_PUSH; 372 self.unpacketizer.session_type = config::SERVER_PUSH; 373 self.on_publish(transaction_id, stream_id, others).await?; 374 } 375 _ => {} 376 } 377 378 Ok(()) 379 } 380 381 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 382 self.unpacketizer.update_max_chunk_size(chunk_size); 383 Ok(()) 384 } 385 386 async fn on_connect( 387 &mut self, 388 transaction_id: &f64, 389 command_obj: &HashMap<String, Amf0ValueType>, 390 ) -> Result<(), SessionError> { 391 let mut control_message = 392 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 393 control_message 394 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 395 .await?; 396 control_message 397 .write_set_peer_bandwidth( 398 define::PEER_BANDWIDTH, 399 define::peer_bandwidth_limit_type::DYNAMIC, 400 ) 401 .await?; 402 control_message.write_set_chunk_size(CHUNK_SIZE).await?; 403 404 let obj_encoding = command_obj.get("objectEncoding"); 405 let encoding = match obj_encoding { 406 Some(Amf0ValueType::Number(encoding)) => encoding, 407 _ => &define::OBJENCODING_AMF0, 408 }; 409 410 let app_name = command_obj.get("app"); 411 self.app_name = match app_name { 412 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 413 _ => { 414 return Err(SessionError { 415 value: SessionErrorValue::NoAppName, 416 }); 417 } 418 }; 419 420 let mut netconnection = NetConnection::new(BytesWriter::new()); 421 let data = netconnection.connect_response( 422 &transaction_id, 423 &define::FMSVER.to_string(), 424 &define::CAPABILITIES, 425 &String::from("NetConnection.Connect.Success"), 426 &define::LEVEL.to_string(), 427 &String::from("Connection Succeeded."), 428 encoding, 429 )?; 430 431 let mut chunk_info = ChunkInfo::new( 432 csid_type::COMMAND_AMF0_AMF3, 433 chunk_type::TYPE_0, 434 0, 435 data.len() as u32, 436 msg_type_id::COMMAND_AMF0, 437 0, 438 data, 439 ); 440 441 self.packetizer.write_chunk(&mut chunk_info).await?; 442 443 Ok(()) 444 } 445 446 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 447 let mut netconnection = NetConnection::new(BytesWriter::new()); 448 let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?; 449 450 let mut chunk_info = ChunkInfo::new( 451 csid_type::COMMAND_AMF0_AMF3, 452 chunk_type::TYPE_0, 453 0, 454 data.len() as u32, 455 msg_type_id::COMMAND_AMF0, 456 0, 457 data, 458 ); 459 460 self.packetizer.write_chunk(&mut chunk_info).await?; 461 462 Ok(()) 463 } 464 465 pub async fn on_delete_stream( 466 &mut self, 467 transaction_id: &f64, 468 stream_id: &f64, 469 ) -> Result<(), SessionError> { 470 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 471 netstream 472 .on_status( 473 transaction_id, 474 &"status".to_string(), 475 &"NetStream.DeleteStream.Suceess".to_string(), 476 &"".to_string(), 477 ) 478 .await?; 479 480 print!("stream id{}", stream_id); 481 482 //self.unsubscribe_from_channels().await?; 483 484 Ok(()) 485 } 486 pub async fn on_play( 487 &mut self, 488 transaction_id: &f64, 489 stream_id: &u32, 490 other_values: &mut Vec<Amf0ValueType>, 491 ) -> Result<(), SessionError> { 492 let length = other_values.len() as u8; 493 let mut index: u8 = 0; 494 495 let mut stream_name: Option<String> = None; 496 let mut start: Option<f64> = None; 497 let mut duration: Option<f64> = None; 498 let mut reset: Option<bool> = None; 499 500 loop { 501 if index >= length { 502 break; 503 } 504 index = index + 1; 505 stream_name = match other_values.remove(0) { 506 Amf0ValueType::UTF8String(val) => Some(val), 507 _ => None, 508 }; 509 510 if index >= length { 511 break; 512 } 513 index = index + 1; 514 start = match other_values.remove(0) { 515 Amf0ValueType::Number(val) => Some(val), 516 _ => None, 517 }; 518 519 if index >= length { 520 break; 521 } 522 index = index + 1; 523 duration = match other_values.remove(0) { 524 Amf0ValueType::Number(val) => Some(val), 525 _ => None, 526 }; 527 528 if index >= length { 529 break; 530 } 531 //index = index + 1; 532 reset = match other_values.remove(0) { 533 Amf0ValueType::Boolean(val) => Some(val), 534 _ => None, 535 }; 536 break; 537 } 538 print!("start {}", start.is_some()); 539 print!("druation {}", duration.is_some()); 540 print!("reset {}", reset.is_some()); 541 542 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 543 event_messages.write_stream_begin(stream_id.clone()).await?; 544 545 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 546 netstream 547 .on_status( 548 transaction_id, 549 &"status".to_string(), 550 &"NetStream.Play.Reset".to_string(), 551 &"reset".to_string(), 552 ) 553 .await?; 554 555 netstream 556 .on_status( 557 transaction_id, 558 &"status".to_string(), 559 &"NetStream.Play.Start".to_string(), 560 &"play start".to_string(), 561 ) 562 .await?; 563 564 netstream 565 .on_status( 566 transaction_id, 567 &"status".to_string(), 568 &"NetStream.Data.Start".to_string(), 569 &"data start.".to_string(), 570 ) 571 .await?; 572 573 netstream 574 .on_status( 575 transaction_id, 576 &"status".to_string(), 577 &"NetStream.Play.PublishNotify".to_string(), 578 &"play publish notify.".to_string(), 579 ) 580 .await?; 581 582 event_messages 583 .write_stream_is_record(stream_id.clone()) 584 .await?; 585 586 self.subscribe_from_channels(stream_name.unwrap()).await?; 587 self.state = ServerSessionState::Play; 588 589 Ok(()) 590 } 591 592 async fn unsubscribe_from_channels(&mut self) -> Result<(), SessionError> { 593 let subscribe_event = ChannelEvent::UnSubscribe { 594 app_name: self.app_name.clone(), 595 stream_name: self.stream_name.clone(), 596 session_id: self.session_id, 597 }; 598 599 let _ = self.event_producer.send(subscribe_event); 600 601 Ok(()) 602 } 603 604 async fn subscribe_from_channels(&mut self, stream_name: String) -> Result<(), SessionError> { 605 self.stream_name = stream_name.clone(); 606 607 print!( 608 "subscribe info............{} {} {}\n", 609 self.app_name, 610 stream_name.clone(), 611 self.session_id 612 ); 613 614 let (sender, receiver) = oneshot::channel(); 615 let subscribe_event = ChannelEvent::Subscribe { 616 app_name: self.app_name.clone(), 617 stream_name, 618 session_id: self.session_id, 619 responder: sender, 620 }; 621 622 let rv = self.event_producer.send(subscribe_event); 623 match rv { 624 Err(_) => { 625 return Err(SessionError { 626 value: SessionErrorValue::ChannelEventSendErr, 627 }) 628 } 629 _ => {} 630 } 631 632 match receiver.await { 633 Ok(consumer) => { 634 self.data_consumer = consumer; 635 } 636 Err(_) => {} 637 } 638 Ok(()) 639 } 640 641 pub async fn on_publish( 642 &mut self, 643 transaction_id: &f64, 644 stream_id: &u32, 645 other_values: &mut Vec<Amf0ValueType>, 646 ) -> Result<(), SessionError> { 647 let length = other_values.len(); 648 649 if length < 2 { 650 return Err(SessionError { 651 value: SessionErrorValue::Amf0ValueCountNotCorrect, 652 }); 653 } 654 655 let stream_name = match other_values.remove(0) { 656 Amf0ValueType::UTF8String(val) => val, 657 _ => { 658 return Err(SessionError { 659 value: SessionErrorValue::Amf0ValueCountNotCorrect, 660 }); 661 } 662 }; 663 664 let _ = match other_values.remove(0) { 665 Amf0ValueType::UTF8String(val) => val, 666 _ => { 667 return Err(SessionError { 668 value: SessionErrorValue::Amf0ValueCountNotCorrect, 669 }); 670 } 671 }; 672 673 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 674 event_messages.write_stream_begin(stream_id.clone()).await?; 675 676 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 677 netstream 678 .on_status( 679 transaction_id, 680 &"status".to_string(), 681 &"NetStream.Publish.Start".to_string(), 682 &"".to_string(), 683 ) 684 .await?; 685 686 print!("before publish_to_channels\n"); 687 self.publish_to_channels(stream_name).await?; 688 print!("after publish_to_channels\n"); 689 690 Ok(()) 691 } 692 693 async fn publish_to_channels(&mut self, stream_name: String) -> Result<(), SessionError> { 694 let (sender, receiver) = oneshot::channel(); 695 let publish_event = ChannelEvent::Publish { 696 app_name: self.app_name.clone(), 697 stream_name, 698 responder: sender, 699 }; 700 701 let rv = self.event_producer.send(publish_event); 702 match rv { 703 Err(_) => { 704 return Err(SessionError { 705 value: SessionErrorValue::ChannelEventSendErr, 706 }) 707 } 708 _ => {} 709 } 710 711 match receiver.await { 712 Ok(producer) => { 713 print!("set producer before\n"); 714 self.data_producer = producer; 715 print!("set producer after\n"); 716 } 717 Err(_) => {} 718 } 719 Ok(()) 720 } 721 722 pub fn on_video_data( 723 &mut self, 724 data: &mut BytesMut, 725 timestamp: &u32, 726 ) -> Result<(), SessionError> { 727 let data = ChannelData::Video { 728 timestamp: timestamp.clone(), 729 data: data.clone(), 730 }; 731 732 //print!("receive video data\n"); 733 match self.data_producer.send(data) { 734 Ok(_) => {} 735 Err(err) => { 736 print!("send video err {}\n", err); 737 return Err(SessionError { 738 value: SessionErrorValue::SendChannelDataErr, 739 }); 740 } 741 } 742 743 Ok(()) 744 } 745 746 pub fn on_audio_data( 747 &mut self, 748 data: &mut BytesMut, 749 timestamp: &u32, 750 ) -> Result<(), SessionError> { 751 let data = ChannelData::Audio { 752 timestamp: timestamp.clone(), 753 data: data.clone(), 754 }; 755 756 match self.data_producer.send(data) { 757 Ok(_) => {} 758 Err(err) => { 759 print!("receive audio err {}\n", err); 760 return Err(SessionError { 761 value: SessionErrorValue::SendChannelDataErr, 762 }); 763 } 764 } 765 766 Ok(()) 767 } 768 769 pub fn on_amf_data(&mut self, body: &mut BytesMut) -> Result<(), SessionError> { 770 let data = ChannelData::MetaData { body: body.clone() }; 771 772 match self.data_producer.send(data) { 773 Ok(_) => {} 774 Err(_) => { 775 return Err(SessionError { 776 value: SessionErrorValue::SendChannelDataErr, 777 }) 778 } 779 } 780 781 Ok(()) 782 } 783 } 784