1 use super::define; 2 use super::errors::SessionError; 3 use super::errors::SessionErrorValue; 4 use crate::chunk::{unpacketizer::ChunkUnpacketizer, ChunkInfo}; 5 use crate::handshake::handshake::{ComplexHandshakeServer, SimpleHandshakeServer}; 6 use crate::{amf0::Amf0ValueType, chunk::unpacketizer::UnpackResult}; 7 use crate::{channels, chunk::packetizer::ChunkPacketizer}; 8 use crate::{ 9 chunk::define::CHUNK_SIZE, 10 chunk::define::{chunk_type, csid_type}, 11 }; 12 13 use crate::messages::define::msg_type_id; 14 use crate::messages::define::RtmpMessageData; 15 16 use crate::messages::parser::MessageParser; 17 use bytes::BytesMut; 18 19 use netio::bytes_writer::AsyncBytesWriter; 20 use netio::bytes_writer::BytesWriter; 21 use netio::netio::NetworkIO; 22 use std::{borrow::BorrowMut, time::Duration}; 23 24 use crate::channels::define::ChannelEvent; 25 use crate::channels::define::MultiConsumerForData; 26 use crate::channels::define::MultiProducerForEvent; 27 use crate::channels::define::SingleProducerForData; 28 use crate::netconnection::commands::NetConnection; 29 use crate::netstream::writer::NetStreamWriter; 30 use crate::protocol_control_messages::writer::ProtocolControlMessagesWriter; 31 32 use crate::user_control_messages::writer::EventMessagesWriter; 33 34 use std::collections::HashMap; 35 36 use tokio::net::TcpStream; 37 use tokio::sync::broadcast; 38 39 use std::sync::Arc; 40 use tokio::sync::oneshot; 41 use tokio::sync::Mutex; 42 43 use crate::channels::define::ChannelData; 44 45 use crate::handshake::handshake::ServerHandshakeState; 46 47 use crate::cache::metadata; 48 49 use crate::utils; 50 use log::{debug, log, log_enabled, Level}; 51 use std::fmt; 52 53 enum ServerSessionState { 54 Handshake, 55 ReadChunk, 56 // OnConnect, 57 // OnCreateStream, 58 //Publish, 59 Play, 60 } 61 62 pub struct ServerSession { 63 app_name: String, 64 65 io: Arc<Mutex<NetworkIO>>, 66 simple_handshaker: SimpleHandshakeServer, 67 complex_handshaker: ComplexHandshakeServer, 68 69 packetizer: ChunkPacketizer, 70 unpacketizer: ChunkUnpacketizer, 71 72 state: ServerSessionState, 73 74 event_producer: MultiProducerForEvent, 75 76 //send video, audio or metadata from publish server session to player server sessions 77 data_producer: SingleProducerForData, 78 //receive video, audio or metadata from publish server session and send out to player 79 data_consumer: MultiConsumerForData, 80 81 session_id: u8, 82 } 83 84 impl ServerSession { 85 pub fn new( 86 stream: TcpStream, 87 event_producer: MultiProducerForEvent, 88 timeout: Duration, 89 session_id: u8, 90 ) -> Self { 91 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout))); 92 //only used for init,since I don't found a better way to deal with this. 93 let (init_producer, init_consumer) = broadcast::channel(1); 94 // let reader = BytesReader::new(BytesMut::new()); 95 96 Self { 97 app_name: String::from(""), 98 99 io: Arc::clone(&net_io), 100 simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)), 101 complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)), 102 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 103 unpacketizer: ChunkUnpacketizer::new(), 104 105 state: ServerSessionState::Handshake, 106 107 event_producer, 108 data_producer: init_producer, 109 data_consumer: init_consumer, 110 session_id: session_id, 111 } 112 } 113 114 pub async fn run(&mut self) -> Result<(), SessionError> { 115 let duration = Duration::new(10, 10); 116 117 let mut remaining_bytes: BytesMut = BytesMut::new(); 118 119 loop { 120 let mut net_io_data: BytesMut = BytesMut::new(); 121 if remaining_bytes.len() <= 0 { 122 net_io_data = self.io.lock().await.read().await?; 123 124 //utils::print::print(net_io_data.clone()); 125 } 126 127 match self.state { 128 ServerSessionState::Handshake => { 129 if self.session_id > 0 { 130 // utils::print::printu8(net_io_data.clone()); 131 } 132 133 self.simple_handshaker.extend_data(&net_io_data[..]); 134 self.simple_handshaker.handshake().await?; 135 136 match self.simple_handshaker.state { 137 ServerHandshakeState::Finish => { 138 self.state = ServerSessionState::ReadChunk; 139 remaining_bytes = self.simple_handshaker.get_remaining_bytes(); 140 } 141 _ => continue, 142 } 143 } 144 ServerSessionState::ReadChunk => { 145 if self.session_id > 0 { 146 // utils::print::printu8(net_io_data.clone()); 147 } 148 149 if remaining_bytes.len() > 0 { 150 let bytes = std::mem::replace(&mut remaining_bytes, BytesMut::new()); 151 self.unpacketizer.extend_data(&bytes[..]); 152 } else { 153 self.unpacketizer.extend_data(&net_io_data[..]); 154 } 155 156 loop { 157 let result = self.unpacketizer.read_chunks(); 158 let rv = match result { 159 Ok(val) => val, 160 Err(_) => { 161 break; 162 } 163 }; 164 165 match rv { 166 UnpackResult::Chunks(chunks) => { 167 for chunk_info in chunks.iter() { 168 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 169 let timestamp = chunk_info.message_header.timestamp; 170 171 let mut message_parser = MessageParser::new(chunk_info.clone()); 172 let mut msg = message_parser.parse()?; 173 174 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 175 .await?; 176 } 177 } 178 _ => {} 179 } 180 } 181 } 182 183 //when in play state, only transfer publisher's video/audio/metadta to player. 184 ServerSessionState::Play => loop { 185 let data = self.data_consumer.recv().await; 186 187 match data { 188 Ok(val) => match val { 189 ChannelData::Audio { timestamp, data } => { 190 print!("send audio data\n"); 191 self.send_audio(data, timestamp).await?; 192 } 193 ChannelData::Video { timestamp, data } => { 194 print!("send video data\n"); 195 self.send_video(data, timestamp).await?; 196 } 197 ChannelData::MetaData {} => {} 198 }, 199 Err(err) => {} 200 } 201 }, 202 } 203 } 204 205 //Ok(()) 206 } 207 208 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 209 let mut controlmessage = 210 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 211 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 212 213 Ok(()) 214 } 215 pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 216 let mut chunk_info = ChunkInfo::new( 217 csid_type::AUDIO, 218 chunk_type::TYPE_0, 219 timestamp, 220 data.len() as u32, 221 msg_type_id::AUDIO, 222 0, 223 data, 224 ); 225 226 self.packetizer.write_chunk(&mut chunk_info).await?; 227 228 Ok(()) 229 } 230 231 pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 232 let mut chunk_info = ChunkInfo::new( 233 csid_type::VIDEO, 234 chunk_type::TYPE_0, 235 timestamp, 236 data.len() as u32, 237 msg_type_id::VIDEO, 238 0, 239 data, 240 ); 241 242 self.packetizer.write_chunk(&mut chunk_info).await?; 243 244 Ok(()) 245 } 246 pub async fn process_messages( 247 &mut self, 248 rtmp_msg: &mut RtmpMessageData, 249 msg_stream_id: &u32, 250 timestamp: &u32, 251 ) -> Result<(), SessionError> { 252 match rtmp_msg { 253 RtmpMessageData::Amf0Command { 254 command_name, 255 transaction_id, 256 command_object, 257 others, 258 } => { 259 self.process_amf0_command_message( 260 msg_stream_id, 261 command_name, 262 transaction_id, 263 command_object, 264 others, 265 ) 266 .await? 267 } 268 RtmpMessageData::SetChunkSize { chunk_size } => { 269 self.on_set_chunk_size(chunk_size.clone() as usize)?; 270 } 271 RtmpMessageData::AudioData { data } => { 272 self.on_audio_data(data, timestamp)?; 273 } 274 RtmpMessageData::VideoData { data } => { 275 self.on_video_data(data, timestamp)?; 276 } 277 RtmpMessageData::AmfData { raw_data, values } => { 278 self.on_amf_data(raw_data, values)?; 279 } 280 281 _ => {} 282 } 283 Ok(()) 284 } 285 286 pub async fn process_amf0_command_message( 287 &mut self, 288 stream_id: &u32, 289 command_name: &Amf0ValueType, 290 transaction_id: &Amf0ValueType, 291 command_object: &Amf0ValueType, 292 others: &mut Vec<Amf0ValueType>, 293 ) -> Result<(), SessionError> { 294 let empty_cmd_name = &String::new(); 295 let cmd_name = match command_name { 296 Amf0ValueType::UTF8String(str) => str, 297 _ => empty_cmd_name, 298 }; 299 300 let transaction_id = match transaction_id { 301 Amf0ValueType::Number(number) => number, 302 _ => &0.0, 303 }; 304 305 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 306 let obj = match command_object { 307 Amf0ValueType::Object(obj) => obj, 308 _ => &empty_cmd_obj, 309 }; 310 311 match cmd_name.as_str() { 312 "connect" => { 313 print!("connect ......."); 314 self.on_connect(&transaction_id, &obj).await?; 315 } 316 "createStream" => { 317 self.on_create_stream(transaction_id).await?; 318 } 319 "deleteStream" => { 320 if others.len() > 1 { 321 let stream_id = match others.pop() { 322 Some(val) => match val { 323 Amf0ValueType::Number(streamid) => streamid, 324 _ => 0.0, 325 }, 326 _ => 0.0, 327 }; 328 329 self.on_delete_stream(transaction_id, &stream_id).await?; 330 } 331 } 332 "play" => { 333 self.on_play(transaction_id, stream_id, others).await?; 334 } 335 "publish" => { 336 self.on_publish(transaction_id, stream_id, others).await?; 337 } 338 _ => {} 339 } 340 341 Ok(()) 342 } 343 344 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 345 self.unpacketizer.update_max_chunk_size(chunk_size); 346 Ok(()) 347 } 348 349 async fn on_connect( 350 &mut self, 351 transaction_id: &f64, 352 command_obj: &HashMap<String, Amf0ValueType>, 353 ) -> Result<(), SessionError> { 354 let mut control_message = 355 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 356 control_message 357 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 358 .await?; 359 control_message 360 .write_set_peer_bandwidth( 361 define::PEER_BANDWIDTH, 362 define::PeerBandWidthLimitType::DYNAMIC, 363 ) 364 .await?; 365 control_message.write_set_chunk_size(CHUNK_SIZE).await?; 366 367 let obj_encoding = command_obj.get("objectEncoding"); 368 let encoding = match obj_encoding { 369 Some(Amf0ValueType::Number(encoding)) => encoding, 370 _ => &define::OBJENCODING_AMF0, 371 }; 372 373 let app_name = command_obj.get("app"); 374 self.app_name = match app_name { 375 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 376 _ => { 377 return Err(SessionError { 378 value: SessionErrorValue::NoAppName, 379 }); 380 } 381 }; 382 383 let mut netconnection = NetConnection::new(BytesWriter::new()); 384 let data = netconnection.connect_response( 385 &transaction_id, 386 &define::FMSVER.to_string(), 387 &define::CAPABILITIES, 388 &String::from("NetConnection.Connect.Success"), 389 &define::LEVEL.to_string(), 390 &String::from("Connection Succeeded."), 391 encoding, 392 )?; 393 394 let mut chunk_info = ChunkInfo::new( 395 csid_type::COMMAND_AMF0_AMF3, 396 chunk_type::TYPE_0, 397 0, 398 data.len() as u32, 399 msg_type_id::COMMAND_AMF0, 400 0, 401 data, 402 ); 403 404 self.packetizer.write_chunk(&mut chunk_info).await?; 405 406 Ok(()) 407 } 408 409 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 410 let mut netconnection = NetConnection::new(BytesWriter::new()); 411 let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?; 412 413 let mut chunk_info = ChunkInfo::new( 414 csid_type::COMMAND_AMF0_AMF3, 415 chunk_type::TYPE_0, 416 0, 417 data.len() as u32, 418 msg_type_id::COMMAND_AMF0, 419 0, 420 data, 421 ); 422 423 self.packetizer.write_chunk(&mut chunk_info).await?; 424 425 Ok(()) 426 } 427 428 pub async fn on_delete_stream( 429 &mut self, 430 transaction_id: &f64, 431 stream_id: &f64, 432 ) -> Result<(), SessionError> { 433 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 434 netstream 435 .on_status( 436 transaction_id, 437 &"status".to_string(), 438 &"NetStream.DeleteStream.Suceess".to_string(), 439 &"".to_string(), 440 ) 441 .await?; 442 443 Ok(()) 444 } 445 pub async fn on_play( 446 &mut self, 447 transaction_id: &f64, 448 stream_id: &u32, 449 other_values: &mut Vec<Amf0ValueType>, 450 ) -> Result<(), SessionError> { 451 let length = other_values.len() as u8; 452 let mut index: u8 = 0; 453 454 let mut stream_name: Option<String> = None; 455 let mut start: Option<f64> = None; 456 let mut duration: Option<f64> = None; 457 let mut reset: Option<bool> = None; 458 459 loop { 460 if index >= length { 461 break; 462 } 463 index = index + 1; 464 stream_name = match other_values.remove(0) { 465 Amf0ValueType::UTF8String(val) => Some(val), 466 _ => None, 467 }; 468 469 if index >= length { 470 break; 471 } 472 index = index + 1; 473 start = match other_values.remove(0) { 474 Amf0ValueType::Number(val) => Some(val), 475 _ => None, 476 }; 477 478 if index >= length { 479 break; 480 } 481 index = index + 1; 482 duration = match other_values.remove(0) { 483 Amf0ValueType::Number(val) => Some(val), 484 _ => None, 485 }; 486 487 if index >= length { 488 break; 489 } 490 index = index + 1; 491 reset = match other_values.remove(0) { 492 Amf0ValueType::Boolean(val) => Some(val), 493 _ => None, 494 }; 495 break; 496 } 497 498 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 499 event_messages.write_stream_begin(stream_id.clone()).await?; 500 501 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 502 netstream 503 .on_status( 504 transaction_id, 505 &"status".to_string(), 506 &"NetStream.Play.Reset".to_string(), 507 &"reset".to_string(), 508 ) 509 .await?; 510 511 netstream 512 .on_status( 513 transaction_id, 514 &"status".to_string(), 515 &"NetStream.Play.Start".to_string(), 516 &"play start".to_string(), 517 ) 518 .await?; 519 520 netstream 521 .on_status( 522 transaction_id, 523 &"status".to_string(), 524 &"NetStream.Data.Start".to_string(), 525 &"data start.".to_string(), 526 ) 527 .await?; 528 529 netstream 530 .on_status( 531 transaction_id, 532 &"status".to_string(), 533 &"NetStream.Play.PublishNotify".to_string(), 534 &"play publish notify.".to_string(), 535 ) 536 .await?; 537 538 event_messages 539 .write_stream_is_record(stream_id.clone()) 540 .await?; 541 542 self.subscribe_from_channels(stream_name.unwrap()).await?; 543 self.state = ServerSessionState::Play; 544 545 Ok(()) 546 } 547 548 async fn subscribe_from_channels(&mut self, stream_name: String) -> Result<(), SessionError> { 549 let (sender, receiver) = oneshot::channel(); 550 let subscribe_event = ChannelEvent::Subscribe { 551 app_name: self.app_name.clone(), 552 stream_name, 553 responder: sender, 554 }; 555 556 let rv = self.event_producer.send(subscribe_event); 557 match rv { 558 Err(_) => { 559 return Err(SessionError { 560 value: SessionErrorValue::ChannelEventSendErr, 561 }) 562 } 563 _ => {} 564 } 565 566 match receiver.await { 567 Ok(consumer) => { 568 self.data_consumer = consumer; 569 } 570 Err(_) => {} 571 } 572 Ok(()) 573 } 574 575 pub async fn on_publish( 576 &mut self, 577 transaction_id: &f64, 578 stream_id: &u32, 579 other_values: &mut Vec<Amf0ValueType>, 580 ) -> Result<(), SessionError> { 581 let length = other_values.len(); 582 583 if length < 2 { 584 return Err(SessionError { 585 value: SessionErrorValue::Amf0ValueCountNotCorrect, 586 }); 587 } 588 589 let stream_name = match other_values.remove(0) { 590 Amf0ValueType::UTF8String(val) => val, 591 _ => { 592 return Err(SessionError { 593 value: SessionErrorValue::Amf0ValueCountNotCorrect, 594 }); 595 } 596 }; 597 598 let stream_type = match other_values.remove(0) { 599 Amf0ValueType::UTF8String(val) => val, 600 _ => { 601 return Err(SessionError { 602 value: SessionErrorValue::Amf0ValueCountNotCorrect, 603 }); 604 } 605 }; 606 607 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 608 event_messages.write_stream_begin(stream_id.clone()).await?; 609 610 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 611 netstream 612 .on_status( 613 transaction_id, 614 &"status".to_string(), 615 &"NetStream.Publish.Start".to_string(), 616 &"".to_string(), 617 ) 618 .await?; 619 620 print!("before publish_to_channels\n"); 621 self.publish_to_channels(stream_name).await?; 622 print!("after publish_to_channels\n"); 623 624 Ok(()) 625 } 626 627 async fn publish_to_channels(&mut self, stream_name: String) -> Result<(), SessionError> { 628 let (sender, receiver) = oneshot::channel(); 629 let publish_event = ChannelEvent::Publish { 630 app_name: self.app_name.clone(), 631 stream_name, 632 responder: sender, 633 }; 634 635 let rv = self.event_producer.send(publish_event); 636 match rv { 637 Err(_) => { 638 return Err(SessionError { 639 value: SessionErrorValue::ChannelEventSendErr, 640 }) 641 } 642 _ => {} 643 } 644 645 match receiver.await { 646 Ok(producer) => { 647 print!("set producer before\n"); 648 self.data_producer = producer; 649 print!("set producer after\n"); 650 } 651 Err(_) => {} 652 } 653 Ok(()) 654 } 655 656 pub fn on_video_data( 657 &mut self, 658 data: &mut BytesMut, 659 timestamp: &u32, 660 ) -> Result<(), SessionError> { 661 let data = ChannelData::Video { 662 timestamp: timestamp.clone(), 663 data: data.clone(), 664 }; 665 666 print!("receive video data\n"); 667 match self.data_producer.send(data) { 668 Ok(size) => {} 669 Err(err) => { 670 format!("receive video err {}", err); 671 return Err(SessionError { 672 value: SessionErrorValue::SendChannelDataErr, 673 }); 674 } 675 } 676 677 Ok(()) 678 } 679 680 pub fn on_audio_data( 681 &mut self, 682 data: &mut BytesMut, 683 timestamp: &u32, 684 ) -> Result<(), SessionError> { 685 let data = ChannelData::Audio { 686 timestamp: timestamp.clone(), 687 data: data.clone(), 688 }; 689 690 match self.data_producer.send(data) { 691 Ok(size) => {} 692 Err(_) => { 693 return Err(SessionError { 694 value: SessionErrorValue::SendChannelDataErr, 695 }) 696 } 697 } 698 699 Ok(()) 700 } 701 702 pub fn on_amf_data( 703 &mut self, 704 body: &mut BytesMut, 705 values: &mut Vec<Amf0ValueType>, 706 ) -> Result<(), SessionError> { 707 let mut metadata_handler = metadata::MetaData::default(); 708 709 metadata_handler.save(body, values); 710 711 Ok(()) 712 } 713 } 714