1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::ChannelEventProducer, 11 chunk::{ 12 define::CHUNK_SIZE, 13 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 14 }, 15 config, handshake, 16 handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer}, 17 messages::{define::RtmpMessageData, parser::MessageParser}, 18 netconnection::writer::NetConnection, 19 netstream::writer::NetStreamWriter, 20 protocol_control_messages::writer::ProtocolControlMessagesWriter, 21 user_control_messages::writer::EventMessagesWriter, 22 }, 23 bytes::BytesMut, 24 bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO}, 25 std::{collections::HashMap, sync::Arc, time::Duration}, 26 tokio::{net::TcpStream, sync::Mutex}, 27 uuid::Uuid, 28 }; 29 30 enum ServerSessionState { 31 Handshake, 32 ReadChunk, 33 // OnConnect, 34 // OnCreateStream, 35 //Publish, 36 Play, 37 } 38 39 pub struct ServerSession { 40 pub app_name: String, 41 pub stream_name: String, 42 43 io: Arc<Mutex<BytesIO>>, 44 handshaker: HandshakeServer, 45 46 unpacketizer: ChunkUnpacketizer, 47 48 state: ServerSessionState, 49 50 pub common: Common, 51 52 bytesio_data: BytesMut, 53 has_remaing_data: bool, 54 55 /* Used to mark the subscriber's the data producer 56 in channels and delete it from map when unsubscribe 57 is called. */ 58 pub subscriber_id: Uuid, 59 60 connect_command_object: Option<HashMap<String, Amf0ValueType>>, 61 } 62 63 impl ServerSession { 64 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self { 65 let net_io = Arc::new(Mutex::new(BytesIO::new(stream))); 66 let subscriber_id = Uuid::new_v4(); 67 Self { 68 app_name: String::from(""), 69 stream_name: String::from(""), 70 71 io: Arc::clone(&net_io), 72 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 73 74 unpacketizer: ChunkUnpacketizer::new(), 75 76 state: ServerSessionState::Handshake, 77 78 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server), 79 80 subscriber_id, 81 bytesio_data: BytesMut::new(), 82 has_remaing_data: false, 83 84 connect_command_object: None, 85 } 86 } 87 88 pub async fn run(&mut self) -> Result<(), SessionError> { 89 loop { 90 match self.state { 91 ServerSessionState::Handshake => { 92 self.handshake().await?; 93 } 94 ServerSessionState::ReadChunk => { 95 self.read_parse_chunks().await?; 96 } 97 ServerSessionState::Play => { 98 self.play().await?; 99 } 100 } 101 } 102 103 //Ok(()) 104 } 105 106 async fn handshake(&mut self) -> Result<(), SessionError> { 107 let mut bytes_len = 0; 108 109 while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE { 110 self.bytesio_data = self.io.lock().await.read().await?; 111 bytes_len += self.bytesio_data.len(); 112 self.handshaker.extend_data(&self.bytesio_data[..]); 113 } 114 115 self.handshaker.handshake().await?; 116 117 match self.handshaker.state() { 118 ServerHandshakeState::Finish => { 119 self.state = ServerSessionState::ReadChunk; 120 121 let left_bytes = self.handshaker.get_remaining_bytes(); 122 if left_bytes.len() > 0 { 123 self.unpacketizer.extend_data(&left_bytes[..]); 124 self.has_remaing_data = true; 125 } 126 log::info!("[ S->C ] [send_set_chunk_size] "); 127 self.send_set_chunk_size().await?; 128 return Ok(()); 129 } 130 _ => {} 131 } 132 133 Ok(()) 134 } 135 136 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 137 if !self.has_remaing_data { 138 match self 139 .io 140 .lock() 141 .await 142 .read_timeout(Duration::from_secs(2)) 143 .await 144 { 145 Ok(data) => { 146 self.bytesio_data = data; 147 } 148 Err(err) => { 149 self.common 150 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 151 .await?; 152 153 return Err(SessionError { 154 value: SessionErrorValue::BytesIOError(err), 155 }); 156 } 157 } 158 159 self.unpacketizer.extend_data(&self.bytesio_data[..]); 160 } 161 162 self.has_remaing_data = false; 163 164 loop { 165 let result = self.unpacketizer.read_chunks(); 166 167 if let Ok(rv) = result { 168 match rv { 169 UnpackResult::Chunks(chunks) => { 170 for chunk_info in chunks { 171 let timestamp = chunk_info.message_header.timestamp; 172 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 173 174 let mut msg = MessageParser::new(chunk_info).parse()?; 175 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 176 .await?; 177 } 178 } 179 _ => {} 180 } 181 } else { 182 break; 183 } 184 } 185 Ok(()) 186 } 187 188 async fn play(&mut self) -> Result<(), SessionError> { 189 match self.common.send_channel_data().await { 190 Ok(_) => {} 191 192 Err(err) => { 193 self.common 194 .unsubscribe_from_channels( 195 self.app_name.clone(), 196 self.stream_name.clone(), 197 self.subscriber_id, 198 ) 199 .await?; 200 return Err(err); 201 } 202 } 203 204 Ok(()) 205 } 206 207 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 208 let mut controlmessage = 209 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 210 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 211 212 Ok(()) 213 } 214 215 pub async fn process_messages( 216 &mut self, 217 rtmp_msg: &mut RtmpMessageData, 218 msg_stream_id: &u32, 219 timestamp: &u32, 220 ) -> Result<(), SessionError> { 221 match rtmp_msg { 222 RtmpMessageData::Amf0Command { 223 command_name, 224 transaction_id, 225 command_object, 226 others, 227 } => { 228 self.on_amf0_command_message( 229 msg_stream_id, 230 command_name, 231 transaction_id, 232 command_object, 233 others, 234 ) 235 .await? 236 } 237 RtmpMessageData::SetChunkSize { chunk_size } => { 238 self.on_set_chunk_size(chunk_size.clone() as usize)?; 239 } 240 RtmpMessageData::AudioData { data } => { 241 self.common.on_audio_data(data, timestamp)?; 242 } 243 RtmpMessageData::VideoData { data } => { 244 self.common.on_video_data(data, timestamp)?; 245 } 246 RtmpMessageData::AmfData { raw_data } => { 247 self.common.on_meta_data(raw_data, timestamp)?; 248 } 249 250 _ => {} 251 } 252 Ok(()) 253 } 254 255 pub async fn on_amf0_command_message( 256 &mut self, 257 stream_id: &u32, 258 command_name: &Amf0ValueType, 259 transaction_id: &Amf0ValueType, 260 command_object: &Amf0ValueType, 261 others: &mut Vec<Amf0ValueType>, 262 ) -> Result<(), SessionError> { 263 let empty_cmd_name = &String::new(); 264 let cmd_name = match command_name { 265 Amf0ValueType::UTF8String(str) => str, 266 _ => empty_cmd_name, 267 }; 268 269 let transaction_id = match transaction_id { 270 Amf0ValueType::Number(number) => number, 271 _ => &0.0, 272 }; 273 274 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 275 let obj = match command_object { 276 Amf0ValueType::Object(obj) => obj, 277 _ => &empty_cmd_obj, 278 }; 279 280 match cmd_name.as_str() { 281 "connect" => { 282 log::info!("[ S<-C ] [connect] "); 283 self.on_connect(&transaction_id, &obj).await?; 284 } 285 "createStream" => { 286 log::info!("[ S<-C ] [create stream] "); 287 self.on_create_stream(transaction_id).await?; 288 } 289 "deleteStream" => { 290 if others.len() > 0 { 291 let stream_id = match others.pop() { 292 Some(val) => match val { 293 Amf0ValueType::Number(streamid) => streamid, 294 _ => 0.0, 295 }, 296 _ => 0.0, 297 }; 298 log::info!( 299 "[ S<-C ] [delete stream] app_name: {}, stream_name: {}", 300 self.app_name, 301 self.stream_name 302 ); 303 304 self.on_delete_stream(transaction_id, &stream_id).await?; 305 } 306 } 307 "play" => { 308 log::info!( 309 "[ S<-C ] [play] app_name: {}, stream_name: {}", 310 self.app_name, 311 self.stream_name 312 ); 313 self.unpacketizer.session_type = config::SERVER_PULL; 314 self.on_play(transaction_id, stream_id, others).await?; 315 } 316 "publish" => { 317 self.unpacketizer.session_type = config::SERVER_PUSH; 318 self.on_publish(transaction_id, stream_id, others).await?; 319 } 320 _ => {} 321 } 322 323 Ok(()) 324 } 325 326 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 327 log::info!( 328 "[ S<-C ] [set chunk size] app_name: {}, stream_name: {}, chunk size: {}", 329 self.app_name, 330 self.stream_name, 331 chunk_size 332 ); 333 self.unpacketizer.update_max_chunk_size(chunk_size); 334 Ok(()) 335 } 336 337 async fn on_connect( 338 &mut self, 339 transaction_id: &f64, 340 command_obj: &HashMap<String, Amf0ValueType>, 341 ) -> Result<(), SessionError> { 342 self.connect_command_object = Some(command_obj.clone()); 343 let mut control_message = 344 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 345 log::info!("[ S->C ] [set window_acknowledgement_size]"); 346 control_message 347 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 348 .await?; 349 350 log::info!("[ S->C ] [set set_peer_bandwidth]",); 351 control_message 352 .write_set_peer_bandwidth( 353 define::PEER_BANDWIDTH, 354 define::peer_bandwidth_limit_type::DYNAMIC, 355 ) 356 .await?; 357 358 let obj_encoding = command_obj.get("objectEncoding"); 359 let encoding = match obj_encoding { 360 Some(Amf0ValueType::Number(encoding)) => encoding, 361 _ => &define::OBJENCODING_AMF0, 362 }; 363 364 let app_name = command_obj.get("app"); 365 self.app_name = match app_name { 366 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 367 _ => { 368 return Err(SessionError { 369 value: SessionErrorValue::NoAppName, 370 }); 371 } 372 }; 373 374 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 375 log::info!("[ S->C ] [set connect_response]",); 376 netconnection 377 .write_connect_response( 378 &transaction_id, 379 &define::FMSVER.to_string(), 380 &define::CAPABILITIES, 381 &String::from("NetConnection.Connect.Success"), 382 &define::LEVEL.to_string(), 383 &String::from("Connection Succeeded."), 384 encoding, 385 ) 386 .await?; 387 388 Ok(()) 389 } 390 391 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 392 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 393 netconnection 394 .write_create_stream_response(transaction_id, &define::STREAM_ID) 395 .await?; 396 397 log::info!( 398 "[ S->C ] [create_stream_response] app_name: {}", 399 self.app_name, 400 ); 401 402 Ok(()) 403 } 404 405 pub async fn on_delete_stream( 406 &mut self, 407 transaction_id: &f64, 408 stream_id: &f64, 409 ) -> Result<(), SessionError> { 410 self.common 411 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 412 .await?; 413 414 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 415 netstream 416 .write_on_status( 417 transaction_id, 418 &"status".to_string(), 419 &"NetStream.DeleteStream.Suceess".to_string(), 420 &"".to_string(), 421 ) 422 .await?; 423 424 //self.unsubscribe_from_channels().await?; 425 log::info!( 426 "[ S->C ] [delete stream success] app_name: {}, stream_name: {}", 427 self.app_name, 428 self.stream_name 429 ); 430 log::trace!("{}", stream_id); 431 432 Ok(()) 433 } 434 pub async fn on_play( 435 &mut self, 436 transaction_id: &f64, 437 stream_id: &u32, 438 other_values: &mut Vec<Amf0ValueType>, 439 ) -> Result<(), SessionError> { 440 let length = other_values.len() as u8; 441 let mut index: u8 = 0; 442 443 let mut stream_name: Option<String> = None; 444 let mut start: Option<f64> = None; 445 let mut duration: Option<f64> = None; 446 let mut reset: Option<bool> = None; 447 448 loop { 449 if index >= length { 450 break; 451 } 452 index = index + 1; 453 stream_name = match other_values.remove(0) { 454 Amf0ValueType::UTF8String(val) => Some(val), 455 _ => None, 456 }; 457 458 if index >= length { 459 break; 460 } 461 index = index + 1; 462 start = match other_values.remove(0) { 463 Amf0ValueType::Number(val) => Some(val), 464 _ => None, 465 }; 466 467 if index >= length { 468 break; 469 } 470 index = index + 1; 471 duration = match other_values.remove(0) { 472 Amf0ValueType::Number(val) => Some(val), 473 _ => None, 474 }; 475 476 if index >= length { 477 break; 478 } 479 //index = index + 1; 480 reset = match other_values.remove(0) { 481 Amf0ValueType::Boolean(val) => Some(val), 482 _ => None, 483 }; 484 break; 485 } 486 487 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 488 event_messages.write_stream_begin(stream_id.clone()).await?; 489 log::info!( 490 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 491 self.app_name, 492 self.stream_name 493 ); 494 log::trace!( 495 "{} {} {}", 496 start.is_some(), 497 duration.is_some(), 498 reset.is_some() 499 ); 500 501 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 502 netstream 503 .write_on_status( 504 transaction_id, 505 &"status".to_string(), 506 &"NetStream.Play.Reset".to_string(), 507 &"reset".to_string(), 508 ) 509 .await?; 510 511 netstream 512 .write_on_status( 513 transaction_id, 514 &"status".to_string(), 515 &"NetStream.Play.Start".to_string(), 516 &"play start".to_string(), 517 ) 518 .await?; 519 520 netstream 521 .write_on_status( 522 transaction_id, 523 &"status".to_string(), 524 &"NetStream.Data.Start".to_string(), 525 &"data start.".to_string(), 526 ) 527 .await?; 528 529 netstream 530 .write_on_status( 531 transaction_id, 532 &"status".to_string(), 533 &"NetStream.Play.PublishNotify".to_string(), 534 &"play publish notify.".to_string(), 535 ) 536 .await?; 537 538 event_messages 539 .write_stream_is_record(stream_id.clone()) 540 .await?; 541 log::info!( 542 "[ S->C ] [stream is record] app_name: {}, stream_name: {}", 543 self.app_name, 544 self.stream_name 545 ); 546 self.stream_name = stream_name.clone().unwrap(); 547 self.common 548 .subscribe_from_channels( 549 self.app_name.clone(), 550 stream_name.unwrap(), 551 self.subscriber_id, 552 ) 553 .await?; 554 555 self.state = ServerSessionState::Play; 556 557 Ok(()) 558 } 559 560 pub async fn on_publish( 561 &mut self, 562 transaction_id: &f64, 563 stream_id: &u32, 564 other_values: &mut Vec<Amf0ValueType>, 565 ) -> Result<(), SessionError> { 566 let length = other_values.len(); 567 568 if length < 2 { 569 return Err(SessionError { 570 value: SessionErrorValue::Amf0ValueCountNotCorrect, 571 }); 572 } 573 574 let stream_name = match other_values.remove(0) { 575 Amf0ValueType::UTF8String(val) => val, 576 _ => { 577 return Err(SessionError { 578 value: SessionErrorValue::Amf0ValueCountNotCorrect, 579 }); 580 } 581 }; 582 583 self.stream_name = stream_name; 584 585 let _ = match other_values.remove(0) { 586 Amf0ValueType::UTF8String(val) => val, 587 _ => { 588 return Err(SessionError { 589 value: SessionErrorValue::Amf0ValueCountNotCorrect, 590 }); 591 } 592 }; 593 594 log::info!( 595 "[ S<-C ] [publish] app_name: {}, stream_name: {}", 596 self.app_name, 597 self.stream_name 598 ); 599 600 log::info!( 601 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 602 self.app_name, 603 self.stream_name 604 ); 605 606 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 607 event_messages.write_stream_begin(stream_id.clone()).await?; 608 609 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 610 netstream 611 .write_on_status( 612 transaction_id, 613 &"status".to_string(), 614 &"NetStream.Publish.Start".to_string(), 615 &"".to_string(), 616 ) 617 .await?; 618 log::info!( 619 "[ S->C ] [NetStream.Publish.Start] app_name: {}, stream_name: {}", 620 self.app_name, 621 self.stream_name 622 ); 623 624 self.common 625 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 626 .await?; 627 628 Ok(()) 629 } 630 } 631