1 use { 2 super::{ 3 define, 4 errors::{SessionError, SessionErrorValue}, 5 }, 6 crate::{ 7 amf0::Amf0ValueType, 8 channels::define::{ 9 ChannelData, ChannelDataConsumer, ChannelDataPublisher, ChannelEvent, 10 ChannelEventPublisher, 11 }, 12 chunk::{ 13 define::{chunk_type, csid_type, CHUNK_SIZE}, 14 packetizer::ChunkPacketizer, 15 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 16 ChunkInfo, 17 }, 18 handshake::handshake::{ 19 ComplexHandshakeServer, ServerHandshakeState, SimpleHandshakeServer, 20 }, 21 messages::{ 22 define::{msg_type_id, RtmpMessageData}, 23 parser::MessageParser, 24 }, 25 netconnection::commands::NetConnection, 26 netstream::writer::NetStreamWriter, 27 protocol_control_messages::writer::ProtocolControlMessagesWriter, 28 user_control_messages::writer::EventMessagesWriter, 29 }, 30 bytes::BytesMut, 31 netio::{ 32 bytes_writer::{AsyncBytesWriter, BytesWriter}, 33 netio::NetworkIO, 34 }, 35 std::{collections::HashMap, sync::Arc, time::Duration}, 36 tokio::{ 37 net::TcpStream, 38 sync::{mpsc, oneshot, Mutex}, 39 }, 40 }; 41 42 enum ServerSessionState { 43 Handshake, 44 ReadChunk, 45 // OnConnect, 46 // OnCreateStream, 47 //Publish, 48 Play, 49 } 50 51 pub struct ServerSession { 52 app_name: String, 53 54 io: Arc<Mutex<NetworkIO>>, 55 simple_handshaker: SimpleHandshakeServer, 56 complex_handshaker: ComplexHandshakeServer, 57 58 packetizer: ChunkPacketizer, 59 unpacketizer: ChunkUnpacketizer, 60 61 state: ServerSessionState, 62 63 event_producer: ChannelEventPublisher, 64 65 //send video, audio or metadata from publish server session to player server sessions 66 data_producer: ChannelDataPublisher, 67 //receive video, audio or metadata from publish server session and send out to player 68 data_consumer: ChannelDataConsumer, 69 70 session_id: u8, 71 } 72 73 impl ServerSession { 74 pub fn new( 75 stream: TcpStream, 76 event_producer: ChannelEventPublisher, 77 timeout: Duration, 78 session_id: u8, 79 ) -> Self { 80 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout))); 81 //only used for init,since I don't found a better way to deal with this. 82 let (init_producer, init_consumer) = mpsc::unbounded_channel(); 83 84 // let reader = BytesReader::new(BytesMut::new()); 85 86 Self { 87 app_name: String::from(""), 88 89 io: Arc::clone(&net_io), 90 simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)), 91 complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)), 92 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 93 unpacketizer: ChunkUnpacketizer::new(), 94 95 state: ServerSessionState::Handshake, 96 97 event_producer, 98 data_producer: init_producer, 99 data_consumer: init_consumer, 100 session_id: session_id, 101 } 102 } 103 104 pub async fn run(&mut self) -> Result<(), SessionError> { 105 let duration = Duration::new(10, 10); 106 107 let mut remaining_bytes: BytesMut = BytesMut::new(); 108 109 loop { 110 let mut net_io_data: BytesMut = BytesMut::new(); 111 if remaining_bytes.len() <= 0 { 112 net_io_data = self.io.lock().await.read().await?; 113 114 //utils::print::print(net_io_data.clone()); 115 } 116 117 match self.state { 118 ServerSessionState::Handshake => { 119 if self.session_id > 0 { 120 // utils::print::printu8(net_io_data.clone()); 121 } 122 123 self.simple_handshaker.extend_data(&net_io_data[..]); 124 self.simple_handshaker.handshake().await?; 125 126 match self.simple_handshaker.state { 127 ServerHandshakeState::Finish => { 128 self.state = ServerSessionState::ReadChunk; 129 remaining_bytes = self.simple_handshaker.get_remaining_bytes(); 130 } 131 _ => continue, 132 } 133 } 134 ServerSessionState::ReadChunk => { 135 if self.session_id > 0 { 136 // utils::print::printu8(net_io_data.clone()); 137 } 138 139 if remaining_bytes.len() > 0 { 140 let bytes = std::mem::replace(&mut remaining_bytes, BytesMut::new()); 141 self.unpacketizer.extend_data(&bytes[..]); 142 } else { 143 self.unpacketizer.extend_data(&net_io_data[..]); 144 } 145 146 loop { 147 let result = self.unpacketizer.read_chunks(); 148 let rv = match result { 149 Ok(val) => val, 150 Err(_) => { 151 break; 152 } 153 }; 154 155 match rv { 156 UnpackResult::Chunks(chunks) => { 157 for chunk_info in chunks.iter() { 158 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 159 let timestamp = chunk_info.message_header.timestamp; 160 161 let mut message_parser = MessageParser::new(chunk_info.clone()); 162 let mut msg = message_parser.parse()?; 163 164 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 165 .await?; 166 } 167 } 168 _ => {} 169 } 170 } 171 } 172 173 //when in play state, only transfer publisher's video/audio/metadta to player. 174 ServerSessionState::Play => loop { 175 if let Some(data) = self.data_consumer.recv().await { 176 match data { 177 ChannelData::Audio { timestamp, data } => { 178 print!("send audio data\n"); 179 self.send_audio(data, timestamp).await?; 180 } 181 ChannelData::Video { timestamp, data } => { 182 print!("send video data\n"); 183 self.send_video(data, timestamp).await?; 184 } 185 ChannelData::MetaData { body } => { 186 print!("send meta data\n"); 187 self.send_metadata(body).await?; 188 } 189 } 190 } else { 191 } 192 }, 193 } 194 } 195 196 //Ok(()) 197 } 198 199 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 200 let mut controlmessage = 201 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 202 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 203 204 Ok(()) 205 } 206 pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 207 let mut chunk_info = ChunkInfo::new( 208 csid_type::AUDIO, 209 chunk_type::TYPE_0, 210 timestamp, 211 data.len() as u32, 212 msg_type_id::AUDIO, 213 0, 214 data, 215 ); 216 217 self.packetizer.write_chunk(&mut chunk_info).await?; 218 219 Ok(()) 220 } 221 222 pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 223 let mut chunk_info = ChunkInfo::new( 224 csid_type::VIDEO, 225 chunk_type::TYPE_0, 226 timestamp, 227 data.len() as u32, 228 msg_type_id::VIDEO, 229 0, 230 data, 231 ); 232 233 self.packetizer.write_chunk(&mut chunk_info).await?; 234 235 Ok(()) 236 } 237 238 async fn send_metadata(&mut self, data: BytesMut) -> Result<(), SessionError> { 239 let mut chunk_info = ChunkInfo::new( 240 csid_type::DATA_AMF0_AMF3, 241 chunk_type::TYPE_0, 242 0, 243 data.len() as u32, 244 msg_type_id::DATA_AMF0, 245 0, 246 data, 247 ); 248 249 self.packetizer.write_chunk(&mut chunk_info).await?; 250 Ok(()) 251 } 252 pub async fn process_messages( 253 &mut self, 254 rtmp_msg: &mut RtmpMessageData, 255 msg_stream_id: &u32, 256 timestamp: &u32, 257 ) -> Result<(), SessionError> { 258 match rtmp_msg { 259 RtmpMessageData::Amf0Command { 260 command_name, 261 transaction_id, 262 command_object, 263 others, 264 } => { 265 self.process_amf0_command_message( 266 msg_stream_id, 267 command_name, 268 transaction_id, 269 command_object, 270 others, 271 ) 272 .await? 273 } 274 RtmpMessageData::SetChunkSize { chunk_size } => { 275 self.on_set_chunk_size(chunk_size.clone() as usize)?; 276 } 277 RtmpMessageData::AudioData { data } => { 278 self.on_audio_data(data, timestamp)?; 279 } 280 RtmpMessageData::VideoData { data } => { 281 self.on_video_data(data, timestamp)?; 282 } 283 RtmpMessageData::AmfData { raw_data } => { 284 self.on_amf_data(raw_data)?; 285 } 286 287 _ => {} 288 } 289 Ok(()) 290 } 291 292 pub async fn process_amf0_command_message( 293 &mut self, 294 stream_id: &u32, 295 command_name: &Amf0ValueType, 296 transaction_id: &Amf0ValueType, 297 command_object: &Amf0ValueType, 298 others: &mut Vec<Amf0ValueType>, 299 ) -> Result<(), SessionError> { 300 let empty_cmd_name = &String::new(); 301 let cmd_name = match command_name { 302 Amf0ValueType::UTF8String(str) => str, 303 _ => empty_cmd_name, 304 }; 305 306 let transaction_id = match transaction_id { 307 Amf0ValueType::Number(number) => number, 308 _ => &0.0, 309 }; 310 311 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 312 let obj = match command_object { 313 Amf0ValueType::Object(obj) => obj, 314 _ => &empty_cmd_obj, 315 }; 316 317 match cmd_name.as_str() { 318 "connect" => { 319 print!("connect ......."); 320 self.on_connect(&transaction_id, &obj).await?; 321 } 322 "createStream" => { 323 self.on_create_stream(transaction_id).await?; 324 } 325 "deleteStream" => { 326 if others.len() > 1 { 327 let stream_id = match others.pop() { 328 Some(val) => match val { 329 Amf0ValueType::Number(streamid) => streamid, 330 _ => 0.0, 331 }, 332 _ => 0.0, 333 }; 334 335 self.on_delete_stream(transaction_id, &stream_id).await?; 336 } 337 } 338 "play" => { 339 self.on_play(transaction_id, stream_id, others).await?; 340 } 341 "publish" => { 342 self.on_publish(transaction_id, stream_id, others).await?; 343 } 344 _ => {} 345 } 346 347 Ok(()) 348 } 349 350 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 351 self.unpacketizer.update_max_chunk_size(chunk_size); 352 Ok(()) 353 } 354 355 async fn on_connect( 356 &mut self, 357 transaction_id: &f64, 358 command_obj: &HashMap<String, Amf0ValueType>, 359 ) -> Result<(), SessionError> { 360 let mut control_message = 361 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 362 control_message 363 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 364 .await?; 365 control_message 366 .write_set_peer_bandwidth( 367 define::PEER_BANDWIDTH, 368 define::PeerBandWidthLimitType::DYNAMIC, 369 ) 370 .await?; 371 control_message.write_set_chunk_size(CHUNK_SIZE).await?; 372 373 let obj_encoding = command_obj.get("objectEncoding"); 374 let encoding = match obj_encoding { 375 Some(Amf0ValueType::Number(encoding)) => encoding, 376 _ => &define::OBJENCODING_AMF0, 377 }; 378 379 let app_name = command_obj.get("app"); 380 self.app_name = match app_name { 381 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 382 _ => { 383 return Err(SessionError { 384 value: SessionErrorValue::NoAppName, 385 }); 386 } 387 }; 388 389 let mut netconnection = NetConnection::new(BytesWriter::new()); 390 let data = netconnection.connect_response( 391 &transaction_id, 392 &define::FMSVER.to_string(), 393 &define::CAPABILITIES, 394 &String::from("NetConnection.Connect.Success"), 395 &define::LEVEL.to_string(), 396 &String::from("Connection Succeeded."), 397 encoding, 398 )?; 399 400 let mut chunk_info = ChunkInfo::new( 401 csid_type::COMMAND_AMF0_AMF3, 402 chunk_type::TYPE_0, 403 0, 404 data.len() as u32, 405 msg_type_id::COMMAND_AMF0, 406 0, 407 data, 408 ); 409 410 self.packetizer.write_chunk(&mut chunk_info).await?; 411 412 Ok(()) 413 } 414 415 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 416 let mut netconnection = NetConnection::new(BytesWriter::new()); 417 let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?; 418 419 let mut chunk_info = ChunkInfo::new( 420 csid_type::COMMAND_AMF0_AMF3, 421 chunk_type::TYPE_0, 422 0, 423 data.len() as u32, 424 msg_type_id::COMMAND_AMF0, 425 0, 426 data, 427 ); 428 429 self.packetizer.write_chunk(&mut chunk_info).await?; 430 431 Ok(()) 432 } 433 434 pub async fn on_delete_stream( 435 &mut self, 436 transaction_id: &f64, 437 stream_id: &f64, 438 ) -> Result<(), SessionError> { 439 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 440 netstream 441 .on_status( 442 transaction_id, 443 &"status".to_string(), 444 &"NetStream.DeleteStream.Suceess".to_string(), 445 &"".to_string(), 446 ) 447 .await?; 448 449 Ok(()) 450 } 451 pub async fn on_play( 452 &mut self, 453 transaction_id: &f64, 454 stream_id: &u32, 455 other_values: &mut Vec<Amf0ValueType>, 456 ) -> Result<(), SessionError> { 457 let length = other_values.len() as u8; 458 let mut index: u8 = 0; 459 460 let mut stream_name: Option<String> = None; 461 let mut start: Option<f64> = None; 462 let mut duration: Option<f64> = None; 463 let mut reset: Option<bool> = None; 464 465 loop { 466 if index >= length { 467 break; 468 } 469 index = index + 1; 470 stream_name = match other_values.remove(0) { 471 Amf0ValueType::UTF8String(val) => Some(val), 472 _ => None, 473 }; 474 475 if index >= length { 476 break; 477 } 478 index = index + 1; 479 start = match other_values.remove(0) { 480 Amf0ValueType::Number(val) => Some(val), 481 _ => None, 482 }; 483 484 if index >= length { 485 break; 486 } 487 index = index + 1; 488 duration = match other_values.remove(0) { 489 Amf0ValueType::Number(val) => Some(val), 490 _ => None, 491 }; 492 493 if index >= length { 494 break; 495 } 496 index = index + 1; 497 reset = match other_values.remove(0) { 498 Amf0ValueType::Boolean(val) => Some(val), 499 _ => None, 500 }; 501 break; 502 } 503 504 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 505 event_messages.write_stream_begin(stream_id.clone()).await?; 506 507 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 508 netstream 509 .on_status( 510 transaction_id, 511 &"status".to_string(), 512 &"NetStream.Play.Reset".to_string(), 513 &"reset".to_string(), 514 ) 515 .await?; 516 517 netstream 518 .on_status( 519 transaction_id, 520 &"status".to_string(), 521 &"NetStream.Play.Start".to_string(), 522 &"play start".to_string(), 523 ) 524 .await?; 525 526 netstream 527 .on_status( 528 transaction_id, 529 &"status".to_string(), 530 &"NetStream.Data.Start".to_string(), 531 &"data start.".to_string(), 532 ) 533 .await?; 534 535 netstream 536 .on_status( 537 transaction_id, 538 &"status".to_string(), 539 &"NetStream.Play.PublishNotify".to_string(), 540 &"play publish notify.".to_string(), 541 ) 542 .await?; 543 544 event_messages 545 .write_stream_is_record(stream_id.clone()) 546 .await?; 547 548 self.subscribe_from_channels(stream_name.unwrap()).await?; 549 self.state = ServerSessionState::Play; 550 551 Ok(()) 552 } 553 554 async fn subscribe_from_channels(&mut self, stream_name: String) -> Result<(), SessionError> { 555 let (sender, receiver) = oneshot::channel(); 556 let subscribe_event = ChannelEvent::Subscribe { 557 app_name: self.app_name.clone(), 558 stream_name, 559 responder: sender, 560 }; 561 562 let rv = self.event_producer.send(subscribe_event); 563 match rv { 564 Err(_) => { 565 return Err(SessionError { 566 value: SessionErrorValue::ChannelEventSendErr, 567 }) 568 } 569 _ => {} 570 } 571 572 match receiver.await { 573 Ok(consumer) => { 574 self.data_consumer = consumer; 575 } 576 Err(_) => {} 577 } 578 Ok(()) 579 } 580 581 pub async fn on_publish( 582 &mut self, 583 transaction_id: &f64, 584 stream_id: &u32, 585 other_values: &mut Vec<Amf0ValueType>, 586 ) -> Result<(), SessionError> { 587 let length = other_values.len(); 588 589 if length < 2 { 590 return Err(SessionError { 591 value: SessionErrorValue::Amf0ValueCountNotCorrect, 592 }); 593 } 594 595 let stream_name = match other_values.remove(0) { 596 Amf0ValueType::UTF8String(val) => val, 597 _ => { 598 return Err(SessionError { 599 value: SessionErrorValue::Amf0ValueCountNotCorrect, 600 }); 601 } 602 }; 603 604 let stream_type = match other_values.remove(0) { 605 Amf0ValueType::UTF8String(val) => val, 606 _ => { 607 return Err(SessionError { 608 value: SessionErrorValue::Amf0ValueCountNotCorrect, 609 }); 610 } 611 }; 612 613 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 614 event_messages.write_stream_begin(stream_id.clone()).await?; 615 616 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 617 netstream 618 .on_status( 619 transaction_id, 620 &"status".to_string(), 621 &"NetStream.Publish.Start".to_string(), 622 &"".to_string(), 623 ) 624 .await?; 625 626 print!("before publish_to_channels\n"); 627 self.publish_to_channels(stream_name).await?; 628 print!("after publish_to_channels\n"); 629 630 Ok(()) 631 } 632 633 async fn publish_to_channels(&mut self, stream_name: String) -> Result<(), SessionError> { 634 let (sender, receiver) = oneshot::channel(); 635 let publish_event = ChannelEvent::Publish { 636 app_name: self.app_name.clone(), 637 stream_name, 638 responder: sender, 639 }; 640 641 let rv = self.event_producer.send(publish_event); 642 match rv { 643 Err(_) => { 644 return Err(SessionError { 645 value: SessionErrorValue::ChannelEventSendErr, 646 }) 647 } 648 _ => {} 649 } 650 651 match receiver.await { 652 Ok(producer) => { 653 print!("set producer before\n"); 654 self.data_producer = producer; 655 print!("set producer after\n"); 656 } 657 Err(_) => {} 658 } 659 Ok(()) 660 } 661 662 pub fn on_video_data( 663 &mut self, 664 data: &mut BytesMut, 665 timestamp: &u32, 666 ) -> Result<(), SessionError> { 667 let data = ChannelData::Video { 668 timestamp: timestamp.clone(), 669 data: data.clone(), 670 }; 671 672 //print!("receive video data\n"); 673 match self.data_producer.send(data) { 674 Ok(size) => {} 675 Err(err) => { 676 format!("receive video err {}", err); 677 return Err(SessionError { 678 value: SessionErrorValue::SendChannelDataErr, 679 }); 680 } 681 } 682 683 Ok(()) 684 } 685 686 pub fn on_audio_data( 687 &mut self, 688 data: &mut BytesMut, 689 timestamp: &u32, 690 ) -> Result<(), SessionError> { 691 let data = ChannelData::Audio { 692 timestamp: timestamp.clone(), 693 data: data.clone(), 694 }; 695 696 match self.data_producer.send(data) { 697 Ok(size) => {} 698 Err(_) => { 699 return Err(SessionError { 700 value: SessionErrorValue::SendChannelDataErr, 701 }) 702 } 703 } 704 705 Ok(()) 706 } 707 708 pub fn on_amf_data(&mut self, body: &mut BytesMut) -> Result<(), SessionError> { 709 let data = ChannelData::MetaData { body: body.clone() }; 710 711 match self.data_producer.send(data) { 712 Ok(size) => {} 713 Err(_) => { 714 return Err(SessionError { 715 value: SessionErrorValue::SendChannelDataErr, 716 }) 717 } 718 } 719 // let mut metadata_handler = metadata::MetaData::default(); 720 721 // metadata_handler.save(body, values); 722 723 Ok(()) 724 } 725 } 726