1 use super::define; 2 use super::errors::SessionError; 3 use super::errors::SessionErrorValue; 4 use crate::chunk::packetizer::ChunkPacketizer; 5 use crate::chunk::{unpacketizer::ChunkUnpacketizer, ChunkInfo}; 6 use crate::handshake::handshake::{ComplexHandshakeServer, SimpleHandshakeServer}; 7 use crate::{amf0::Amf0ValueType, chunk::unpacketizer::UnpackResult}; 8 use crate::{ 9 chunk::define::CHUNK_SIZE, 10 chunk::define::{chunk_type, csid_type}, 11 }; 12 13 use crate::messages::define::msg_type_id; 14 use crate::messages::define::RtmpMessageData; 15 16 use crate::messages::parser::MessageParser; 17 use bytes::BytesMut; 18 19 use netio::bytes_writer::AsyncBytesWriter; 20 use netio::bytes_writer::BytesWriter; 21 use netio::netio::NetworkIO; 22 use std::{ time::Duration}; 23 24 use crate::channels::define::ChannelDataConsumer; 25 use crate::channels::define::ChannelDataPublisher; 26 use crate::channels::define::ChannelEvent; 27 use crate::channels::define::ChannelEventPublisher; 28 // use crate::channels::define::PlayerConsumer; 29 use crate::netconnection::commands::NetConnection; 30 use crate::netstream::writer::NetStreamWriter; 31 use crate::protocol_control_messages::writer::ProtocolControlMessagesWriter; 32 33 use crate::user_control_messages::writer::EventMessagesWriter; 34 35 use std::collections::HashMap; 36 37 use tokio::net::TcpStream; 38 use tokio::sync::broadcast; 39 40 use std::sync::Arc; 41 use tokio::sync::oneshot; 42 use tokio::sync::Mutex; 43 44 use crate::channels::define::ChannelData; 45 46 use crate::handshake::handshake::ServerHandshakeState; 47 48 enum ServerSessionState { 49 Handshake, 50 ReadChunk, 51 // OnConnect, 52 // OnCreateStream, 53 //Publish, 54 Play, 55 } 56 57 pub struct ServerSession { 58 app_name: String, 59 60 io: Arc<Mutex<NetworkIO>>, 61 simple_handshaker: SimpleHandshakeServer, 62 complex_handshaker: ComplexHandshakeServer, 63 64 packetizer: ChunkPacketizer, 65 unpacketizer: ChunkUnpacketizer, 66 67 state: ServerSessionState, 68 69 event_producer: ChannelEventPublisher, 70 71 //send video, audio or metadata from publish server session to player server sessions 72 data_producer: ChannelDataPublisher, 73 //receive video, audio or metadata from publish server session and send out to player 74 data_consumer: ChannelDataConsumer, 75 76 session_id: u8, 77 } 78 79 impl ServerSession { 80 pub fn new( 81 stream: TcpStream, 82 event_producer: ChannelEventPublisher, 83 timeout: Duration, 84 session_id: u8, 85 ) -> Self { 86 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout))); 87 //only used for init,since I don't found a better way to deal with this. 88 let (init_producer, init_consumer) = broadcast::channel(1); 89 90 // let reader = BytesReader::new(BytesMut::new()); 91 92 Self { 93 app_name: String::from(""), 94 95 io: Arc::clone(&net_io), 96 simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)), 97 complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)), 98 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 99 unpacketizer: ChunkUnpacketizer::new(), 100 101 state: ServerSessionState::Handshake, 102 103 event_producer, 104 data_producer: init_producer, 105 data_consumer: init_consumer, 106 session_id: session_id, 107 } 108 } 109 110 pub async fn run(&mut self) -> Result<(), SessionError> { 111 let duration = Duration::new(10, 10); 112 113 let mut remaining_bytes: BytesMut = BytesMut::new(); 114 115 loop { 116 let mut net_io_data: BytesMut = BytesMut::new(); 117 if remaining_bytes.len() <= 0 { 118 net_io_data = self.io.lock().await.read().await?; 119 120 //utils::print::print(net_io_data.clone()); 121 } 122 123 match self.state { 124 ServerSessionState::Handshake => { 125 if self.session_id > 0 { 126 // utils::print::printu8(net_io_data.clone()); 127 } 128 129 self.simple_handshaker.extend_data(&net_io_data[..]); 130 self.simple_handshaker.handshake().await?; 131 132 match self.simple_handshaker.state { 133 ServerHandshakeState::Finish => { 134 self.state = ServerSessionState::ReadChunk; 135 remaining_bytes = self.simple_handshaker.get_remaining_bytes(); 136 } 137 _ => continue, 138 } 139 } 140 ServerSessionState::ReadChunk => { 141 if self.session_id > 0 { 142 // utils::print::printu8(net_io_data.clone()); 143 } 144 145 if remaining_bytes.len() > 0 { 146 let bytes = std::mem::replace(&mut remaining_bytes, BytesMut::new()); 147 self.unpacketizer.extend_data(&bytes[..]); 148 } else { 149 self.unpacketizer.extend_data(&net_io_data[..]); 150 } 151 152 loop { 153 let result = self.unpacketizer.read_chunks(); 154 let rv = match result { 155 Ok(val) => val, 156 Err(_) => { 157 break; 158 } 159 }; 160 161 match rv { 162 UnpackResult::Chunks(chunks) => { 163 for chunk_info in chunks.iter() { 164 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 165 let timestamp = chunk_info.message_header.timestamp; 166 167 let mut message_parser = MessageParser::new(chunk_info.clone()); 168 let mut msg = message_parser.parse()?; 169 170 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 171 .await?; 172 } 173 } 174 _ => {} 175 } 176 } 177 } 178 179 //when in play state, only transfer publisher's video/audio/metadta to player. 180 ServerSessionState::Play => loop { 181 let data = self.data_consumer.recv().await; 182 183 match data { 184 Ok(val) => match val { 185 ChannelData::Audio { timestamp, data } => { 186 print!("send audio data\n"); 187 self.send_audio(data, timestamp).await?; 188 } 189 ChannelData::Video { timestamp, data } => { 190 print!("send video data\n"); 191 self.send_video(data, timestamp).await?; 192 } 193 ChannelData::MetaData { body } => { 194 print!("send meta data\n"); 195 self.send_metadata(body).await?; 196 } 197 }, 198 Err(err) => {} 199 } 200 }, 201 } 202 } 203 204 //Ok(()) 205 } 206 207 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 208 let mut controlmessage = 209 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 210 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 211 212 Ok(()) 213 } 214 pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 215 let mut chunk_info = ChunkInfo::new( 216 csid_type::AUDIO, 217 chunk_type::TYPE_0, 218 timestamp, 219 data.len() as u32, 220 msg_type_id::AUDIO, 221 0, 222 data, 223 ); 224 225 self.packetizer.write_chunk(&mut chunk_info).await?; 226 227 Ok(()) 228 } 229 230 pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 231 let mut chunk_info = ChunkInfo::new( 232 csid_type::VIDEO, 233 chunk_type::TYPE_0, 234 timestamp, 235 data.len() as u32, 236 msg_type_id::VIDEO, 237 0, 238 data, 239 ); 240 241 self.packetizer.write_chunk(&mut chunk_info).await?; 242 243 Ok(()) 244 } 245 246 async fn send_metadata(&mut self, data: BytesMut) -> Result<(), SessionError> { 247 let mut chunk_info = ChunkInfo::new( 248 csid_type::DATA_AMF0_AMF3, 249 chunk_type::TYPE_0, 250 0, 251 data.len() as u32, 252 msg_type_id::DATA_AMF0, 253 0, 254 data, 255 ); 256 257 self.packetizer.write_chunk(&mut chunk_info).await?; 258 Ok(()) 259 } 260 pub async fn process_messages( 261 &mut self, 262 rtmp_msg: &mut RtmpMessageData, 263 msg_stream_id: &u32, 264 timestamp: &u32, 265 ) -> Result<(), SessionError> { 266 match rtmp_msg { 267 RtmpMessageData::Amf0Command { 268 command_name, 269 transaction_id, 270 command_object, 271 others, 272 } => { 273 self.process_amf0_command_message( 274 msg_stream_id, 275 command_name, 276 transaction_id, 277 command_object, 278 others, 279 ) 280 .await? 281 } 282 RtmpMessageData::SetChunkSize { chunk_size } => { 283 self.on_set_chunk_size(chunk_size.clone() as usize)?; 284 } 285 RtmpMessageData::AudioData { data } => { 286 self.on_audio_data(data, timestamp)?; 287 } 288 RtmpMessageData::VideoData { data } => { 289 self.on_video_data(data, timestamp)?; 290 } 291 RtmpMessageData::AmfData { raw_data } => { 292 self.on_amf_data(raw_data)?; 293 } 294 295 _ => {} 296 } 297 Ok(()) 298 } 299 300 pub async fn process_amf0_command_message( 301 &mut self, 302 stream_id: &u32, 303 command_name: &Amf0ValueType, 304 transaction_id: &Amf0ValueType, 305 command_object: &Amf0ValueType, 306 others: &mut Vec<Amf0ValueType>, 307 ) -> Result<(), SessionError> { 308 let empty_cmd_name = &String::new(); 309 let cmd_name = match command_name { 310 Amf0ValueType::UTF8String(str) => str, 311 _ => empty_cmd_name, 312 }; 313 314 let transaction_id = match transaction_id { 315 Amf0ValueType::Number(number) => number, 316 _ => &0.0, 317 }; 318 319 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 320 let obj = match command_object { 321 Amf0ValueType::Object(obj) => obj, 322 _ => &empty_cmd_obj, 323 }; 324 325 match cmd_name.as_str() { 326 "connect" => { 327 print!("connect ......."); 328 self.on_connect(&transaction_id, &obj).await?; 329 } 330 "createStream" => { 331 self.on_create_stream(transaction_id).await?; 332 } 333 "deleteStream" => { 334 if others.len() > 1 { 335 let stream_id = match others.pop() { 336 Some(val) => match val { 337 Amf0ValueType::Number(streamid) => streamid, 338 _ => 0.0, 339 }, 340 _ => 0.0, 341 }; 342 343 self.on_delete_stream(transaction_id, &stream_id).await?; 344 } 345 } 346 "play" => { 347 self.on_play(transaction_id, stream_id, others).await?; 348 } 349 "publish" => { 350 self.on_publish(transaction_id, stream_id, others).await?; 351 } 352 _ => {} 353 } 354 355 Ok(()) 356 } 357 358 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 359 self.unpacketizer.update_max_chunk_size(chunk_size); 360 Ok(()) 361 } 362 363 async fn on_connect( 364 &mut self, 365 transaction_id: &f64, 366 command_obj: &HashMap<String, Amf0ValueType>, 367 ) -> Result<(), SessionError> { 368 let mut control_message = 369 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 370 control_message 371 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 372 .await?; 373 control_message 374 .write_set_peer_bandwidth( 375 define::PEER_BANDWIDTH, 376 define::PeerBandWidthLimitType::DYNAMIC, 377 ) 378 .await?; 379 control_message.write_set_chunk_size(CHUNK_SIZE).await?; 380 381 let obj_encoding = command_obj.get("objectEncoding"); 382 let encoding = match obj_encoding { 383 Some(Amf0ValueType::Number(encoding)) => encoding, 384 _ => &define::OBJENCODING_AMF0, 385 }; 386 387 let app_name = command_obj.get("app"); 388 self.app_name = match app_name { 389 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 390 _ => { 391 return Err(SessionError { 392 value: SessionErrorValue::NoAppName, 393 }); 394 } 395 }; 396 397 let mut netconnection = NetConnection::new(BytesWriter::new()); 398 let data = netconnection.connect_response( 399 &transaction_id, 400 &define::FMSVER.to_string(), 401 &define::CAPABILITIES, 402 &String::from("NetConnection.Connect.Success"), 403 &define::LEVEL.to_string(), 404 &String::from("Connection Succeeded."), 405 encoding, 406 )?; 407 408 let mut chunk_info = ChunkInfo::new( 409 csid_type::COMMAND_AMF0_AMF3, 410 chunk_type::TYPE_0, 411 0, 412 data.len() as u32, 413 msg_type_id::COMMAND_AMF0, 414 0, 415 data, 416 ); 417 418 self.packetizer.write_chunk(&mut chunk_info).await?; 419 420 Ok(()) 421 } 422 423 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 424 let mut netconnection = NetConnection::new(BytesWriter::new()); 425 let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?; 426 427 let mut chunk_info = ChunkInfo::new( 428 csid_type::COMMAND_AMF0_AMF3, 429 chunk_type::TYPE_0, 430 0, 431 data.len() as u32, 432 msg_type_id::COMMAND_AMF0, 433 0, 434 data, 435 ); 436 437 self.packetizer.write_chunk(&mut chunk_info).await?; 438 439 Ok(()) 440 } 441 442 pub async fn on_delete_stream( 443 &mut self, 444 transaction_id: &f64, 445 stream_id: &f64, 446 ) -> Result<(), SessionError> { 447 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 448 netstream 449 .on_status( 450 transaction_id, 451 &"status".to_string(), 452 &"NetStream.DeleteStream.Suceess".to_string(), 453 &"".to_string(), 454 ) 455 .await?; 456 457 Ok(()) 458 } 459 pub async fn on_play( 460 &mut self, 461 transaction_id: &f64, 462 stream_id: &u32, 463 other_values: &mut Vec<Amf0ValueType>, 464 ) -> Result<(), SessionError> { 465 let length = other_values.len() as u8; 466 let mut index: u8 = 0; 467 468 let mut stream_name: Option<String> = None; 469 let mut start: Option<f64> = None; 470 let mut duration: Option<f64> = None; 471 let mut reset: Option<bool> = None; 472 473 loop { 474 if index >= length { 475 break; 476 } 477 index = index + 1; 478 stream_name = match other_values.remove(0) { 479 Amf0ValueType::UTF8String(val) => Some(val), 480 _ => None, 481 }; 482 483 if index >= length { 484 break; 485 } 486 index = index + 1; 487 start = match other_values.remove(0) { 488 Amf0ValueType::Number(val) => Some(val), 489 _ => None, 490 }; 491 492 if index >= length { 493 break; 494 } 495 index = index + 1; 496 duration = match other_values.remove(0) { 497 Amf0ValueType::Number(val) => Some(val), 498 _ => None, 499 }; 500 501 if index >= length { 502 break; 503 } 504 index = index + 1; 505 reset = match other_values.remove(0) { 506 Amf0ValueType::Boolean(val) => Some(val), 507 _ => None, 508 }; 509 break; 510 } 511 512 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 513 event_messages.write_stream_begin(stream_id.clone()).await?; 514 515 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 516 netstream 517 .on_status( 518 transaction_id, 519 &"status".to_string(), 520 &"NetStream.Play.Reset".to_string(), 521 &"reset".to_string(), 522 ) 523 .await?; 524 525 netstream 526 .on_status( 527 transaction_id, 528 &"status".to_string(), 529 &"NetStream.Play.Start".to_string(), 530 &"play start".to_string(), 531 ) 532 .await?; 533 534 netstream 535 .on_status( 536 transaction_id, 537 &"status".to_string(), 538 &"NetStream.Data.Start".to_string(), 539 &"data start.".to_string(), 540 ) 541 .await?; 542 543 netstream 544 .on_status( 545 transaction_id, 546 &"status".to_string(), 547 &"NetStream.Play.PublishNotify".to_string(), 548 &"play publish notify.".to_string(), 549 ) 550 .await?; 551 552 event_messages 553 .write_stream_is_record(stream_id.clone()) 554 .await?; 555 556 self.subscribe_from_channels(stream_name.unwrap()).await?; 557 self.state = ServerSessionState::Play; 558 559 Ok(()) 560 } 561 562 async fn subscribe_from_channels(&mut self, stream_name: String) -> Result<(), SessionError> { 563 let (sender, receiver) = oneshot::channel(); 564 let subscribe_event = ChannelEvent::Subscribe { 565 app_name: self.app_name.clone(), 566 stream_name, 567 responder: sender, 568 }; 569 570 let rv = self.event_producer.send(subscribe_event); 571 match rv { 572 Err(_) => { 573 return Err(SessionError { 574 value: SessionErrorValue::ChannelEventSendErr, 575 }) 576 } 577 _ => {} 578 } 579 580 match receiver.await { 581 Ok(consumer) => { 582 self.data_consumer = consumer; 583 } 584 Err(_) => {} 585 } 586 Ok(()) 587 } 588 589 pub async fn on_publish( 590 &mut self, 591 transaction_id: &f64, 592 stream_id: &u32, 593 other_values: &mut Vec<Amf0ValueType>, 594 ) -> Result<(), SessionError> { 595 let length = other_values.len(); 596 597 if length < 2 { 598 return Err(SessionError { 599 value: SessionErrorValue::Amf0ValueCountNotCorrect, 600 }); 601 } 602 603 let stream_name = match other_values.remove(0) { 604 Amf0ValueType::UTF8String(val) => val, 605 _ => { 606 return Err(SessionError { 607 value: SessionErrorValue::Amf0ValueCountNotCorrect, 608 }); 609 } 610 }; 611 612 let stream_type = match other_values.remove(0) { 613 Amf0ValueType::UTF8String(val) => val, 614 _ => { 615 return Err(SessionError { 616 value: SessionErrorValue::Amf0ValueCountNotCorrect, 617 }); 618 } 619 }; 620 621 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 622 event_messages.write_stream_begin(stream_id.clone()).await?; 623 624 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 625 netstream 626 .on_status( 627 transaction_id, 628 &"status".to_string(), 629 &"NetStream.Publish.Start".to_string(), 630 &"".to_string(), 631 ) 632 .await?; 633 634 print!("before publish_to_channels\n"); 635 self.publish_to_channels(stream_name).await?; 636 print!("after publish_to_channels\n"); 637 638 Ok(()) 639 } 640 641 async fn publish_to_channels(&mut self, stream_name: String) -> Result<(), SessionError> { 642 let (sender, receiver) = oneshot::channel(); 643 let publish_event = ChannelEvent::Publish { 644 app_name: self.app_name.clone(), 645 stream_name, 646 responder: sender, 647 }; 648 649 let rv = self.event_producer.send(publish_event); 650 match rv { 651 Err(_) => { 652 return Err(SessionError { 653 value: SessionErrorValue::ChannelEventSendErr, 654 }) 655 } 656 _ => {} 657 } 658 659 match receiver.await { 660 Ok(producer) => { 661 print!("set producer before\n"); 662 self.data_producer = producer; 663 print!("set producer after\n"); 664 } 665 Err(_) => {} 666 } 667 Ok(()) 668 } 669 670 pub fn on_video_data( 671 &mut self, 672 data: &mut BytesMut, 673 timestamp: &u32, 674 ) -> Result<(), SessionError> { 675 let data = ChannelData::Video { 676 timestamp: timestamp.clone(), 677 data: data.clone(), 678 }; 679 680 print!("receive video data\n"); 681 match self.data_producer.send(data) { 682 Ok(size) => {} 683 Err(err) => { 684 format!("receive video err {}", err); 685 return Err(SessionError { 686 value: SessionErrorValue::SendChannelDataErr, 687 }); 688 } 689 } 690 691 Ok(()) 692 } 693 694 pub fn on_audio_data( 695 &mut self, 696 data: &mut BytesMut, 697 timestamp: &u32, 698 ) -> Result<(), SessionError> { 699 let data = ChannelData::Audio { 700 timestamp: timestamp.clone(), 701 data: data.clone(), 702 }; 703 704 match self.data_producer.send(data) { 705 Ok(size) => {} 706 Err(_) => { 707 return Err(SessionError { 708 value: SessionErrorValue::SendChannelDataErr, 709 }) 710 } 711 } 712 713 Ok(()) 714 } 715 716 pub fn on_amf_data(&mut self, body: &mut BytesMut) -> Result<(), SessionError> { 717 let data = ChannelData::MetaData { body: body.clone() }; 718 719 match self.data_producer.send(data) { 720 Ok(size) => {} 721 Err(_) => { 722 return Err(SessionError { 723 value: SessionErrorValue::SendChannelDataErr, 724 }) 725 } 726 } 727 // let mut metadata_handler = metadata::MetaData::default(); 728 729 // metadata_handler.save(body, values); 730 731 Ok(()) 732 } 733 } 734