1 use super::define; 2 use super::errors::SessionError; 3 use super::errors::SessionErrorValue; 4 use crate::handshake::handshake::SimpleHandshakeServer; 5 use crate::{amf0::Amf0ValueType, chunk::unpacketizer::UnpackResult}; 6 use crate::{ 7 application, 8 chunk::{unpacketizer::ChunkUnpacketizer, ChunkInfo}, 9 }; 10 use crate::{channels, chunk::packetizer::ChunkPacketizer}; 11 use crate::{ 12 chunk::define::CHUNK_SIZE, 13 chunk::define::{chunk_type, csid_type}, 14 }; 15 16 use crate::messages::define::msg_type_id; 17 use crate::messages::define::RtmpMessageData; 18 19 use crate::messages::parser::MessageParser; 20 use bytes::BytesMut; 21 22 use netio::bytes_writer::AsyncBytesWriter; 23 use netio::bytes_writer::BytesWriter; 24 use netio::netio::NetworkIO; 25 use std::{borrow::BorrowMut, time::Duration}; 26 27 use crate::channels::define::ChannelEvent; 28 use crate::channels::define::MultiConsumerForData; 29 use crate::channels::define::MultiProducerForEvent; 30 use crate::channels::define::SingleProducerForData; 31 use crate::netconnection::commands::NetConnection; 32 use crate::netstream::commands::NetStream; 33 use crate::protocol_control_messages::control_messages::ControlMessages; 34 35 use crate::user_control_messages::event_messages::EventMessages; 36 37 use std::collections::HashMap; 38 39 use tokio::{prelude::*, sync::broadcast}; 40 41 use std::sync::Arc; 42 use tokio::sync::oneshot; 43 use tokio::sync::Mutex; 44 45 use crate::channels::define::ChannelData; 46 use crate::channels::errors::ChannelError; 47 use crate::channels::errors::ChannelErrorValue; 48 49 enum ServerSessionState { 50 Handshake, 51 ReadChunk, 52 // OnConnect, 53 // OnCreateStream, 54 //Publish, 55 Play, 56 } 57 58 pub struct ServerSession<S> 59 where 60 S: AsyncRead + AsyncWrite + Unpin + Send + Sync, 61 { 62 app_name: String, 63 64 io: Arc<Mutex<NetworkIO<S>>>, 65 handshaker: SimpleHandshakeServer<S>, 66 67 packetizer: ChunkPacketizer<S>, 68 unpacketizer: ChunkUnpacketizer, 69 70 state: ServerSessionState, 71 72 event_producer: MultiProducerForEvent, 73 74 //send video, audio or metadata from publish server session to player server sessions 75 data_producer: SingleProducerForData, 76 //receive video, audio or metadata from publish server session and send out to player 77 data_consumer: MultiConsumerForData, 78 } 79 80 impl<S> ServerSession<S> 81 where 82 S: AsyncRead + AsyncWrite + Unpin + Send + Sync, 83 { 84 pub fn new(stream: S, event_producer: MultiProducerForEvent, timeout: Duration) -> Self { 85 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout))); 86 let bytes_writer = AsyncBytesWriter::new(Arc::clone(&net_io)); 87 88 //only used for init,since I don't found a better way to deal with this. 89 let (init_producer, init_consumer) = broadcast::channel(1); 90 91 Self { 92 app_name: String::from(""), 93 94 io: Arc::clone(&net_io), 95 handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)), 96 97 packetizer: ChunkPacketizer::new(bytes_writer), 98 unpacketizer: ChunkUnpacketizer::new(), 99 100 state: ServerSessionState::Handshake, 101 102 event_producer, 103 data_producer: init_producer, 104 data_consumer: init_consumer, 105 } 106 } 107 108 pub async fn run(&mut self) -> Result<(), SessionError> { 109 let duration = Duration::new(10, 10); 110 111 loop { 112 let data = self.io.lock().await.read().await?; 113 114 match self.state { 115 ServerSessionState::Handshake => { 116 self.handshaker.extend_data(&data[..]); 117 let result = self.handshaker.handshake().await; 118 119 match result { 120 Ok(v) => { 121 self.state = ServerSessionState::ReadChunk; 122 } 123 Err(e) => {} 124 } 125 } 126 ServerSessionState::ReadChunk => { 127 self.unpacketizer.extend_data(&data[..]); 128 let result = self.unpacketizer.read_chunk()?; 129 130 match result { 131 UnpackResult::ChunkInfo(chunk_info) => { 132 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 133 let timestamp = chunk_info.message_header.timestamp; 134 135 let mut message_parser = MessageParser::new(chunk_info); 136 let mut msg = message_parser.parse()?; 137 138 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 139 .await?; 140 } 141 _ => {} 142 } 143 } 144 145 //when in play state, only transfer publisher's video/audio/metadta to player. 146 ServerSessionState::Play => loop { 147 let data = self.data_consumer.recv().await; 148 149 match data { 150 Ok(val) => match val { 151 ChannelData::Audio { timestamp, data } => { 152 self.send_audio(data, timestamp).await?; 153 } 154 ChannelData::Video { timestamp, data } => { 155 self.send_video(data, timestamp).await?; 156 } 157 ChannelData::MetaData {} => {} 158 }, 159 Err(err) => {} 160 } 161 }, 162 } 163 } 164 165 //Ok(()) 166 } 167 168 pub fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 169 let mut controlmessage = ControlMessages::new(AsyncBytesWriter::new(self.io.clone())); 170 controlmessage.write_set_chunk_size(CHUNK_SIZE)?; 171 172 Ok(()) 173 } 174 pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 175 let mut chunk_info = ChunkInfo::new( 176 csid_type::AUDIO, 177 chunk_type::TYPE_0, 178 timestamp, 179 data.len() as u32, 180 msg_type_id::AUDIO, 181 0, 182 data, 183 ); 184 185 self.packetizer.write_chunk(&mut chunk_info).await?; 186 187 Ok(()) 188 } 189 190 pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 191 let mut chunk_info = ChunkInfo::new( 192 csid_type::VIDEO, 193 chunk_type::TYPE_0, 194 timestamp, 195 data.len() as u32, 196 msg_type_id::VIDEO, 197 0, 198 data, 199 ); 200 201 self.packetizer.write_chunk(&mut chunk_info).await?; 202 203 Ok(()) 204 } 205 pub async fn process_messages( 206 &mut self, 207 rtmp_msg: &mut RtmpMessageData, 208 msg_stream_id: &u32, 209 timestamp: &u32, 210 ) -> Result<(), SessionError> { 211 match rtmp_msg { 212 RtmpMessageData::Amf0Command { 213 command_name, 214 transaction_id, 215 command_object, 216 others, 217 } => { 218 self.process_amf0_command_message( 219 msg_stream_id, 220 command_name, 221 transaction_id, 222 command_object, 223 others, 224 ) 225 .await? 226 } 227 RtmpMessageData::AudioData { data } => { 228 self.on_audio_data(data, timestamp)?; 229 } 230 RtmpMessageData::VideoData { data } => { 231 self.on_video_data(data, timestamp)?; 232 } 233 234 _ => {} 235 } 236 Ok(()) 237 } 238 239 pub async fn process_amf0_command_message( 240 &mut self, 241 stream_id: &u32, 242 command_name: &Amf0ValueType, 243 transaction_id: &Amf0ValueType, 244 command_object: &Amf0ValueType, 245 others: &mut Vec<Amf0ValueType>, 246 ) -> Result<(), SessionError> { 247 let empty_cmd_name = &String::new(); 248 let cmd_name = match command_name { 249 Amf0ValueType::UTF8String(str) => str, 250 _ => empty_cmd_name, 251 }; 252 253 let transaction_id = match transaction_id { 254 Amf0ValueType::Number(number) => number, 255 _ => &0.0, 256 }; 257 258 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 259 let obj = match command_object { 260 Amf0ValueType::Object(obj) => obj, 261 _ => &empty_cmd_obj, 262 }; 263 264 match cmd_name.as_str() { 265 "connect" => { 266 self.on_connect(&transaction_id, &obj).await?; 267 } 268 "createStream" => { 269 self.on_create_stream(transaction_id)?; 270 } 271 "deleteStream" => { 272 if others.len() > 1 { 273 let stream_id = match others.pop() { 274 Some(val) => match val { 275 Amf0ValueType::Number(streamid) => streamid, 276 _ => 0.0, 277 }, 278 _ => 0.0, 279 }; 280 281 self.on_delete_stream(transaction_id, &stream_id).await?; 282 } 283 } 284 "play" => { 285 self.on_play(transaction_id, stream_id, others).await?; 286 } 287 "publish" => { 288 self.on_publish(transaction_id, stream_id, others).await?; 289 } 290 _ => {} 291 } 292 293 Ok(()) 294 } 295 296 async fn on_connect( 297 &mut self, 298 transaction_id: &f64, 299 command_obj: &HashMap<String, Amf0ValueType>, 300 ) -> Result<(), SessionError> { 301 let mut control_message = ControlMessages::new(AsyncBytesWriter::new(self.io.clone())); 302 control_message.write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE)?; 303 control_message.write_set_peer_bandwidth( 304 define::PEER_BANDWIDTH, 305 define::PeerBandWidthLimitType::DYNAMIC, 306 )?; 307 control_message.write_set_chunk_size(CHUNK_SIZE)?; 308 309 let obj_encoding = command_obj.get("objectEncoding"); 310 let encoding = match obj_encoding { 311 Some(Amf0ValueType::Number(encoding)) => encoding, 312 _ => &define::OBJENCODING_AMF0, 313 }; 314 315 let app_name = command_obj.get("app"); 316 self.app_name = match app_name { 317 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 318 _ => { 319 return Err(SessionError { 320 value: SessionErrorValue::NoAppName, 321 }); 322 } 323 }; 324 325 let mut netconnection = NetConnection::new(BytesWriter::new()); 326 let data = netconnection.connect_response( 327 &transaction_id, 328 &define::FMSVER.to_string(), 329 &define::CAPABILITIES, 330 &String::from("NetConnection.Connect.Success"), 331 &define::LEVEL.to_string(), 332 &String::from("Connection Succeeded."), 333 encoding, 334 )?; 335 336 let mut chunk_info = ChunkInfo::new( 337 csid_type::COMMAND_AMF0_AMF3, 338 chunk_type::TYPE_0, 339 0, 340 data.len() as u32, 341 msg_type_id::COMMAND_AMF0, 342 0, 343 data, 344 ); 345 346 self.packetizer.write_chunk(&mut chunk_info).await?; 347 348 Ok(()) 349 } 350 351 pub fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 352 let mut netconnection = NetConnection::new(BytesWriter::new()); 353 netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?; 354 355 Ok(()) 356 } 357 358 pub async fn on_delete_stream( 359 &mut self, 360 transaction_id: &f64, 361 stream_id: &f64, 362 ) -> Result<(), SessionError> { 363 let mut netstream = NetStream::new(BytesWriter::new()); 364 let data = netstream.on_status( 365 transaction_id, 366 &"status".to_string(), 367 &"NetStream.DeleteStream.Suceess".to_string(), 368 &"".to_string(), 369 )?; 370 371 let mut chunk_info = ChunkInfo::new( 372 csid_type::COMMAND_AMF0_AMF3, 373 chunk_type::TYPE_0, 374 0, 375 data.len() as u32, 376 msg_type_id::COMMAND_AMF0, 377 0, 378 data, 379 ); 380 381 self.packetizer.write_chunk(&mut chunk_info).await?; 382 Ok(()) 383 } 384 pub async fn on_play( 385 &mut self, 386 transaction_id: &f64, 387 stream_id: &u32, 388 other_values: &mut Vec<Amf0ValueType>, 389 ) -> Result<(), SessionError> { 390 let length = other_values.len() as u8; 391 let mut index: u8 = 0; 392 393 let mut stream_name: Option<String> = None; 394 let mut start: Option<f64> = None; 395 let mut duration: Option<f64> = None; 396 let mut reset: Option<bool> = None; 397 398 loop { 399 if index >= length { 400 break; 401 } 402 index = index + 1; 403 stream_name = match other_values.remove(0) { 404 Amf0ValueType::UTF8String(val) => Some(val), 405 _ => None, 406 }; 407 408 if index >= length { 409 break; 410 } 411 index = index + 1; 412 start = match other_values.remove(0) { 413 Amf0ValueType::Number(val) => Some(val), 414 _ => None, 415 }; 416 417 if index >= length { 418 break; 419 } 420 index = index + 1; 421 duration = match other_values.remove(0) { 422 Amf0ValueType::Number(val) => Some(val), 423 _ => None, 424 }; 425 426 if index >= length { 427 break; 428 } 429 index = index + 1; 430 reset = match other_values.remove(0) { 431 Amf0ValueType::Boolean(val) => Some(val), 432 _ => None, 433 }; 434 break; 435 } 436 437 let mut event_messages = EventMessages::new(AsyncBytesWriter::new(self.io.clone())); 438 event_messages.stream_begin(stream_id.clone()).await?; 439 440 let mut netstream = NetStream::new(BytesWriter::new()); 441 match reset { 442 Some(val) => { 443 if val { 444 netstream.on_status( 445 transaction_id, 446 &"status".to_string(), 447 &"NetStream.Play.Reset".to_string(), 448 &"".to_string(), 449 )?; 450 } 451 } 452 _ => {} 453 } 454 455 netstream.on_status( 456 transaction_id, 457 &"status".to_string(), 458 &"NetStream.Play.Start".to_string(), 459 &"".to_string(), 460 )?; 461 462 event_messages.stream_is_record(stream_id.clone()).await?; 463 464 self.subscribe_from_channels(stream_name.unwrap()).await?; 465 self.state = ServerSessionState::Play; 466 467 Ok(()) 468 } 469 470 async fn subscribe_from_channels(&mut self, stream_name: String) -> Result<(), SessionError> { 471 let (sender, receiver) = oneshot::channel(); 472 let subscribe_event = ChannelEvent::Subscribe { 473 app_name: self.app_name.clone(), 474 stream_name, 475 responder: sender, 476 }; 477 478 let rv = self.event_producer.send(subscribe_event); 479 match rv { 480 Err(_) => { 481 return Err(SessionError { 482 value: SessionErrorValue::ChannelEventSendErr, 483 }) 484 } 485 _ => {} 486 } 487 488 match receiver.await { 489 Ok(consumer) => { 490 self.data_consumer = consumer; 491 } 492 Err(_) => {} 493 } 494 Ok(()) 495 } 496 497 pub async fn on_publish( 498 &mut self, 499 transaction_id: &f64, 500 stream_id: &u32, 501 other_values: &mut Vec<Amf0ValueType>, 502 ) -> Result<(), SessionError> { 503 let length = other_values.len(); 504 505 if length < 2 { 506 return Err(SessionError { 507 value: SessionErrorValue::Amf0ValueCountNotCorrect, 508 }); 509 } 510 511 let stream_name = match other_values.remove(0) { 512 Amf0ValueType::UTF8String(val) => val, 513 _ => { 514 return Err(SessionError { 515 value: SessionErrorValue::Amf0ValueCountNotCorrect, 516 }); 517 } 518 }; 519 520 let stream_type = match other_values.remove(0) { 521 Amf0ValueType::UTF8String(val) => val, 522 _ => { 523 return Err(SessionError { 524 value: SessionErrorValue::Amf0ValueCountNotCorrect, 525 }); 526 } 527 }; 528 529 let mut event_messages = EventMessages::new(AsyncBytesWriter::new(self.io.clone())); 530 event_messages.stream_begin(stream_id.clone()).await?; 531 532 let mut netstream = NetStream::new(BytesWriter::new()); 533 let data = netstream.on_status( 534 transaction_id, 535 &"status".to_string(), 536 &"NetStream.Publish.Start".to_string(), 537 &"".to_string(), 538 )?; 539 540 let mut chunk_info = ChunkInfo::new( 541 csid_type::COMMAND_AMF0_AMF3, 542 chunk_type::TYPE_0, 543 0, 544 data.len() as u32, 545 msg_type_id::COMMAND_AMF0, 546 0, 547 data, 548 ); 549 550 self.packetizer.write_chunk(&mut chunk_info).await?; 551 552 self.publish_to_channels(stream_name).await?; 553 554 Ok(()) 555 } 556 557 async fn publish_to_channels(&mut self, stream_name: String) -> Result<(), SessionError> { 558 let (sender, receiver) = oneshot::channel(); 559 let publish_event = ChannelEvent::Publish { 560 app_name: self.app_name.clone(), 561 stream_name, 562 responder: sender, 563 }; 564 565 let rv = self.event_producer.send(publish_event); 566 match rv { 567 Err(_) => { 568 return Err(SessionError { 569 value: SessionErrorValue::ChannelEventSendErr, 570 }) 571 } 572 _ => {} 573 } 574 575 match receiver.await { 576 Ok(producer) => { 577 self.data_producer = producer; 578 } 579 Err(_) => {} 580 } 581 Ok(()) 582 } 583 584 pub fn on_video_data( 585 &mut self, 586 data: &mut BytesMut, 587 timestamp: &u32, 588 ) -> Result<(), SessionError> { 589 let data = ChannelData::Video { 590 timestamp: timestamp.clone(), 591 data: data.clone(), 592 }; 593 594 match self.data_producer.send(data) { 595 Ok(size) => {} 596 Err(_) => { 597 return Err(SessionError { 598 value: SessionErrorValue::SendChannelDataErr, 599 }) 600 } 601 } 602 603 Ok(()) 604 } 605 606 pub fn on_audio_data( 607 &mut self, 608 data: &mut BytesMut, 609 timestamp: &u32, 610 ) -> Result<(), SessionError> { 611 let data = ChannelData::Audio { 612 timestamp: timestamp.clone(), 613 data: data.clone(), 614 }; 615 616 match self.data_producer.send(data) { 617 Ok(size) => {} 618 Err(_) => { 619 return Err(SessionError { 620 value: SessionErrorValue::SendChannelDataErr, 621 }) 622 } 623 } 624 625 Ok(()) 626 } 627 } 628