1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::ChannelEventProducer, 11 chunk::{ 12 define::CHUNK_SIZE, 13 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 14 }, 15 config, handshake, 16 handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer}, 17 messages::{define::RtmpMessageData, parser::MessageParser}, 18 netconnection::writer::NetConnection, 19 netstream::writer::NetStreamWriter, 20 protocol_control_messages::writer::ProtocolControlMessagesWriter, 21 user_control_messages::writer::EventMessagesWriter, 22 }, 23 bytes::BytesMut, 24 bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO}, 25 std::{collections::HashMap, sync::Arc, time::Duration}, 26 tokio::{net::TcpStream, sync::Mutex}, 27 uuid::Uuid, 28 }; 29 30 enum ServerSessionState { 31 Handshake, 32 ReadChunk, 33 // OnConnect, 34 // OnCreateStream, 35 //Publish, 36 DeleteStream, 37 Play, 38 } 39 40 pub struct ServerSession { 41 pub app_name: String, 42 pub stream_name: String, 43 io: Arc<Mutex<BytesIO>>, 44 handshaker: HandshakeServer, 45 unpacketizer: ChunkUnpacketizer, 46 state: ServerSessionState, 47 pub common: Common, 48 bytesio_data: BytesMut, 49 has_remaing_data: bool, 50 /* Used to mark the subscriber's the data producer 51 in channels and delete it from map when unsubscribe 52 is called. */ 53 pub subscriber_id: Uuid, 54 connect_command_object: Option<HashMap<String, Amf0ValueType>>, 55 } 56 57 impl ServerSession { 58 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self { 59 let net_io = Arc::new(Mutex::new(BytesIO::new(stream))); 60 let subscriber_id = Uuid::new_v4(); 61 Self { 62 app_name: String::from(""), 63 stream_name: String::from(""), 64 io: Arc::clone(&net_io), 65 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 66 unpacketizer: ChunkUnpacketizer::new(), 67 state: ServerSessionState::Handshake, 68 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server), 69 subscriber_id, 70 bytesio_data: BytesMut::new(), 71 has_remaing_data: false, 72 connect_command_object: None, 73 } 74 } 75 76 pub async fn run(&mut self) -> Result<(), SessionError> { 77 loop { 78 match self.state { 79 ServerSessionState::Handshake => { 80 self.handshake().await?; 81 } 82 ServerSessionState::ReadChunk => { 83 self.read_parse_chunks().await?; 84 } 85 ServerSessionState::Play => { 86 self.play().await?; 87 } 88 ServerSessionState::DeleteStream => { 89 return Ok(()); 90 } 91 } 92 } 93 94 //Ok(()) 95 } 96 97 async fn handshake(&mut self) -> Result<(), SessionError> { 98 let mut bytes_len = 0; 99 100 while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE { 101 self.bytesio_data = self.io.lock().await.read().await?; 102 bytes_len += self.bytesio_data.len(); 103 self.handshaker.extend_data(&self.bytesio_data[..]); 104 } 105 106 self.handshaker.handshake().await?; 107 108 if let ServerHandshakeState::Finish = self.handshaker.state() { 109 self.state = ServerSessionState::ReadChunk; 110 let left_bytes = self.handshaker.get_remaining_bytes(); 111 if !left_bytes.is_empty() { 112 self.unpacketizer.extend_data(&left_bytes[..]); 113 self.has_remaing_data = true; 114 } 115 log::info!("[ S->C ] [send_set_chunk_size] "); 116 self.send_set_chunk_size().await?; 117 return Ok(()); 118 } 119 120 Ok(()) 121 } 122 123 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 124 if !self.has_remaing_data { 125 match self 126 .io 127 .lock() 128 .await 129 .read_timeout(Duration::from_secs(2)) 130 .await 131 { 132 Ok(data) => { 133 self.bytesio_data = data; 134 } 135 Err(err) => { 136 self.common 137 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 138 .await?; 139 140 return Err(SessionError { 141 value: SessionErrorValue::BytesIOError(err), 142 }); 143 } 144 } 145 146 self.unpacketizer.extend_data(&self.bytesio_data[..]); 147 } 148 149 self.has_remaing_data = false; 150 151 loop { 152 let result = self.unpacketizer.read_chunks(); 153 154 if let Ok(rv) = result { 155 if let UnpackResult::Chunks(chunks) = rv { 156 for chunk_info in chunks { 157 let timestamp = chunk_info.message_header.timestamp; 158 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 159 160 let mut msg = MessageParser::new(chunk_info).parse()?; 161 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 162 .await?; 163 } 164 } 165 } else { 166 break; 167 } 168 } 169 Ok(()) 170 } 171 172 async fn play(&mut self) -> Result<(), SessionError> { 173 match self.common.send_channel_data().await { 174 Ok(_) => {} 175 176 Err(err) => { 177 self.common 178 .unsubscribe_from_channels( 179 self.app_name.clone(), 180 self.stream_name.clone(), 181 self.subscriber_id, 182 ) 183 .await?; 184 return Err(err); 185 } 186 } 187 188 Ok(()) 189 } 190 191 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 192 let mut controlmessage = 193 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 194 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 195 196 Ok(()) 197 } 198 199 pub async fn process_messages( 200 &mut self, 201 rtmp_msg: &mut RtmpMessageData, 202 msg_stream_id: &u32, 203 timestamp: &u32, 204 ) -> Result<(), SessionError> { 205 match rtmp_msg { 206 RtmpMessageData::Amf0Command { 207 command_name, 208 transaction_id, 209 command_object, 210 others, 211 } => { 212 self.on_amf0_command_message( 213 msg_stream_id, 214 command_name, 215 transaction_id, 216 command_object, 217 others, 218 ) 219 .await? 220 } 221 RtmpMessageData::SetChunkSize { chunk_size } => { 222 self.on_set_chunk_size(*chunk_size as usize)?; 223 } 224 RtmpMessageData::AudioData { data } => { 225 self.common.on_audio_data(data, timestamp)?; 226 } 227 RtmpMessageData::VideoData { data } => { 228 self.common.on_video_data(data, timestamp)?; 229 } 230 RtmpMessageData::AmfData { raw_data } => { 231 self.common.on_meta_data(raw_data, timestamp)?; 232 } 233 234 _ => {} 235 } 236 Ok(()) 237 } 238 239 pub async fn on_amf0_command_message( 240 &mut self, 241 stream_id: &u32, 242 command_name: &Amf0ValueType, 243 transaction_id: &Amf0ValueType, 244 command_object: &Amf0ValueType, 245 others: &mut Vec<Amf0ValueType>, 246 ) -> Result<(), SessionError> { 247 let empty_cmd_name = &String::new(); 248 let cmd_name = match command_name { 249 Amf0ValueType::UTF8String(str) => str, 250 _ => empty_cmd_name, 251 }; 252 253 let transaction_id = match transaction_id { 254 Amf0ValueType::Number(number) => number, 255 _ => &0.0, 256 }; 257 258 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 259 let obj = match command_object { 260 Amf0ValueType::Object(obj) => obj, 261 _ => &empty_cmd_obj, 262 }; 263 264 match cmd_name.as_str() { 265 "connect" => { 266 log::info!("[ S<-C ] [connect] "); 267 self.on_connect(transaction_id, obj).await?; 268 } 269 "createStream" => { 270 log::info!("[ S<-C ] [create stream] "); 271 self.on_create_stream(transaction_id).await?; 272 } 273 "deleteStream" => { 274 if !others.is_empty() { 275 let stream_id = match others.pop() { 276 Some(Amf0ValueType::Number(streamid)) => streamid, 277 _ => 0.0, 278 }; 279 280 log::info!( 281 "[ S<-C ] [delete stream] app_name: {}, stream_name: {}", 282 self.app_name, 283 self.stream_name 284 ); 285 286 self.on_delete_stream(transaction_id, &stream_id).await?; 287 self.state = ServerSessionState::DeleteStream; 288 } 289 } 290 "play" => { 291 log::info!( 292 "[ S<-C ] [play] app_name: {}, stream_name: {}", 293 self.app_name, 294 self.stream_name 295 ); 296 self.unpacketizer.session_type = config::SERVER_PULL; 297 self.on_play(transaction_id, stream_id, others).await?; 298 } 299 "publish" => { 300 self.unpacketizer.session_type = config::SERVER_PUSH; 301 self.on_publish(transaction_id, stream_id, others).await?; 302 } 303 _ => {} 304 } 305 306 Ok(()) 307 } 308 309 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 310 log::info!( 311 "[ S<-C ] [set chunk size] app_name: {}, stream_name: {}, chunk size: {}", 312 self.app_name, 313 self.stream_name, 314 chunk_size 315 ); 316 self.unpacketizer.update_max_chunk_size(chunk_size); 317 Ok(()) 318 } 319 320 async fn on_connect( 321 &mut self, 322 transaction_id: &f64, 323 command_obj: &HashMap<String, Amf0ValueType>, 324 ) -> Result<(), SessionError> { 325 self.connect_command_object = Some(command_obj.clone()); 326 let mut control_message = 327 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 328 log::info!("[ S->C ] [set window_acknowledgement_size]"); 329 control_message 330 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 331 .await?; 332 333 log::info!("[ S->C ] [set set_peer_bandwidth]",); 334 control_message 335 .write_set_peer_bandwidth( 336 define::PEER_BANDWIDTH, 337 define::peer_bandwidth_limit_type::DYNAMIC, 338 ) 339 .await?; 340 341 let obj_encoding = command_obj.get("objectEncoding"); 342 let encoding = match obj_encoding { 343 Some(Amf0ValueType::Number(encoding)) => encoding, 344 _ => &define::OBJENCODING_AMF0, 345 }; 346 347 let app_name = command_obj.get("app"); 348 self.app_name = match app_name { 349 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 350 _ => { 351 return Err(SessionError { 352 value: SessionErrorValue::NoAppName, 353 }); 354 } 355 }; 356 357 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 358 log::info!("[ S->C ] [set connect_response]",); 359 netconnection 360 .write_connect_response( 361 transaction_id, 362 define::FMSVER, 363 &define::CAPABILITIES, 364 &String::from("NetConnection.Connect.Success"), 365 define::LEVEL, 366 &String::from("Connection Succeeded."), 367 encoding, 368 ) 369 .await?; 370 371 Ok(()) 372 } 373 374 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 375 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 376 netconnection 377 .write_create_stream_response(transaction_id, &define::STREAM_ID) 378 .await?; 379 380 log::info!( 381 "[ S->C ] [create_stream_response] app_name: {}", 382 self.app_name, 383 ); 384 385 Ok(()) 386 } 387 388 pub async fn on_delete_stream( 389 &mut self, 390 transaction_id: &f64, 391 stream_id: &f64, 392 ) -> Result<(), SessionError> { 393 self.common 394 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 395 .await?; 396 397 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 398 netstream 399 .write_on_status( 400 transaction_id, 401 "status", 402 "NetStream.DeleteStream.Suceess", 403 "", 404 ) 405 .await?; 406 407 //self.unsubscribe_from_channels().await?; 408 log::info!( 409 "[ S->C ] [delete stream success] app_name: {}, stream_name: {}", 410 self.app_name, 411 self.stream_name 412 ); 413 log::trace!("{}", stream_id); 414 415 Ok(()) 416 } 417 418 #[allow(clippy::never_loop)] 419 pub async fn on_play( 420 &mut self, 421 transaction_id: &f64, 422 stream_id: &u32, 423 other_values: &mut Vec<Amf0ValueType>, 424 ) -> Result<(), SessionError> { 425 let length = other_values.len() as u8; 426 let mut index: u8 = 0; 427 428 let mut stream_name: Option<String> = None; 429 let mut start: Option<f64> = None; 430 let mut duration: Option<f64> = None; 431 let mut reset: Option<bool> = None; 432 433 loop { 434 if index >= length { 435 break; 436 } 437 index += 1; 438 stream_name = match other_values.remove(0) { 439 Amf0ValueType::UTF8String(val) => Some(val), 440 _ => None, 441 }; 442 443 if index >= length { 444 break; 445 } 446 index += 1; 447 start = match other_values.remove(0) { 448 Amf0ValueType::Number(val) => Some(val), 449 _ => None, 450 }; 451 452 if index >= length { 453 break; 454 } 455 index += 1; 456 duration = match other_values.remove(0) { 457 Amf0ValueType::Number(val) => Some(val), 458 _ => None, 459 }; 460 461 if index >= length { 462 break; 463 } 464 //index = index + 1; 465 reset = match other_values.remove(0) { 466 Amf0ValueType::Boolean(val) => Some(val), 467 _ => None, 468 }; 469 break; 470 } 471 472 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 473 event_messages.write_stream_begin(*stream_id).await?; 474 log::info!( 475 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 476 self.app_name, 477 self.stream_name 478 ); 479 log::trace!( 480 "{} {} {}", 481 start.is_some(), 482 duration.is_some(), 483 reset.is_some() 484 ); 485 486 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 487 netstream 488 .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset") 489 .await?; 490 491 netstream 492 .write_on_status( 493 transaction_id, 494 "status", 495 "NetStream.Play.Start", 496 "play start", 497 ) 498 .await?; 499 500 netstream 501 .write_on_status( 502 transaction_id, 503 "status", 504 "NetStream.Data.Start", 505 "data start.", 506 ) 507 .await?; 508 509 netstream 510 .write_on_status( 511 transaction_id, 512 "status", 513 "NetStream.Play.PublishNotify", 514 "play publish notify.", 515 ) 516 .await?; 517 518 event_messages.write_stream_is_record(*stream_id).await?; 519 log::info!( 520 "[ S->C ] [stream is record] app_name: {}, stream_name: {}", 521 self.app_name, 522 self.stream_name 523 ); 524 self.stream_name = stream_name.clone().unwrap(); 525 self.common 526 .subscribe_from_channels( 527 self.app_name.clone(), 528 stream_name.unwrap(), 529 self.subscriber_id, 530 ) 531 .await?; 532 533 self.state = ServerSessionState::Play; 534 535 Ok(()) 536 } 537 538 pub async fn on_publish( 539 &mut self, 540 transaction_id: &f64, 541 stream_id: &u32, 542 other_values: &mut Vec<Amf0ValueType>, 543 ) -> Result<(), SessionError> { 544 let length = other_values.len(); 545 546 if length < 2 { 547 return Err(SessionError { 548 value: SessionErrorValue::Amf0ValueCountNotCorrect, 549 }); 550 } 551 552 let stream_name = match other_values.remove(0) { 553 Amf0ValueType::UTF8String(val) => val, 554 _ => { 555 return Err(SessionError { 556 value: SessionErrorValue::Amf0ValueCountNotCorrect, 557 }); 558 } 559 }; 560 561 self.stream_name = stream_name; 562 563 let _ = match other_values.remove(0) { 564 Amf0ValueType::UTF8String(val) => val, 565 _ => { 566 return Err(SessionError { 567 value: SessionErrorValue::Amf0ValueCountNotCorrect, 568 }); 569 } 570 }; 571 572 log::info!( 573 "[ S<-C ] [publish] app_name: {}, stream_name: {}", 574 self.app_name, 575 self.stream_name 576 ); 577 578 log::info!( 579 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 580 self.app_name, 581 self.stream_name 582 ); 583 584 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 585 event_messages.write_stream_begin(*stream_id).await?; 586 587 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 588 netstream 589 .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "") 590 .await?; 591 log::info!( 592 "[ S->C ] [NetStream.Publish.Start] app_name: {}, stream_name: {}", 593 self.app_name, 594 self.stream_name 595 ); 596 597 self.common 598 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 599 .await?; 600 601 Ok(()) 602 } 603 } 604