1 use super::define; 2 use super::errors::SessionError; 3 use super::errors::SessionErrorValue; 4 use crate::handshake::handshake::SimpleHandshakeServer; 5 use crate::{amf0::Amf0ValueType, chunk::unpacketizer::UnpackResult}; 6 use crate::{ 7 application, 8 chunk::{unpacketizer::ChunkUnpacketizer, ChunkInfo}, 9 }; 10 use crate::{channels, chunk::packetizer::ChunkPacketizer}; 11 use crate::{ 12 chunk::define::CHUNK_SIZE, 13 chunk::define::{chunk_type, csid_type}, 14 }; 15 16 use crate::messages::define::msg_type_id; 17 use crate::messages::define::RtmpMessageData; 18 19 use crate::messages::parser::MessageParser; 20 use bytes::BytesMut; 21 22 use netio::bytes_reader::BytesReader; 23 use netio::bytes_writer::AsyncBytesWriter; 24 use netio::bytes_writer::BytesWriter; 25 use netio::netio::NetworkIO; 26 use std::{borrow::BorrowMut, time::Duration}; 27 28 use crate::channels::define::ChannelEvent; 29 use crate::channels::define::MultiConsumerForData; 30 use crate::channels::define::MultiProducerForEvent; 31 use crate::channels::define::SingleProducerForData; 32 use crate::netconnection::commands::NetConnection; 33 use crate::netstream::commands::NetStream; 34 use crate::protocol_control_messages::control_messages::ControlMessages; 35 36 use crate::user_control_messages::event_messages::EventMessages; 37 38 use std::collections::HashMap; 39 40 use tokio::{prelude::*, sync::broadcast}; 41 42 use std::cell::{RefCell, RefMut}; 43 use std::rc::Rc; 44 use std::sync::Arc; 45 use tokio::sync::oneshot; 46 use tokio::sync::Mutex; 47 48 use crate::channels::define::ChannelData; 49 use crate::channels::errors::ChannelError; 50 use crate::channels::errors::ChannelErrorValue; 51 52 use crate::handshake::handshake::ServerHandshakeState; 53 54 use crate::utils; 55 use log::{debug, log, log_enabled, Level}; 56 use std::fmt; 57 58 enum ServerSessionState { 59 Handshake, 60 ReadChunk, 61 // OnConnect, 62 // OnCreateStream, 63 //Publish, 64 Play, 65 } 66 67 pub struct ServerSession<S> 68 where 69 S: AsyncRead + AsyncWrite + Unpin + Send + Sync, 70 { 71 app_name: String, 72 73 io: Arc<Mutex<NetworkIO<S>>>, 74 handshaker: SimpleHandshakeServer<S>, 75 76 packetizer: ChunkPacketizer<S>, 77 unpacketizer: ChunkUnpacketizer, 78 79 state: ServerSessionState, 80 81 event_producer: MultiProducerForEvent, 82 83 //send video, audio or metadata from publish server session to player server sessions 84 data_producer: SingleProducerForData, 85 //receive video, audio or metadata from publish server session and send out to player 86 data_consumer: MultiConsumerForData, 87 } 88 89 impl<S> ServerSession<S> 90 where 91 S: AsyncRead + AsyncWrite + Unpin + Send + Sync, 92 { 93 pub fn new(stream: S, event_producer: MultiProducerForEvent, timeout: Duration) -> Self { 94 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout))); 95 //only used for init,since I don't found a better way to deal with this. 96 let (init_producer, init_consumer) = broadcast::channel(1); 97 // let reader = BytesReader::new(BytesMut::new()); 98 99 Self { 100 app_name: String::from(""), 101 102 io: Arc::clone(&net_io), 103 handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)), 104 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 105 unpacketizer: ChunkUnpacketizer::new(), 106 107 state: ServerSessionState::Handshake, 108 109 event_producer, 110 data_producer: init_producer, 111 data_consumer: init_consumer, 112 } 113 } 114 115 pub async fn run(&mut self) -> Result<(), SessionError> { 116 let duration = Duration::new(10, 10); 117 118 let mut remaining_bytes: BytesMut = BytesMut::new(); 119 let mut net_io_data: BytesMut = BytesMut::new(); 120 121 loop { 122 if remaining_bytes.len() <= 0 { 123 net_io_data = self.io.lock().await.read().await?; 124 } 125 126 match self.state { 127 ServerSessionState::Handshake => { 128 utils::print::printu8(net_io_data.clone()); 129 self.handshaker.extend_data(&net_io_data[..]); 130 self.handshaker.handshake().await?; 131 132 match self.handshaker.state { 133 ServerHandshakeState::Finish => { 134 self.state = ServerSessionState::ReadChunk; 135 remaining_bytes = self.handshaker.get_remaining_bytes(); 136 } 137 _ => continue, 138 } 139 } 140 ServerSessionState::ReadChunk => { 141 utils::print::printu8(net_io_data.clone()); 142 143 if remaining_bytes.len() > 0 { 144 let bytes = std::mem::replace(&mut remaining_bytes, BytesMut::new()); 145 self.unpacketizer.extend_data(&bytes[..]); 146 } else { 147 self.unpacketizer.extend_data(&net_io_data[..]); 148 } 149 150 let result = self.unpacketizer.read_chunks(); 151 152 let rv = match result { 153 Ok(val) => val, 154 Err(err) => { 155 // return Err(SessionError { 156 // value: SessionErrorValue::UnPackError(err), 157 // }) 158 continue; 159 } 160 }; 161 162 match rv { 163 UnpackResult::Chunks(chunks) => { 164 for chunk_info in chunks.iter() { 165 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 166 let timestamp = chunk_info.message_header.timestamp; 167 168 let mut message_parser = MessageParser::new(chunk_info.clone()); 169 let mut msg = message_parser.parse()?; 170 171 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 172 .await?; 173 } 174 } 175 _ => {} 176 } 177 } 178 179 //when in play state, only transfer publisher's video/audio/metadta to player. 180 ServerSessionState::Play => loop { 181 let data = self.data_consumer.recv().await; 182 183 match data { 184 Ok(val) => match val { 185 ChannelData::Audio { timestamp, data } => { 186 self.send_audio(data, timestamp).await?; 187 } 188 ChannelData::Video { timestamp, data } => { 189 self.send_video(data, timestamp).await?; 190 } 191 ChannelData::MetaData {} => {} 192 }, 193 Err(err) => {} 194 } 195 }, 196 } 197 } 198 199 //Ok(()) 200 } 201 202 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 203 let mut controlmessage = ControlMessages::new(AsyncBytesWriter::new(self.io.clone())); 204 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 205 206 Ok(()) 207 } 208 pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 209 let mut chunk_info = ChunkInfo::new( 210 csid_type::AUDIO, 211 chunk_type::TYPE_0, 212 timestamp, 213 data.len() as u32, 214 msg_type_id::AUDIO, 215 0, 216 data, 217 ); 218 219 self.packetizer.write_chunk(&mut chunk_info).await?; 220 221 Ok(()) 222 } 223 224 pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 225 let mut chunk_info = ChunkInfo::new( 226 csid_type::VIDEO, 227 chunk_type::TYPE_0, 228 timestamp, 229 data.len() as u32, 230 msg_type_id::VIDEO, 231 0, 232 data, 233 ); 234 235 self.packetizer.write_chunk(&mut chunk_info).await?; 236 237 Ok(()) 238 } 239 pub async fn process_messages( 240 &mut self, 241 rtmp_msg: &mut RtmpMessageData, 242 msg_stream_id: &u32, 243 timestamp: &u32, 244 ) -> Result<(), SessionError> { 245 match rtmp_msg { 246 RtmpMessageData::Amf0Command { 247 command_name, 248 transaction_id, 249 command_object, 250 others, 251 } => { 252 self.process_amf0_command_message( 253 msg_stream_id, 254 command_name, 255 transaction_id, 256 command_object, 257 others, 258 ) 259 .await? 260 } 261 RtmpMessageData::AudioData { data } => { 262 self.on_audio_data(data, timestamp)?; 263 } 264 RtmpMessageData::VideoData { data } => { 265 self.on_video_data(data, timestamp)?; 266 } 267 268 _ => {} 269 } 270 Ok(()) 271 } 272 273 pub async fn process_amf0_command_message( 274 &mut self, 275 stream_id: &u32, 276 command_name: &Amf0ValueType, 277 transaction_id: &Amf0ValueType, 278 command_object: &Amf0ValueType, 279 others: &mut Vec<Amf0ValueType>, 280 ) -> Result<(), SessionError> { 281 let empty_cmd_name = &String::new(); 282 let cmd_name = match command_name { 283 Amf0ValueType::UTF8String(str) => str, 284 _ => empty_cmd_name, 285 }; 286 287 let transaction_id = match transaction_id { 288 Amf0ValueType::Number(number) => number, 289 _ => &0.0, 290 }; 291 292 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 293 let obj = match command_object { 294 Amf0ValueType::Object(obj) => obj, 295 _ => &empty_cmd_obj, 296 }; 297 298 match cmd_name.as_str() { 299 "connect" => { 300 self.on_connect(&transaction_id, &obj).await?; 301 } 302 "createStream" => { 303 self.on_create_stream(transaction_id)?; 304 } 305 "deleteStream" => { 306 if others.len() > 1 { 307 let stream_id = match others.pop() { 308 Some(val) => match val { 309 Amf0ValueType::Number(streamid) => streamid, 310 _ => 0.0, 311 }, 312 _ => 0.0, 313 }; 314 315 self.on_delete_stream(transaction_id, &stream_id).await?; 316 } 317 } 318 "play" => { 319 self.on_play(transaction_id, stream_id, others).await?; 320 } 321 "publish" => { 322 self.on_publish(transaction_id, stream_id, others).await?; 323 } 324 _ => {} 325 } 326 327 Ok(()) 328 } 329 330 async fn on_connect( 331 &mut self, 332 transaction_id: &f64, 333 command_obj: &HashMap<String, Amf0ValueType>, 334 ) -> Result<(), SessionError> { 335 let mut control_message = ControlMessages::new(AsyncBytesWriter::new(self.io.clone())); 336 control_message 337 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 338 .await?; 339 control_message 340 .write_set_peer_bandwidth( 341 define::PEER_BANDWIDTH, 342 define::PeerBandWidthLimitType::DYNAMIC, 343 ) 344 .await?; 345 control_message.write_set_chunk_size(CHUNK_SIZE).await?; 346 347 let obj_encoding = command_obj.get("objectEncoding"); 348 let encoding = match obj_encoding { 349 Some(Amf0ValueType::Number(encoding)) => encoding, 350 _ => &define::OBJENCODING_AMF0, 351 }; 352 353 let app_name = command_obj.get("app"); 354 self.app_name = match app_name { 355 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 356 _ => { 357 return Err(SessionError { 358 value: SessionErrorValue::NoAppName, 359 }); 360 } 361 }; 362 363 let mut netconnection = NetConnection::new(BytesWriter::new()); 364 let data = netconnection.connect_response( 365 &transaction_id, 366 &define::FMSVER.to_string(), 367 &define::CAPABILITIES, 368 &String::from("NetConnection.Connect.Success"), 369 &define::LEVEL.to_string(), 370 &String::from("Connection Succeeded."), 371 encoding, 372 )?; 373 374 let mut chunk_info = ChunkInfo::new( 375 csid_type::COMMAND_AMF0_AMF3, 376 chunk_type::TYPE_0, 377 0, 378 data.len() as u32, 379 msg_type_id::COMMAND_AMF0, 380 0, 381 data, 382 ); 383 384 self.packetizer.write_chunk(&mut chunk_info).await?; 385 386 Ok(()) 387 } 388 389 pub fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 390 let mut netconnection = NetConnection::new(BytesWriter::new()); 391 netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?; 392 393 Ok(()) 394 } 395 396 pub async fn on_delete_stream( 397 &mut self, 398 transaction_id: &f64, 399 stream_id: &f64, 400 ) -> Result<(), SessionError> { 401 let mut netstream = NetStream::new(BytesWriter::new()); 402 let data = netstream.on_status( 403 transaction_id, 404 &"status".to_string(), 405 &"NetStream.DeleteStream.Suceess".to_string(), 406 &"".to_string(), 407 )?; 408 409 let mut chunk_info = ChunkInfo::new( 410 csid_type::COMMAND_AMF0_AMF3, 411 chunk_type::TYPE_0, 412 0, 413 data.len() as u32, 414 msg_type_id::COMMAND_AMF0, 415 0, 416 data, 417 ); 418 419 self.packetizer.write_chunk(&mut chunk_info).await?; 420 Ok(()) 421 } 422 pub async fn on_play( 423 &mut self, 424 transaction_id: &f64, 425 stream_id: &u32, 426 other_values: &mut Vec<Amf0ValueType>, 427 ) -> Result<(), SessionError> { 428 let length = other_values.len() as u8; 429 let mut index: u8 = 0; 430 431 let mut stream_name: Option<String> = None; 432 let mut start: Option<f64> = None; 433 let mut duration: Option<f64> = None; 434 let mut reset: Option<bool> = None; 435 436 loop { 437 if index >= length { 438 break; 439 } 440 index = index + 1; 441 stream_name = match other_values.remove(0) { 442 Amf0ValueType::UTF8String(val) => Some(val), 443 _ => None, 444 }; 445 446 if index >= length { 447 break; 448 } 449 index = index + 1; 450 start = match other_values.remove(0) { 451 Amf0ValueType::Number(val) => Some(val), 452 _ => None, 453 }; 454 455 if index >= length { 456 break; 457 } 458 index = index + 1; 459 duration = match other_values.remove(0) { 460 Amf0ValueType::Number(val) => Some(val), 461 _ => None, 462 }; 463 464 if index >= length { 465 break; 466 } 467 index = index + 1; 468 reset = match other_values.remove(0) { 469 Amf0ValueType::Boolean(val) => Some(val), 470 _ => None, 471 }; 472 break; 473 } 474 475 let mut event_messages = EventMessages::new(AsyncBytesWriter::new(self.io.clone())); 476 event_messages.stream_begin(stream_id.clone()).await?; 477 478 let mut netstream = NetStream::new(BytesWriter::new()); 479 match reset { 480 Some(val) => { 481 if val { 482 netstream.on_status( 483 transaction_id, 484 &"status".to_string(), 485 &"NetStream.Play.Reset".to_string(), 486 &"".to_string(), 487 )?; 488 } 489 } 490 _ => {} 491 } 492 493 netstream.on_status( 494 transaction_id, 495 &"status".to_string(), 496 &"NetStream.Play.Start".to_string(), 497 &"".to_string(), 498 )?; 499 500 event_messages.stream_is_record(stream_id.clone()).await?; 501 502 self.subscribe_from_channels(stream_name.unwrap()).await?; 503 self.state = ServerSessionState::Play; 504 505 Ok(()) 506 } 507 508 async fn subscribe_from_channels(&mut self, stream_name: String) -> Result<(), SessionError> { 509 let (sender, receiver) = oneshot::channel(); 510 let subscribe_event = ChannelEvent::Subscribe { 511 app_name: self.app_name.clone(), 512 stream_name, 513 responder: sender, 514 }; 515 516 let rv = self.event_producer.send(subscribe_event); 517 match rv { 518 Err(_) => { 519 return Err(SessionError { 520 value: SessionErrorValue::ChannelEventSendErr, 521 }) 522 } 523 _ => {} 524 } 525 526 match receiver.await { 527 Ok(consumer) => { 528 self.data_consumer = consumer; 529 } 530 Err(_) => {} 531 } 532 Ok(()) 533 } 534 535 pub async fn on_publish( 536 &mut self, 537 transaction_id: &f64, 538 stream_id: &u32, 539 other_values: &mut Vec<Amf0ValueType>, 540 ) -> Result<(), SessionError> { 541 let length = other_values.len(); 542 543 if length < 2 { 544 return Err(SessionError { 545 value: SessionErrorValue::Amf0ValueCountNotCorrect, 546 }); 547 } 548 549 let stream_name = match other_values.remove(0) { 550 Amf0ValueType::UTF8String(val) => val, 551 _ => { 552 return Err(SessionError { 553 value: SessionErrorValue::Amf0ValueCountNotCorrect, 554 }); 555 } 556 }; 557 558 let stream_type = match other_values.remove(0) { 559 Amf0ValueType::UTF8String(val) => val, 560 _ => { 561 return Err(SessionError { 562 value: SessionErrorValue::Amf0ValueCountNotCorrect, 563 }); 564 } 565 }; 566 567 let mut event_messages = EventMessages::new(AsyncBytesWriter::new(self.io.clone())); 568 event_messages.stream_begin(stream_id.clone()).await?; 569 570 let mut netstream = NetStream::new(BytesWriter::new()); 571 let data = netstream.on_status( 572 transaction_id, 573 &"status".to_string(), 574 &"NetStream.Publish.Start".to_string(), 575 &"".to_string(), 576 )?; 577 578 let mut chunk_info = ChunkInfo::new( 579 csid_type::COMMAND_AMF0_AMF3, 580 chunk_type::TYPE_0, 581 0, 582 data.len() as u32, 583 msg_type_id::COMMAND_AMF0, 584 0, 585 data, 586 ); 587 588 self.packetizer.write_chunk(&mut chunk_info).await?; 589 590 self.publish_to_channels(stream_name).await?; 591 592 Ok(()) 593 } 594 595 async fn publish_to_channels(&mut self, stream_name: String) -> Result<(), SessionError> { 596 let (sender, receiver) = oneshot::channel(); 597 let publish_event = ChannelEvent::Publish { 598 app_name: self.app_name.clone(), 599 stream_name, 600 responder: sender, 601 }; 602 603 let rv = self.event_producer.send(publish_event); 604 match rv { 605 Err(_) => { 606 return Err(SessionError { 607 value: SessionErrorValue::ChannelEventSendErr, 608 }) 609 } 610 _ => {} 611 } 612 613 match receiver.await { 614 Ok(producer) => { 615 self.data_producer = producer; 616 } 617 Err(_) => {} 618 } 619 Ok(()) 620 } 621 622 pub fn on_video_data( 623 &mut self, 624 data: &mut BytesMut, 625 timestamp: &u32, 626 ) -> Result<(), SessionError> { 627 let data = ChannelData::Video { 628 timestamp: timestamp.clone(), 629 data: data.clone(), 630 }; 631 632 match self.data_producer.send(data) { 633 Ok(size) => {} 634 Err(_) => { 635 return Err(SessionError { 636 value: SessionErrorValue::SendChannelDataErr, 637 }) 638 } 639 } 640 641 Ok(()) 642 } 643 644 pub fn on_audio_data( 645 &mut self, 646 data: &mut BytesMut, 647 timestamp: &u32, 648 ) -> Result<(), SessionError> { 649 let data = ChannelData::Audio { 650 timestamp: timestamp.clone(), 651 data: data.clone(), 652 }; 653 654 match self.data_producer.send(data) { 655 Ok(size) => {} 656 Err(_) => { 657 return Err(SessionError { 658 value: SessionErrorValue::SendChannelDataErr, 659 }) 660 } 661 } 662 663 Ok(()) 664 } 665 } 666