1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::ChannelEventProducer, 11 chunk::{ 12 define::CHUNK_SIZE, 13 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 14 }, 15 config, 16 handshake::handshake::{HandshakeServer, ServerHandshakeState}, 17 messages::{define::RtmpMessageData, parser::MessageParser}, 18 netconnection::writer::NetConnection, 19 netstream::writer::NetStreamWriter, 20 protocol_control_messages::writer::ProtocolControlMessagesWriter, 21 user_control_messages::writer::EventMessagesWriter, 22 }, 23 bytes::BytesMut, 24 bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO}, 25 std::{collections::HashMap, sync::Arc, time::Duration}, 26 tokio::{net::TcpStream, sync::Mutex}, 27 uuid::Uuid, 28 }; 29 30 enum ServerSessionState { 31 Handshake, 32 ReadChunk, 33 // OnConnect, 34 // OnCreateStream, 35 //Publish, 36 Play, 37 } 38 39 pub struct ServerSession { 40 pub app_name: String, 41 pub stream_name: String, 42 43 io: Arc<Mutex<BytesIO>>, 44 handshaker: HandshakeServer, 45 46 unpacketizer: ChunkUnpacketizer, 47 48 state: ServerSessionState, 49 50 pub common: Common, 51 52 bytesio_data: BytesMut, 53 has_remaing_data: bool, 54 55 /* Used to mark the subscriber's the data producer 56 in channels and delete it from map when unsubscribe 57 is called. */ 58 pub subscriber_id: Uuid, 59 60 connect_command_object: Option<HashMap<String, Amf0ValueType>>, 61 } 62 63 impl ServerSession { 64 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self { 65 let net_io = Arc::new(Mutex::new(BytesIO::new(stream))); 66 let subscriber_id = Uuid::new_v4(); 67 Self { 68 app_name: String::from(""), 69 stream_name: String::from(""), 70 71 io: Arc::clone(&net_io), 72 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 73 74 unpacketizer: ChunkUnpacketizer::new(), 75 76 state: ServerSessionState::Handshake, 77 78 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server), 79 80 subscriber_id, 81 bytesio_data: BytesMut::new(), 82 has_remaing_data: false, 83 84 connect_command_object: None, 85 } 86 } 87 88 pub async fn run(&mut self) -> Result<(), SessionError> { 89 loop { 90 match self.state { 91 ServerSessionState::Handshake => { 92 self.handshake().await?; 93 } 94 ServerSessionState::ReadChunk => { 95 self.read_parse_chunks().await?; 96 } 97 ServerSessionState::Play => { 98 self.play().await?; 99 } 100 } 101 } 102 103 //Ok(()) 104 } 105 106 async fn handshake(&mut self) -> Result<(), SessionError> { 107 self.bytesio_data = self.io.lock().await.read().await?; 108 109 self.handshaker.extend_data(&self.bytesio_data[..]); 110 self.handshaker.handshake().await?; 111 112 match self.handshaker.state() { 113 ServerHandshakeState::Finish => { 114 self.state = ServerSessionState::ReadChunk; 115 116 let left_bytes = self.handshaker.get_remaining_bytes(); 117 if left_bytes.len() > 0 { 118 self.unpacketizer.extend_data(&left_bytes[..]); 119 self.has_remaing_data = true; 120 } 121 self.send_set_chunk_size().await?; 122 return Ok(()); 123 } 124 _ => {} 125 } 126 127 Ok(()) 128 } 129 130 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 131 if !self.has_remaing_data { 132 match self 133 .io 134 .lock() 135 .await 136 .read_timeout(Duration::from_secs(2)) 137 .await 138 { 139 Ok(data) => { 140 self.bytesio_data = data; 141 } 142 Err(err) => { 143 self.common 144 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 145 .await?; 146 147 return Err(SessionError { 148 value: SessionErrorValue::BytesIOError(err), 149 }); 150 } 151 } 152 153 self.unpacketizer.extend_data(&self.bytesio_data[..]); 154 } 155 156 self.has_remaing_data = false; 157 158 loop { 159 let result = self.unpacketizer.read_chunks(); 160 161 if let Ok(rv) = result { 162 match rv { 163 UnpackResult::Chunks(chunks) => { 164 for chunk_info in chunks.iter() { 165 let mut msg = MessageParser::new(chunk_info.clone()).parse()?; 166 167 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 168 let timestamp = chunk_info.message_header.timestamp; 169 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 170 .await?; 171 } 172 } 173 _ => {} 174 } 175 } else { 176 break; 177 } 178 } 179 Ok(()) 180 } 181 182 async fn play(&mut self) -> Result<(), SessionError> { 183 match self.common.send_channel_data().await { 184 Ok(_) => {} 185 186 Err(err) => { 187 self.common 188 .unsubscribe_from_channels( 189 self.app_name.clone(), 190 self.stream_name.clone(), 191 self.subscriber_id, 192 ) 193 .await?; 194 return Err(err); 195 } 196 } 197 198 Ok(()) 199 } 200 201 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 202 let mut controlmessage = 203 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 204 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 205 206 Ok(()) 207 } 208 209 pub async fn process_messages( 210 &mut self, 211 rtmp_msg: &mut RtmpMessageData, 212 msg_stream_id: &u32, 213 timestamp: &u32, 214 ) -> Result<(), SessionError> { 215 match rtmp_msg { 216 RtmpMessageData::Amf0Command { 217 command_name, 218 transaction_id, 219 command_object, 220 others, 221 } => { 222 self.on_amf0_command_message( 223 msg_stream_id, 224 command_name, 225 transaction_id, 226 command_object, 227 others, 228 ) 229 .await? 230 } 231 RtmpMessageData::SetChunkSize { chunk_size } => { 232 self.on_set_chunk_size(chunk_size.clone() as usize)?; 233 } 234 RtmpMessageData::AudioData { data } => { 235 self.common.on_audio_data(data, timestamp)?; 236 } 237 RtmpMessageData::VideoData { data } => { 238 self.common.on_video_data(data, timestamp)?; 239 } 240 RtmpMessageData::AmfData { raw_data } => { 241 self.common.on_meta_data(raw_data, timestamp)?; 242 } 243 244 _ => {} 245 } 246 Ok(()) 247 } 248 249 pub async fn on_amf0_command_message( 250 &mut self, 251 stream_id: &u32, 252 command_name: &Amf0ValueType, 253 transaction_id: &Amf0ValueType, 254 command_object: &Amf0ValueType, 255 others: &mut Vec<Amf0ValueType>, 256 ) -> Result<(), SessionError> { 257 let empty_cmd_name = &String::new(); 258 let cmd_name = match command_name { 259 Amf0ValueType::UTF8String(str) => str, 260 _ => empty_cmd_name, 261 }; 262 263 let transaction_id = match transaction_id { 264 Amf0ValueType::Number(number) => number, 265 _ => &0.0, 266 }; 267 268 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 269 let obj = match command_object { 270 Amf0ValueType::Object(obj) => obj, 271 _ => &empty_cmd_obj, 272 }; 273 274 match cmd_name.as_str() { 275 "connect" => { 276 log::info!("[ S<-C ] [connect] "); 277 self.on_connect(&transaction_id, &obj).await?; 278 } 279 "createStream" => { 280 log::info!("[ S<-C ] [create stream] "); 281 self.on_create_stream(transaction_id).await?; 282 } 283 "deleteStream" => { 284 if others.len() > 0 { 285 let stream_id = match others.pop() { 286 Some(val) => match val { 287 Amf0ValueType::Number(streamid) => streamid, 288 _ => 0.0, 289 }, 290 _ => 0.0, 291 }; 292 log::info!( 293 "[ S<-C ] [delete stream] app_name: {}, stream_name: {}", 294 self.app_name, 295 self.stream_name 296 ); 297 298 self.on_delete_stream(transaction_id, &stream_id).await?; 299 } 300 } 301 "play" => { 302 log::info!( 303 "[ S<-C ] [play] app_name: {}, stream_name: {}", 304 self.app_name, 305 self.stream_name 306 ); 307 self.unpacketizer.session_type = config::SERVER_PULL; 308 self.on_play(transaction_id, stream_id, others).await?; 309 } 310 "publish" => { 311 self.unpacketizer.session_type = config::SERVER_PUSH; 312 self.on_publish(transaction_id, stream_id, others).await?; 313 } 314 _ => {} 315 } 316 317 Ok(()) 318 } 319 320 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 321 self.unpacketizer.update_max_chunk_size(chunk_size); 322 Ok(()) 323 } 324 325 async fn on_connect( 326 &mut self, 327 transaction_id: &f64, 328 command_obj: &HashMap<String, Amf0ValueType>, 329 ) -> Result<(), SessionError> { 330 self.connect_command_object = Some(command_obj.clone()); 331 let mut control_message = 332 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 333 log::info!("[ S->C ] [set window_acknowledgement_size]"); 334 control_message 335 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 336 .await?; 337 log::info!("[ S->C ] [set set_peer_bandwidth]",); 338 control_message 339 .write_set_peer_bandwidth( 340 define::PEER_BANDWIDTH, 341 define::peer_bandwidth_limit_type::DYNAMIC, 342 ) 343 .await?; 344 345 let obj_encoding = command_obj.get("objectEncoding"); 346 let encoding = match obj_encoding { 347 Some(Amf0ValueType::Number(encoding)) => encoding, 348 _ => &define::OBJENCODING_AMF0, 349 }; 350 351 let app_name = command_obj.get("app"); 352 self.app_name = match app_name { 353 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 354 _ => { 355 return Err(SessionError { 356 value: SessionErrorValue::NoAppName, 357 }); 358 } 359 }; 360 361 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 362 netconnection 363 .write_connect_response( 364 &transaction_id, 365 &define::FMSVER.to_string(), 366 &define::CAPABILITIES, 367 &String::from("NetConnection.Connect.Success"), 368 &define::LEVEL.to_string(), 369 &String::from("Connection Succeeded."), 370 encoding, 371 ) 372 .await?; 373 374 Ok(()) 375 } 376 377 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 378 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 379 netconnection 380 .write_create_stream_response(transaction_id, &define::STREAM_ID) 381 .await?; 382 383 log::info!( 384 "[ S->C ] [create_stream_response] app_name: {}", 385 self.app_name, 386 ); 387 388 Ok(()) 389 } 390 391 pub async fn on_delete_stream( 392 &mut self, 393 transaction_id: &f64, 394 stream_id: &f64, 395 ) -> Result<(), SessionError> { 396 self.common 397 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 398 .await?; 399 400 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 401 netstream 402 .write_on_status( 403 transaction_id, 404 &"status".to_string(), 405 &"NetStream.DeleteStream.Suceess".to_string(), 406 &"".to_string(), 407 ) 408 .await?; 409 410 //self.unsubscribe_from_channels().await?; 411 log::info!( 412 "[ S->C ] [delete stream success] app_name: {}, stream_name: {}", 413 self.app_name, 414 self.stream_name 415 ); 416 log::trace!("{}", stream_id); 417 418 Ok(()) 419 } 420 pub async fn on_play( 421 &mut self, 422 transaction_id: &f64, 423 stream_id: &u32, 424 other_values: &mut Vec<Amf0ValueType>, 425 ) -> Result<(), SessionError> { 426 let length = other_values.len() as u8; 427 let mut index: u8 = 0; 428 429 let mut stream_name: Option<String> = None; 430 let mut start: Option<f64> = None; 431 let mut duration: Option<f64> = None; 432 let mut reset: Option<bool> = None; 433 434 loop { 435 if index >= length { 436 break; 437 } 438 index = index + 1; 439 stream_name = match other_values.remove(0) { 440 Amf0ValueType::UTF8String(val) => Some(val), 441 _ => None, 442 }; 443 444 if index >= length { 445 break; 446 } 447 index = index + 1; 448 start = match other_values.remove(0) { 449 Amf0ValueType::Number(val) => Some(val), 450 _ => None, 451 }; 452 453 if index >= length { 454 break; 455 } 456 index = index + 1; 457 duration = match other_values.remove(0) { 458 Amf0ValueType::Number(val) => Some(val), 459 _ => None, 460 }; 461 462 if index >= length { 463 break; 464 } 465 //index = index + 1; 466 reset = match other_values.remove(0) { 467 Amf0ValueType::Boolean(val) => Some(val), 468 _ => None, 469 }; 470 break; 471 } 472 473 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 474 event_messages.write_stream_begin(stream_id.clone()).await?; 475 log::info!( 476 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 477 self.app_name, 478 self.stream_name 479 ); 480 log::trace!( 481 "{} {} {}", 482 start.is_some(), 483 duration.is_some(), 484 reset.is_some() 485 ); 486 487 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 488 netstream 489 .write_on_status( 490 transaction_id, 491 &"status".to_string(), 492 &"NetStream.Play.Reset".to_string(), 493 &"reset".to_string(), 494 ) 495 .await?; 496 497 netstream 498 .write_on_status( 499 transaction_id, 500 &"status".to_string(), 501 &"NetStream.Play.Start".to_string(), 502 &"play start".to_string(), 503 ) 504 .await?; 505 506 netstream 507 .write_on_status( 508 transaction_id, 509 &"status".to_string(), 510 &"NetStream.Data.Start".to_string(), 511 &"data start.".to_string(), 512 ) 513 .await?; 514 515 netstream 516 .write_on_status( 517 transaction_id, 518 &"status".to_string(), 519 &"NetStream.Play.PublishNotify".to_string(), 520 &"play publish notify.".to_string(), 521 ) 522 .await?; 523 524 event_messages 525 .write_stream_is_record(stream_id.clone()) 526 .await?; 527 log::info!( 528 "[ S->C ] [stream is record] app_name: {}, stream_name: {}", 529 self.app_name, 530 self.stream_name 531 ); 532 self.stream_name = stream_name.clone().unwrap(); 533 self.common 534 .subscribe_from_channels( 535 self.app_name.clone(), 536 stream_name.unwrap(), 537 self.subscriber_id, 538 ) 539 .await?; 540 541 self.state = ServerSessionState::Play; 542 543 Ok(()) 544 } 545 546 pub async fn on_publish( 547 &mut self, 548 transaction_id: &f64, 549 stream_id: &u32, 550 other_values: &mut Vec<Amf0ValueType>, 551 ) -> Result<(), SessionError> { 552 let length = other_values.len(); 553 554 if length < 2 { 555 return Err(SessionError { 556 value: SessionErrorValue::Amf0ValueCountNotCorrect, 557 }); 558 } 559 560 let stream_name = match other_values.remove(0) { 561 Amf0ValueType::UTF8String(val) => val, 562 _ => { 563 return Err(SessionError { 564 value: SessionErrorValue::Amf0ValueCountNotCorrect, 565 }); 566 } 567 }; 568 569 self.stream_name = stream_name; 570 571 let _ = match other_values.remove(0) { 572 Amf0ValueType::UTF8String(val) => val, 573 _ => { 574 return Err(SessionError { 575 value: SessionErrorValue::Amf0ValueCountNotCorrect, 576 }); 577 } 578 }; 579 580 log::info!( 581 "[ S<-C ] [publish] app_name: {}, stream_name: {}", 582 self.app_name, 583 self.stream_name 584 ); 585 586 log::info!( 587 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 588 self.app_name, 589 self.stream_name 590 ); 591 592 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 593 event_messages.write_stream_begin(stream_id.clone()).await?; 594 595 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 596 netstream 597 .write_on_status( 598 transaction_id, 599 &"status".to_string(), 600 &"NetStream.Publish.Start".to_string(), 601 &"".to_string(), 602 ) 603 .await?; 604 log::info!( 605 "[ S->C ] [NetStream.Publish.Start] app_name: {}, stream_name: {}", 606 self.app_name, 607 self.stream_name 608 ); 609 610 self.common 611 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 612 .await?; 613 614 Ok(()) 615 } 616 } 617