1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::ChannelEventProducer, 11 chunk::{ 12 define::CHUNK_SIZE, 13 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 14 }, 15 config, handshake, 16 handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer}, 17 messages::{define::RtmpMessageData, parser::MessageParser}, 18 netconnection::writer::NetConnection, 19 netstream::writer::NetStreamWriter, 20 protocol_control_messages::writer::ProtocolControlMessagesWriter, 21 user_control_messages::writer::EventMessagesWriter, 22 }, 23 bytes::BytesMut, 24 bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO}, 25 std::{collections::HashMap, sync::Arc, time::Duration}, 26 tokio::{net::TcpStream, sync::Mutex}, 27 uuid::Uuid, 28 }; 29 30 enum ServerSessionState { 31 Handshake, 32 ReadChunk, 33 // OnConnect, 34 // OnCreateStream, 35 //Publish, 36 Play, 37 } 38 39 pub struct ServerSession { 40 pub app_name: String, 41 pub stream_name: String, 42 io: Arc<Mutex<BytesIO>>, 43 handshaker: HandshakeServer, 44 unpacketizer: ChunkUnpacketizer, 45 state: ServerSessionState, 46 pub common: Common, 47 bytesio_data: BytesMut, 48 has_remaing_data: bool, 49 /* Used to mark the subscriber's the data producer 50 in channels and delete it from map when unsubscribe 51 is called. */ 52 pub subscriber_id: Uuid, 53 connect_command_object: Option<HashMap<String, Amf0ValueType>>, 54 } 55 56 impl ServerSession { 57 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self { 58 let net_io = Arc::new(Mutex::new(BytesIO::new(stream))); 59 let subscriber_id = Uuid::new_v4(); 60 Self { 61 app_name: String::from(""), 62 stream_name: String::from(""), 63 io: Arc::clone(&net_io), 64 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 65 unpacketizer: ChunkUnpacketizer::new(), 66 state: ServerSessionState::Handshake, 67 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server), 68 subscriber_id, 69 bytesio_data: BytesMut::new(), 70 has_remaing_data: false, 71 connect_command_object: None, 72 } 73 } 74 75 pub async fn run(&mut self) -> Result<(), SessionError> { 76 loop { 77 match self.state { 78 ServerSessionState::Handshake => { 79 self.handshake().await?; 80 } 81 ServerSessionState::ReadChunk => { 82 self.read_parse_chunks().await?; 83 } 84 ServerSessionState::Play => { 85 self.play().await?; 86 } 87 } 88 } 89 90 //Ok(()) 91 } 92 93 async fn handshake(&mut self) -> Result<(), SessionError> { 94 let mut bytes_len = 0; 95 96 while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE { 97 self.bytesio_data = self.io.lock().await.read().await?; 98 bytes_len += self.bytesio_data.len(); 99 self.handshaker.extend_data(&self.bytesio_data[..]); 100 } 101 102 self.handshaker.handshake().await?; 103 104 match self.handshaker.state() { 105 ServerHandshakeState::Finish => { 106 self.state = ServerSessionState::ReadChunk; 107 108 let left_bytes = self.handshaker.get_remaining_bytes(); 109 if !left_bytes.is_empty() { 110 self.unpacketizer.extend_data(&left_bytes[..]); 111 self.has_remaing_data = true; 112 } 113 log::info!("[ S->C ] [send_set_chunk_size] "); 114 self.send_set_chunk_size().await?; 115 return Ok(()); 116 } 117 _ => {} 118 } 119 120 Ok(()) 121 } 122 123 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 124 if !self.has_remaing_data { 125 match self 126 .io 127 .lock() 128 .await 129 .read_timeout(Duration::from_secs(2)) 130 .await 131 { 132 Ok(data) => { 133 self.bytesio_data = data; 134 } 135 Err(err) => { 136 self.common 137 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 138 .await?; 139 140 return Err(SessionError { 141 value: SessionErrorValue::BytesIOError(err), 142 }); 143 } 144 } 145 146 self.unpacketizer.extend_data(&self.bytesio_data[..]); 147 } 148 149 self.has_remaing_data = false; 150 151 loop { 152 let result = self.unpacketizer.read_chunks(); 153 154 if let Ok(rv) = result { 155 match rv { 156 UnpackResult::Chunks(chunks) => { 157 for chunk_info in chunks { 158 let timestamp = chunk_info.message_header.timestamp; 159 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 160 161 let mut msg = MessageParser::new(chunk_info).parse()?; 162 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 163 .await?; 164 } 165 } 166 _ => {} 167 } 168 } else { 169 break; 170 } 171 } 172 Ok(()) 173 } 174 175 async fn play(&mut self) -> Result<(), SessionError> { 176 match self.common.send_channel_data().await { 177 Ok(_) => {} 178 179 Err(err) => { 180 self.common 181 .unsubscribe_from_channels( 182 self.app_name.clone(), 183 self.stream_name.clone(), 184 self.subscriber_id, 185 ) 186 .await?; 187 return Err(err); 188 } 189 } 190 191 Ok(()) 192 } 193 194 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 195 let mut controlmessage = 196 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 197 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 198 199 Ok(()) 200 } 201 202 pub async fn process_messages( 203 &mut self, 204 rtmp_msg: &mut RtmpMessageData, 205 msg_stream_id: &u32, 206 timestamp: &u32, 207 ) -> Result<(), SessionError> { 208 match rtmp_msg { 209 RtmpMessageData::Amf0Command { 210 command_name, 211 transaction_id, 212 command_object, 213 others, 214 } => { 215 self.on_amf0_command_message( 216 msg_stream_id, 217 command_name, 218 transaction_id, 219 command_object, 220 others, 221 ) 222 .await? 223 } 224 RtmpMessageData::SetChunkSize { chunk_size } => { 225 self.on_set_chunk_size(*chunk_size as usize)?; 226 } 227 RtmpMessageData::AudioData { data } => { 228 self.common.on_audio_data(data, timestamp)?; 229 } 230 RtmpMessageData::VideoData { data } => { 231 self.common.on_video_data(data, timestamp)?; 232 } 233 RtmpMessageData::AmfData { raw_data } => { 234 self.common.on_meta_data(raw_data, timestamp)?; 235 } 236 237 _ => {} 238 } 239 Ok(()) 240 } 241 242 pub async fn on_amf0_command_message( 243 &mut self, 244 stream_id: &u32, 245 command_name: &Amf0ValueType, 246 transaction_id: &Amf0ValueType, 247 command_object: &Amf0ValueType, 248 others: &mut Vec<Amf0ValueType>, 249 ) -> Result<(), SessionError> { 250 let empty_cmd_name = &String::new(); 251 let cmd_name = match command_name { 252 Amf0ValueType::UTF8String(str) => str, 253 _ => empty_cmd_name, 254 }; 255 256 let transaction_id = match transaction_id { 257 Amf0ValueType::Number(number) => number, 258 _ => &0.0, 259 }; 260 261 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 262 let obj = match command_object { 263 Amf0ValueType::Object(obj) => obj, 264 _ => &empty_cmd_obj, 265 }; 266 267 match cmd_name.as_str() { 268 "connect" => { 269 log::info!("[ S<-C ] [connect] "); 270 self.on_connect(transaction_id, obj).await?; 271 } 272 "createStream" => { 273 log::info!("[ S<-C ] [create stream] "); 274 self.on_create_stream(transaction_id).await?; 275 } 276 "deleteStream" => { 277 if !others.is_empty() { 278 let stream_id = match others.pop() { 279 Some(val) => match val { 280 Amf0ValueType::Number(streamid) => streamid, 281 _ => 0.0, 282 }, 283 _ => 0.0, 284 }; 285 log::info!( 286 "[ S<-C ] [delete stream] app_name: {}, stream_name: {}", 287 self.app_name, 288 self.stream_name 289 ); 290 291 self.on_delete_stream(transaction_id, &stream_id).await?; 292 } 293 } 294 "play" => { 295 log::info!( 296 "[ S<-C ] [play] app_name: {}, stream_name: {}", 297 self.app_name, 298 self.stream_name 299 ); 300 self.unpacketizer.session_type = config::SERVER_PULL; 301 self.on_play(transaction_id, stream_id, others).await?; 302 } 303 "publish" => { 304 self.unpacketizer.session_type = config::SERVER_PUSH; 305 self.on_publish(transaction_id, stream_id, others).await?; 306 } 307 _ => {} 308 } 309 310 Ok(()) 311 } 312 313 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 314 log::info!( 315 "[ S<-C ] [set chunk size] app_name: {}, stream_name: {}, chunk size: {}", 316 self.app_name, 317 self.stream_name, 318 chunk_size 319 ); 320 self.unpacketizer.update_max_chunk_size(chunk_size); 321 Ok(()) 322 } 323 324 async fn on_connect( 325 &mut self, 326 transaction_id: &f64, 327 command_obj: &HashMap<String, Amf0ValueType>, 328 ) -> Result<(), SessionError> { 329 self.connect_command_object = Some(command_obj.clone()); 330 let mut control_message = 331 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 332 log::info!("[ S->C ] [set window_acknowledgement_size]"); 333 control_message 334 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 335 .await?; 336 337 log::info!("[ S->C ] [set set_peer_bandwidth]",); 338 control_message 339 .write_set_peer_bandwidth( 340 define::PEER_BANDWIDTH, 341 define::peer_bandwidth_limit_type::DYNAMIC, 342 ) 343 .await?; 344 345 let obj_encoding = command_obj.get("objectEncoding"); 346 let encoding = match obj_encoding { 347 Some(Amf0ValueType::Number(encoding)) => encoding, 348 _ => &define::OBJENCODING_AMF0, 349 }; 350 351 let app_name = command_obj.get("app"); 352 self.app_name = match app_name { 353 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 354 _ => { 355 return Err(SessionError { 356 value: SessionErrorValue::NoAppName, 357 }); 358 } 359 }; 360 361 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 362 log::info!("[ S->C ] [set connect_response]",); 363 netconnection 364 .write_connect_response( 365 transaction_id, 366 &define::FMSVER.to_string(), 367 &define::CAPABILITIES, 368 &String::from("NetConnection.Connect.Success"), 369 &define::LEVEL.to_string(), 370 &String::from("Connection Succeeded."), 371 encoding, 372 ) 373 .await?; 374 375 Ok(()) 376 } 377 378 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 379 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 380 netconnection 381 .write_create_stream_response(transaction_id, &define::STREAM_ID) 382 .await?; 383 384 log::info!( 385 "[ S->C ] [create_stream_response] app_name: {}", 386 self.app_name, 387 ); 388 389 Ok(()) 390 } 391 392 pub async fn on_delete_stream( 393 &mut self, 394 transaction_id: &f64, 395 stream_id: &f64, 396 ) -> Result<(), SessionError> { 397 self.common 398 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 399 .await?; 400 401 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 402 netstream 403 .write_on_status( 404 transaction_id, 405 &"status".to_string(), 406 &"NetStream.DeleteStream.Suceess".to_string(), 407 &"".to_string(), 408 ) 409 .await?; 410 411 //self.unsubscribe_from_channels().await?; 412 log::info!( 413 "[ S->C ] [delete stream success] app_name: {}, stream_name: {}", 414 self.app_name, 415 self.stream_name 416 ); 417 log::trace!("{}", stream_id); 418 419 Ok(()) 420 } 421 pub async fn on_play( 422 &mut self, 423 transaction_id: &f64, 424 stream_id: &u32, 425 other_values: &mut Vec<Amf0ValueType>, 426 ) -> Result<(), SessionError> { 427 let length = other_values.len() as u8; 428 let mut index: u8 = 0; 429 430 let mut stream_name: Option<String> = None; 431 let mut start: Option<f64> = None; 432 let mut duration: Option<f64> = None; 433 let mut reset: Option<bool> = None; 434 435 loop { 436 if index >= length { 437 break; 438 } 439 index += 1; 440 stream_name = match other_values.remove(0) { 441 Amf0ValueType::UTF8String(val) => Some(val), 442 _ => None, 443 }; 444 445 if index >= length { 446 break; 447 } 448 index += 1; 449 start = match other_values.remove(0) { 450 Amf0ValueType::Number(val) => Some(val), 451 _ => None, 452 }; 453 454 if index >= length { 455 break; 456 } 457 index += 1; 458 duration = match other_values.remove(0) { 459 Amf0ValueType::Number(val) => Some(val), 460 _ => None, 461 }; 462 463 if index >= length { 464 break; 465 } 466 //index = index + 1; 467 reset = match other_values.remove(0) { 468 Amf0ValueType::Boolean(val) => Some(val), 469 _ => None, 470 }; 471 break; 472 } 473 474 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 475 event_messages.write_stream_begin(*stream_id).await?; 476 log::info!( 477 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 478 self.app_name, 479 self.stream_name 480 ); 481 log::trace!( 482 "{} {} {}", 483 start.is_some(), 484 duration.is_some(), 485 reset.is_some() 486 ); 487 488 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 489 netstream 490 .write_on_status( 491 transaction_id, 492 &"status".to_string(), 493 &"NetStream.Play.Reset".to_string(), 494 &"reset".to_string(), 495 ) 496 .await?; 497 498 netstream 499 .write_on_status( 500 transaction_id, 501 &"status".to_string(), 502 &"NetStream.Play.Start".to_string(), 503 &"play start".to_string(), 504 ) 505 .await?; 506 507 netstream 508 .write_on_status( 509 transaction_id, 510 &"status".to_string(), 511 &"NetStream.Data.Start".to_string(), 512 &"data start.".to_string(), 513 ) 514 .await?; 515 516 netstream 517 .write_on_status( 518 transaction_id, 519 &"status".to_string(), 520 &"NetStream.Play.PublishNotify".to_string(), 521 &"play publish notify.".to_string(), 522 ) 523 .await?; 524 525 event_messages 526 .write_stream_is_record(*stream_id) 527 .await?; 528 log::info!( 529 "[ S->C ] [stream is record] app_name: {}, stream_name: {}", 530 self.app_name, 531 self.stream_name 532 ); 533 self.stream_name = stream_name.clone().unwrap(); 534 self.common 535 .subscribe_from_channels( 536 self.app_name.clone(), 537 stream_name.unwrap(), 538 self.subscriber_id, 539 ) 540 .await?; 541 542 self.state = ServerSessionState::Play; 543 544 Ok(()) 545 } 546 547 pub async fn on_publish( 548 &mut self, 549 transaction_id: &f64, 550 stream_id: &u32, 551 other_values: &mut Vec<Amf0ValueType>, 552 ) -> Result<(), SessionError> { 553 let length = other_values.len(); 554 555 if length < 2 { 556 return Err(SessionError { 557 value: SessionErrorValue::Amf0ValueCountNotCorrect, 558 }); 559 } 560 561 let stream_name = match other_values.remove(0) { 562 Amf0ValueType::UTF8String(val) => val, 563 _ => { 564 return Err(SessionError { 565 value: SessionErrorValue::Amf0ValueCountNotCorrect, 566 }); 567 } 568 }; 569 570 self.stream_name = stream_name; 571 572 let _ = match other_values.remove(0) { 573 Amf0ValueType::UTF8String(val) => val, 574 _ => { 575 return Err(SessionError { 576 value: SessionErrorValue::Amf0ValueCountNotCorrect, 577 }); 578 } 579 }; 580 581 log::info!( 582 "[ S<-C ] [publish] app_name: {}, stream_name: {}", 583 self.app_name, 584 self.stream_name 585 ); 586 587 log::info!( 588 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 589 self.app_name, 590 self.stream_name 591 ); 592 593 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 594 event_messages.write_stream_begin(*stream_id).await?; 595 596 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 597 netstream 598 .write_on_status( 599 transaction_id, 600 &"status".to_string(), 601 &"NetStream.Publish.Start".to_string(), 602 &"".to_string(), 603 ) 604 .await?; 605 log::info!( 606 "[ S->C ] [NetStream.Publish.Start] app_name: {}, stream_name: {}", 607 self.app_name, 608 self.stream_name 609 ); 610 611 self.common 612 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 613 .await?; 614 615 Ok(()) 616 } 617 } 618