1 use super::define; 2 use super::errors::SessionError; 3 use super::errors::SessionErrorValue; 4 use crate::handshake::handshake::{SimpleHandshakeServer,ComplexHandshakeServer}; 5 use crate::{amf0::Amf0ValueType, chunk::unpacketizer::UnpackResult}; 6 use crate::{ 7 application, 8 chunk::{unpacketizer::ChunkUnpacketizer, ChunkInfo}, 9 }; 10 use crate::{channels, chunk::packetizer::ChunkPacketizer}; 11 use crate::{ 12 chunk::define::CHUNK_SIZE, 13 chunk::define::{chunk_type, csid_type}, 14 }; 15 16 use crate::messages::define::msg_type_id; 17 use crate::messages::define::RtmpMessageData; 18 19 use crate::messages::parser::MessageParser; 20 use bytes::BytesMut; 21 22 use netio::bytes_reader::BytesReader; 23 use netio::bytes_writer::AsyncBytesWriter; 24 use netio::bytes_writer::BytesWriter; 25 use netio::netio::NetworkIO; 26 use std::{borrow::BorrowMut, time::Duration}; 27 28 use crate::channels::define::ChannelEvent; 29 use crate::channels::define::MultiConsumerForData; 30 use crate::channels::define::MultiProducerForEvent; 31 use crate::channels::define::SingleProducerForData; 32 use crate::netconnection::commands::NetConnection; 33 use crate::netstream::commands::NetStream; 34 use crate::protocol_control_messages::control_messages::ControlMessages; 35 36 use crate::user_control_messages::event_messages::EventMessages; 37 38 use std::collections::HashMap; 39 40 use tokio::sync::broadcast; 41 use tokio::{io::ReadHalf, net::TcpStream}; 42 43 use std::cell::{RefCell, RefMut}; 44 use std::rc::Rc; 45 use std::sync::Arc; 46 use tokio::sync::oneshot; 47 use tokio::sync::Mutex; 48 49 use crate::channels::define::ChannelData; 50 use crate::channels::errors::ChannelError; 51 use crate::channels::errors::ChannelErrorValue; 52 53 use crate::handshake::handshake::ServerHandshakeState; 54 55 use crate::utils; 56 use log::{debug, log, log_enabled, Level}; 57 use std::fmt; 58 59 enum ServerSessionState { 60 Handshake, 61 ReadChunk, 62 // OnConnect, 63 // OnCreateStream, 64 //Publish, 65 Play, 66 } 67 68 pub struct ServerSession { 69 app_name: String, 70 71 io: Arc<Mutex<NetworkIO>>, 72 simple_handshaker: SimpleHandshakeServer, 73 complex_handshaker: ComplexHandshakeServer, 74 75 packetizer: ChunkPacketizer, 76 unpacketizer: ChunkUnpacketizer, 77 78 state: ServerSessionState, 79 80 event_producer: MultiProducerForEvent, 81 82 //send video, audio or metadata from publish server session to player server sessions 83 data_producer: SingleProducerForData, 84 //receive video, audio or metadata from publish server session and send out to player 85 data_consumer: MultiConsumerForData, 86 87 session_id: u8, 88 } 89 90 impl ServerSession { 91 pub fn new( 92 stream: TcpStream, 93 event_producer: MultiProducerForEvent, 94 timeout: Duration, 95 session_id: u8, 96 ) -> Self { 97 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout))); 98 //only used for init,since I don't found a better way to deal with this. 99 let (init_producer, init_consumer) = broadcast::channel(1); 100 // let reader = BytesReader::new(BytesMut::new()); 101 102 Self { 103 app_name: String::from(""), 104 105 io: Arc::clone(&net_io), 106 simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)), 107 complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)), 108 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 109 unpacketizer: ChunkUnpacketizer::new(), 110 111 state: ServerSessionState::Handshake, 112 113 event_producer, 114 data_producer: init_producer, 115 data_consumer: init_consumer, 116 session_id: session_id, 117 } 118 } 119 120 pub async fn run(&mut self) -> Result<(), SessionError> { 121 let duration = Duration::new(10, 10); 122 123 let mut remaining_bytes: BytesMut = BytesMut::new(); 124 125 loop { 126 let mut net_io_data: BytesMut = BytesMut::new(); 127 if remaining_bytes.len() <= 0 { 128 net_io_data = self.io.lock().await.read().await?; 129 130 utils::print::print(net_io_data.clone()); 131 } 132 133 match self.state { 134 ServerSessionState::Handshake => { 135 if self.session_id > 0 { 136 // utils::print::printu8(net_io_data.clone()); 137 } 138 139 self.simple_handshaker.extend_data(&net_io_data[..]); 140 self.simple_handshaker.handshake().await?; 141 142 match self.simple_handshaker.state { 143 ServerHandshakeState::Finish => { 144 self.state = ServerSessionState::ReadChunk; 145 remaining_bytes = self.simple_handshaker.get_remaining_bytes(); 146 } 147 _ => continue, 148 } 149 } 150 ServerSessionState::ReadChunk => { 151 if self.session_id > 0 { 152 // utils::print::printu8(net_io_data.clone()); 153 } 154 155 if remaining_bytes.len() > 0 { 156 let bytes = std::mem::replace(&mut remaining_bytes, BytesMut::new()); 157 self.unpacketizer.extend_data(&bytes[..]); 158 } else { 159 self.unpacketizer.extend_data(&net_io_data[..]); 160 } 161 162 let result = self.unpacketizer.read_chunks(); 163 164 let rv = match result { 165 Ok(val) => val, 166 Err(err) => { 167 // return Err(SessionError { 168 // value: SessionErrorValue::UnPackError(err), 169 // }) 170 continue; 171 } 172 }; 173 174 match rv { 175 UnpackResult::Chunks(chunks) => { 176 for chunk_info in chunks.iter() { 177 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 178 let timestamp = chunk_info.message_header.timestamp; 179 180 let mut message_parser = MessageParser::new(chunk_info.clone()); 181 let mut msg = message_parser.parse()?; 182 183 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 184 .await?; 185 } 186 } 187 _ => {} 188 } 189 } 190 191 //when in play state, only transfer publisher's video/audio/metadta to player. 192 ServerSessionState::Play => loop { 193 let data = self.data_consumer.recv().await; 194 195 match data { 196 Ok(val) => match val { 197 ChannelData::Audio { timestamp, data } => { 198 self.send_audio(data, timestamp).await?; 199 } 200 ChannelData::Video { timestamp, data } => { 201 self.send_video(data, timestamp).await?; 202 } 203 ChannelData::MetaData {} => {} 204 }, 205 Err(err) => {} 206 } 207 }, 208 } 209 } 210 211 //Ok(()) 212 } 213 214 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 215 let mut controlmessage = ControlMessages::new(AsyncBytesWriter::new(self.io.clone())); 216 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 217 218 Ok(()) 219 } 220 pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 221 let mut chunk_info = ChunkInfo::new( 222 csid_type::AUDIO, 223 chunk_type::TYPE_0, 224 timestamp, 225 data.len() as u32, 226 msg_type_id::AUDIO, 227 0, 228 data, 229 ); 230 231 self.packetizer.write_chunk(&mut chunk_info).await?; 232 233 Ok(()) 234 } 235 236 pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 237 let mut chunk_info = ChunkInfo::new( 238 csid_type::VIDEO, 239 chunk_type::TYPE_0, 240 timestamp, 241 data.len() as u32, 242 msg_type_id::VIDEO, 243 0, 244 data, 245 ); 246 247 self.packetizer.write_chunk(&mut chunk_info).await?; 248 249 Ok(()) 250 } 251 pub async fn process_messages( 252 &mut self, 253 rtmp_msg: &mut RtmpMessageData, 254 msg_stream_id: &u32, 255 timestamp: &u32, 256 ) -> Result<(), SessionError> { 257 match rtmp_msg { 258 RtmpMessageData::Amf0Command { 259 command_name, 260 transaction_id, 261 command_object, 262 others, 263 } => { 264 self.process_amf0_command_message( 265 msg_stream_id, 266 command_name, 267 transaction_id, 268 command_object, 269 others, 270 ) 271 .await? 272 } 273 RtmpMessageData::AudioData { data } => { 274 self.on_audio_data(data, timestamp)?; 275 } 276 RtmpMessageData::VideoData { data } => { 277 self.on_video_data(data, timestamp)?; 278 } 279 280 _ => {} 281 } 282 Ok(()) 283 } 284 285 pub async fn process_amf0_command_message( 286 &mut self, 287 stream_id: &u32, 288 command_name: &Amf0ValueType, 289 transaction_id: &Amf0ValueType, 290 command_object: &Amf0ValueType, 291 others: &mut Vec<Amf0ValueType>, 292 ) -> Result<(), SessionError> { 293 let empty_cmd_name = &String::new(); 294 let cmd_name = match command_name { 295 Amf0ValueType::UTF8String(str) => str, 296 _ => empty_cmd_name, 297 }; 298 299 let transaction_id = match transaction_id { 300 Amf0ValueType::Number(number) => number, 301 _ => &0.0, 302 }; 303 304 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 305 let obj = match command_object { 306 Amf0ValueType::Object(obj) => obj, 307 _ => &empty_cmd_obj, 308 }; 309 310 match cmd_name.as_str() { 311 "connect" => { 312 print!("connect ......."); 313 self.on_connect(&transaction_id, &obj).await?; 314 } 315 "createStream" => { 316 self.on_create_stream(transaction_id).await?; 317 } 318 "deleteStream" => { 319 if others.len() > 1 { 320 let stream_id = match others.pop() { 321 Some(val) => match val { 322 Amf0ValueType::Number(streamid) => streamid, 323 _ => 0.0, 324 }, 325 _ => 0.0, 326 }; 327 328 self.on_delete_stream(transaction_id, &stream_id).await?; 329 } 330 } 331 "play" => { 332 self.on_play(transaction_id, stream_id, others).await?; 333 } 334 "publish" => { 335 self.on_publish(transaction_id, stream_id, others).await?; 336 } 337 _ => {} 338 } 339 340 Ok(()) 341 } 342 343 async fn on_connect( 344 &mut self, 345 transaction_id: &f64, 346 command_obj: &HashMap<String, Amf0ValueType>, 347 ) -> Result<(), SessionError> { 348 let mut control_message = ControlMessages::new(AsyncBytesWriter::new(self.io.clone())); 349 control_message 350 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 351 .await?; 352 control_message 353 .write_set_peer_bandwidth( 354 define::PEER_BANDWIDTH, 355 define::PeerBandWidthLimitType::DYNAMIC, 356 ) 357 .await?; 358 control_message.write_set_chunk_size(CHUNK_SIZE).await?; 359 360 let obj_encoding = command_obj.get("objectEncoding"); 361 let encoding = match obj_encoding { 362 Some(Amf0ValueType::Number(encoding)) => encoding, 363 _ => &define::OBJENCODING_AMF0, 364 }; 365 366 let app_name = command_obj.get("app"); 367 self.app_name = match app_name { 368 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 369 _ => { 370 return Err(SessionError { 371 value: SessionErrorValue::NoAppName, 372 }); 373 } 374 }; 375 376 let mut netconnection = NetConnection::new(BytesWriter::new()); 377 let data = netconnection.connect_response( 378 &transaction_id, 379 &define::FMSVER.to_string(), 380 &define::CAPABILITIES, 381 &String::from("NetConnection.Connect.Success"), 382 &define::LEVEL.to_string(), 383 &String::from("Connection Succeeded."), 384 encoding, 385 )?; 386 387 let mut chunk_info = ChunkInfo::new( 388 csid_type::COMMAND_AMF0_AMF3, 389 chunk_type::TYPE_0, 390 0, 391 data.len() as u32, 392 msg_type_id::COMMAND_AMF0, 393 0, 394 data, 395 ); 396 397 self.packetizer.write_chunk(&mut chunk_info).await?; 398 399 Ok(()) 400 } 401 402 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 403 let mut netconnection = NetConnection::new(BytesWriter::new()); 404 let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?; 405 406 let mut chunk_info = ChunkInfo::new( 407 csid_type::COMMAND_AMF0_AMF3, 408 chunk_type::TYPE_0, 409 0, 410 data.len() as u32, 411 msg_type_id::COMMAND_AMF0, 412 0, 413 data, 414 ); 415 416 self.packetizer.write_chunk(&mut chunk_info).await?; 417 418 Ok(()) 419 } 420 421 pub async fn on_delete_stream( 422 &mut self, 423 transaction_id: &f64, 424 stream_id: &f64, 425 ) -> Result<(), SessionError> { 426 let mut netstream = NetStream::new(BytesWriter::new()); 427 let data = netstream.on_status( 428 transaction_id, 429 &"status".to_string(), 430 &"NetStream.DeleteStream.Suceess".to_string(), 431 &"".to_string(), 432 )?; 433 434 let mut chunk_info = ChunkInfo::new( 435 csid_type::COMMAND_AMF0_AMF3, 436 chunk_type::TYPE_0, 437 0, 438 data.len() as u32, 439 msg_type_id::COMMAND_AMF0, 440 0, 441 data, 442 ); 443 444 self.packetizer.write_chunk(&mut chunk_info).await?; 445 Ok(()) 446 } 447 pub async fn on_play( 448 &mut self, 449 transaction_id: &f64, 450 stream_id: &u32, 451 other_values: &mut Vec<Amf0ValueType>, 452 ) -> Result<(), SessionError> { 453 let length = other_values.len() as u8; 454 let mut index: u8 = 0; 455 456 let mut stream_name: Option<String> = None; 457 let mut start: Option<f64> = None; 458 let mut duration: Option<f64> = None; 459 let mut reset: Option<bool> = None; 460 461 loop { 462 if index >= length { 463 break; 464 } 465 index = index + 1; 466 stream_name = match other_values.remove(0) { 467 Amf0ValueType::UTF8String(val) => Some(val), 468 _ => None, 469 }; 470 471 if index >= length { 472 break; 473 } 474 index = index + 1; 475 start = match other_values.remove(0) { 476 Amf0ValueType::Number(val) => Some(val), 477 _ => None, 478 }; 479 480 if index >= length { 481 break; 482 } 483 index = index + 1; 484 duration = match other_values.remove(0) { 485 Amf0ValueType::Number(val) => Some(val), 486 _ => None, 487 }; 488 489 if index >= length { 490 break; 491 } 492 index = index + 1; 493 reset = match other_values.remove(0) { 494 Amf0ValueType::Boolean(val) => Some(val), 495 _ => None, 496 }; 497 break; 498 } 499 500 let mut event_messages = EventMessages::new(AsyncBytesWriter::new(self.io.clone())); 501 event_messages.stream_begin(stream_id.clone()).await?; 502 503 let mut netstream = NetStream::new(BytesWriter::new()); 504 match reset { 505 Some(val) => { 506 if val { 507 netstream.on_status( 508 transaction_id, 509 &"status".to_string(), 510 &"NetStream.Play.Reset".to_string(), 511 &"".to_string(), 512 )?; 513 } 514 } 515 _ => {} 516 } 517 518 netstream.on_status( 519 transaction_id, 520 &"status".to_string(), 521 &"NetStream.Play.Start".to_string(), 522 &"".to_string(), 523 )?; 524 525 event_messages.stream_is_record(stream_id.clone()).await?; 526 527 self.subscribe_from_channels(stream_name.unwrap()).await?; 528 self.state = ServerSessionState::Play; 529 530 Ok(()) 531 } 532 533 async fn subscribe_from_channels(&mut self, stream_name: String) -> Result<(), SessionError> { 534 let (sender, receiver) = oneshot::channel(); 535 let subscribe_event = ChannelEvent::Subscribe { 536 app_name: self.app_name.clone(), 537 stream_name, 538 responder: sender, 539 }; 540 541 let rv = self.event_producer.send(subscribe_event); 542 match rv { 543 Err(_) => { 544 return Err(SessionError { 545 value: SessionErrorValue::ChannelEventSendErr, 546 }) 547 } 548 _ => {} 549 } 550 551 match receiver.await { 552 Ok(consumer) => { 553 self.data_consumer = consumer; 554 } 555 Err(_) => {} 556 } 557 Ok(()) 558 } 559 560 pub async fn on_publish( 561 &mut self, 562 transaction_id: &f64, 563 stream_id: &u32, 564 other_values: &mut Vec<Amf0ValueType>, 565 ) -> Result<(), SessionError> { 566 let length = other_values.len(); 567 568 if length < 2 { 569 return Err(SessionError { 570 value: SessionErrorValue::Amf0ValueCountNotCorrect, 571 }); 572 } 573 574 let stream_name = match other_values.remove(0) { 575 Amf0ValueType::UTF8String(val) => val, 576 _ => { 577 return Err(SessionError { 578 value: SessionErrorValue::Amf0ValueCountNotCorrect, 579 }); 580 } 581 }; 582 583 let stream_type = match other_values.remove(0) { 584 Amf0ValueType::UTF8String(val) => val, 585 _ => { 586 return Err(SessionError { 587 value: SessionErrorValue::Amf0ValueCountNotCorrect, 588 }); 589 } 590 }; 591 592 let mut event_messages = EventMessages::new(AsyncBytesWriter::new(self.io.clone())); 593 event_messages.stream_begin(stream_id.clone()).await?; 594 595 let mut netstream = NetStream::new(BytesWriter::new()); 596 let data = netstream.on_status( 597 transaction_id, 598 &"status".to_string(), 599 &"NetStream.Publish.Start".to_string(), 600 &"".to_string(), 601 )?; 602 603 let mut chunk_info = ChunkInfo::new( 604 csid_type::COMMAND_AMF0_AMF3, 605 chunk_type::TYPE_0, 606 0, 607 data.len() as u32, 608 msg_type_id::COMMAND_AMF0, 609 0, 610 data, 611 ); 612 613 self.packetizer.write_chunk(&mut chunk_info).await?; 614 615 //self.publish_to_channels(stream_name).await?; 616 617 Ok(()) 618 } 619 620 async fn publish_to_channels(&mut self, stream_name: String) -> Result<(), SessionError> { 621 let (sender, receiver) = oneshot::channel(); 622 let publish_event = ChannelEvent::Publish { 623 app_name: self.app_name.clone(), 624 stream_name, 625 responder: sender, 626 }; 627 628 let rv = self.event_producer.send(publish_event); 629 match rv { 630 Err(_) => { 631 return Err(SessionError { 632 value: SessionErrorValue::ChannelEventSendErr, 633 }) 634 } 635 _ => {} 636 } 637 638 match receiver.await { 639 Ok(producer) => { 640 self.data_producer = producer; 641 } 642 Err(_) => {} 643 } 644 Ok(()) 645 } 646 647 pub fn on_video_data( 648 &mut self, 649 data: &mut BytesMut, 650 timestamp: &u32, 651 ) -> Result<(), SessionError> { 652 // let data = ChannelData::Video { 653 // timestamp: timestamp.clone(), 654 // data: data.clone(), 655 // }; 656 657 // match self.data_producer.send(data) { 658 // Ok(size) => {} 659 // Err(_) => { 660 // return Err(SessionError { 661 // value: SessionErrorValue::SendChannelDataErr, 662 // }) 663 // } 664 // } 665 666 Ok(()) 667 } 668 669 pub fn on_audio_data( 670 &mut self, 671 data: &mut BytesMut, 672 timestamp: &u32, 673 ) -> Result<(), SessionError> { 674 // let data = ChannelData::Audio { 675 // timestamp: timestamp.clone(), 676 // data: data.clone(), 677 // }; 678 679 // match self.data_producer.send(data) { 680 // Ok(size) => {} 681 // Err(_) => { 682 // return Err(SessionError { 683 // value: SessionErrorValue::SendChannelDataErr, 684 // }) 685 // } 686 // } 687 688 Ok(()) 689 } 690 } 691