1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::ChannelEventProducer, 11 chunk::{ 12 define::CHUNK_SIZE, 13 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 14 }, 15 config, handshake, 16 handshake::{define::ServerHandshakeState, handshake_server::HandshakeServer}, 17 messages::{define::RtmpMessageData, parser::MessageParser}, 18 netconnection::writer::NetConnection, 19 netstream::writer::NetStreamWriter, 20 protocol_control_messages::writer::ProtocolControlMessagesWriter, 21 user_control_messages::writer::EventMessagesWriter, 22 }, 23 bytes::BytesMut, 24 bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO}, 25 std::{collections::HashMap, sync::Arc, time::Duration}, 26 tokio::{net::TcpStream, sync::Mutex}, 27 uuid::Uuid, 28 }; 29 30 enum ServerSessionState { 31 Handshake, 32 ReadChunk, 33 // OnConnect, 34 // OnCreateStream, 35 //Publish, 36 Play, 37 } 38 39 pub struct ServerSession { 40 pub app_name: String, 41 pub stream_name: String, 42 io: Arc<Mutex<BytesIO>>, 43 handshaker: HandshakeServer, 44 unpacketizer: ChunkUnpacketizer, 45 state: ServerSessionState, 46 pub common: Common, 47 bytesio_data: BytesMut, 48 has_remaing_data: bool, 49 /* Used to mark the subscriber's the data producer 50 in channels and delete it from map when unsubscribe 51 is called. */ 52 pub subscriber_id: Uuid, 53 connect_command_object: Option<HashMap<String, Amf0ValueType>>, 54 } 55 56 impl ServerSession { 57 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self { 58 let net_io = Arc::new(Mutex::new(BytesIO::new(stream))); 59 let subscriber_id = Uuid::new_v4(); 60 Self { 61 app_name: String::from(""), 62 stream_name: String::from(""), 63 io: Arc::clone(&net_io), 64 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 65 unpacketizer: ChunkUnpacketizer::new(), 66 state: ServerSessionState::Handshake, 67 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server), 68 subscriber_id, 69 bytesio_data: BytesMut::new(), 70 has_remaing_data: false, 71 connect_command_object: None, 72 } 73 } 74 75 pub async fn run(&mut self) -> Result<(), SessionError> { 76 loop { 77 match self.state { 78 ServerSessionState::Handshake => { 79 self.handshake().await?; 80 } 81 ServerSessionState::ReadChunk => { 82 self.read_parse_chunks().await?; 83 } 84 ServerSessionState::Play => { 85 self.play().await?; 86 } 87 } 88 } 89 90 //Ok(()) 91 } 92 93 async fn handshake(&mut self) -> Result<(), SessionError> { 94 let mut bytes_len = 0; 95 96 while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE { 97 self.bytesio_data = self.io.lock().await.read().await?; 98 bytes_len += self.bytesio_data.len(); 99 self.handshaker.extend_data(&self.bytesio_data[..]); 100 } 101 102 self.handshaker.handshake().await?; 103 104 if let ServerHandshakeState::Finish = self.handshaker.state() { 105 self.state = ServerSessionState::ReadChunk; 106 let left_bytes = self.handshaker.get_remaining_bytes(); 107 if !left_bytes.is_empty() { 108 self.unpacketizer.extend_data(&left_bytes[..]); 109 self.has_remaing_data = true; 110 } 111 log::info!("[ S->C ] [send_set_chunk_size] "); 112 self.send_set_chunk_size().await?; 113 return Ok(()); 114 } 115 116 Ok(()) 117 } 118 119 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 120 if !self.has_remaing_data { 121 match self 122 .io 123 .lock() 124 .await 125 .read_timeout(Duration::from_secs(2)) 126 .await 127 { 128 Ok(data) => { 129 self.bytesio_data = data; 130 } 131 Err(err) => { 132 self.common 133 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 134 .await?; 135 136 return Err(SessionError { 137 value: SessionErrorValue::BytesIOError(err), 138 }); 139 } 140 } 141 142 self.unpacketizer.extend_data(&self.bytesio_data[..]); 143 } 144 145 self.has_remaing_data = false; 146 147 loop { 148 let result = self.unpacketizer.read_chunks(); 149 150 if let Ok(rv) = result { 151 if let UnpackResult::Chunks(chunks) = rv { 152 for chunk_info in chunks { 153 let timestamp = chunk_info.message_header.timestamp; 154 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 155 156 let mut msg = MessageParser::new(chunk_info).parse()?; 157 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 158 .await?; 159 } 160 } 161 } else { 162 break; 163 } 164 } 165 Ok(()) 166 } 167 168 async fn play(&mut self) -> Result<(), SessionError> { 169 match self.common.send_channel_data().await { 170 Ok(_) => {} 171 172 Err(err) => { 173 self.common 174 .unsubscribe_from_channels( 175 self.app_name.clone(), 176 self.stream_name.clone(), 177 self.subscriber_id, 178 ) 179 .await?; 180 return Err(err); 181 } 182 } 183 184 Ok(()) 185 } 186 187 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 188 let mut controlmessage = 189 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 190 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 191 192 Ok(()) 193 } 194 195 pub async fn process_messages( 196 &mut self, 197 rtmp_msg: &mut RtmpMessageData, 198 msg_stream_id: &u32, 199 timestamp: &u32, 200 ) -> Result<(), SessionError> { 201 match rtmp_msg { 202 RtmpMessageData::Amf0Command { 203 command_name, 204 transaction_id, 205 command_object, 206 others, 207 } => { 208 self.on_amf0_command_message( 209 msg_stream_id, 210 command_name, 211 transaction_id, 212 command_object, 213 others, 214 ) 215 .await? 216 } 217 RtmpMessageData::SetChunkSize { chunk_size } => { 218 self.on_set_chunk_size(*chunk_size as usize)?; 219 } 220 RtmpMessageData::AudioData { data } => { 221 self.common.on_audio_data(data, timestamp)?; 222 } 223 RtmpMessageData::VideoData { data } => { 224 self.common.on_video_data(data, timestamp)?; 225 } 226 RtmpMessageData::AmfData { raw_data } => { 227 self.common.on_meta_data(raw_data, timestamp)?; 228 } 229 230 _ => {} 231 } 232 Ok(()) 233 } 234 235 pub async fn on_amf0_command_message( 236 &mut self, 237 stream_id: &u32, 238 command_name: &Amf0ValueType, 239 transaction_id: &Amf0ValueType, 240 command_object: &Amf0ValueType, 241 others: &mut Vec<Amf0ValueType>, 242 ) -> Result<(), SessionError> { 243 let empty_cmd_name = &String::new(); 244 let cmd_name = match command_name { 245 Amf0ValueType::UTF8String(str) => str, 246 _ => empty_cmd_name, 247 }; 248 249 let transaction_id = match transaction_id { 250 Amf0ValueType::Number(number) => number, 251 _ => &0.0, 252 }; 253 254 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 255 let obj = match command_object { 256 Amf0ValueType::Object(obj) => obj, 257 _ => &empty_cmd_obj, 258 }; 259 260 match cmd_name.as_str() { 261 "connect" => { 262 log::info!("[ S<-C ] [connect] "); 263 self.on_connect(transaction_id, obj).await?; 264 } 265 "createStream" => { 266 log::info!("[ S<-C ] [create stream] "); 267 self.on_create_stream(transaction_id).await?; 268 } 269 "deleteStream" => { 270 if !others.is_empty() { 271 let stream_id = match others.pop() { 272 Some(Amf0ValueType::Number(streamid)) => streamid, 273 _ => 0.0, 274 }; 275 276 log::info!( 277 "[ S<-C ] [delete stream] app_name: {}, stream_name: {}", 278 self.app_name, 279 self.stream_name 280 ); 281 282 self.on_delete_stream(transaction_id, &stream_id).await?; 283 } 284 } 285 "play" => { 286 log::info!( 287 "[ S<-C ] [play] app_name: {}, stream_name: {}", 288 self.app_name, 289 self.stream_name 290 ); 291 self.unpacketizer.session_type = config::SERVER_PULL; 292 self.on_play(transaction_id, stream_id, others).await?; 293 } 294 "publish" => { 295 self.unpacketizer.session_type = config::SERVER_PUSH; 296 self.on_publish(transaction_id, stream_id, others).await?; 297 } 298 _ => {} 299 } 300 301 Ok(()) 302 } 303 304 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 305 log::info!( 306 "[ S<-C ] [set chunk size] app_name: {}, stream_name: {}, chunk size: {}", 307 self.app_name, 308 self.stream_name, 309 chunk_size 310 ); 311 self.unpacketizer.update_max_chunk_size(chunk_size); 312 Ok(()) 313 } 314 315 async fn on_connect( 316 &mut self, 317 transaction_id: &f64, 318 command_obj: &HashMap<String, Amf0ValueType>, 319 ) -> Result<(), SessionError> { 320 self.connect_command_object = Some(command_obj.clone()); 321 let mut control_message = 322 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 323 log::info!("[ S->C ] [set window_acknowledgement_size]"); 324 control_message 325 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 326 .await?; 327 328 log::info!("[ S->C ] [set set_peer_bandwidth]",); 329 control_message 330 .write_set_peer_bandwidth( 331 define::PEER_BANDWIDTH, 332 define::peer_bandwidth_limit_type::DYNAMIC, 333 ) 334 .await?; 335 336 let obj_encoding = command_obj.get("objectEncoding"); 337 let encoding = match obj_encoding { 338 Some(Amf0ValueType::Number(encoding)) => encoding, 339 _ => &define::OBJENCODING_AMF0, 340 }; 341 342 let app_name = command_obj.get("app"); 343 self.app_name = match app_name { 344 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 345 _ => { 346 return Err(SessionError { 347 value: SessionErrorValue::NoAppName, 348 }); 349 } 350 }; 351 352 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 353 log::info!("[ S->C ] [set connect_response]",); 354 netconnection 355 .write_connect_response( 356 transaction_id, 357 define::FMSVER, 358 &define::CAPABILITIES, 359 &String::from("NetConnection.Connect.Success"), 360 define::LEVEL, 361 &String::from("Connection Succeeded."), 362 encoding, 363 ) 364 .await?; 365 366 Ok(()) 367 } 368 369 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 370 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 371 netconnection 372 .write_create_stream_response(transaction_id, &define::STREAM_ID) 373 .await?; 374 375 log::info!( 376 "[ S->C ] [create_stream_response] app_name: {}", 377 self.app_name, 378 ); 379 380 Ok(()) 381 } 382 383 pub async fn on_delete_stream( 384 &mut self, 385 transaction_id: &f64, 386 stream_id: &f64, 387 ) -> Result<(), SessionError> { 388 self.common 389 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 390 .await?; 391 392 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 393 netstream 394 .write_on_status( 395 transaction_id, 396 "status", 397 "NetStream.DeleteStream.Suceess", 398 "", 399 ) 400 .await?; 401 402 //self.unsubscribe_from_channels().await?; 403 log::info!( 404 "[ S->C ] [delete stream success] app_name: {}, stream_name: {}", 405 self.app_name, 406 self.stream_name 407 ); 408 log::trace!("{}", stream_id); 409 410 Ok(()) 411 } 412 413 #[allow(clippy::never_loop)] 414 pub async fn on_play( 415 &mut self, 416 transaction_id: &f64, 417 stream_id: &u32, 418 other_values: &mut Vec<Amf0ValueType>, 419 ) -> Result<(), SessionError> { 420 let length = other_values.len() as u8; 421 let mut index: u8 = 0; 422 423 let mut stream_name: Option<String> = None; 424 let mut start: Option<f64> = None; 425 let mut duration: Option<f64> = None; 426 let mut reset: Option<bool> = None; 427 428 loop { 429 if index >= length { 430 break; 431 } 432 index += 1; 433 stream_name = match other_values.remove(0) { 434 Amf0ValueType::UTF8String(val) => Some(val), 435 _ => None, 436 }; 437 438 if index >= length { 439 break; 440 } 441 index += 1; 442 start = match other_values.remove(0) { 443 Amf0ValueType::Number(val) => Some(val), 444 _ => None, 445 }; 446 447 if index >= length { 448 break; 449 } 450 index += 1; 451 duration = match other_values.remove(0) { 452 Amf0ValueType::Number(val) => Some(val), 453 _ => None, 454 }; 455 456 if index >= length { 457 break; 458 } 459 //index = index + 1; 460 reset = match other_values.remove(0) { 461 Amf0ValueType::Boolean(val) => Some(val), 462 _ => None, 463 }; 464 break; 465 } 466 467 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 468 event_messages.write_stream_begin(*stream_id).await?; 469 log::info!( 470 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 471 self.app_name, 472 self.stream_name 473 ); 474 log::trace!( 475 "{} {} {}", 476 start.is_some(), 477 duration.is_some(), 478 reset.is_some() 479 ); 480 481 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 482 netstream 483 .write_on_status(transaction_id, "status", "NetStream.Play.Reset", "reset") 484 .await?; 485 486 netstream 487 .write_on_status( 488 transaction_id, 489 "status", 490 "NetStream.Play.Start", 491 "play start", 492 ) 493 .await?; 494 495 netstream 496 .write_on_status( 497 transaction_id, 498 "status", 499 "NetStream.Data.Start", 500 "data start.", 501 ) 502 .await?; 503 504 netstream 505 .write_on_status( 506 transaction_id, 507 "status", 508 "NetStream.Play.PublishNotify", 509 "play publish notify.", 510 ) 511 .await?; 512 513 event_messages.write_stream_is_record(*stream_id).await?; 514 log::info!( 515 "[ S->C ] [stream is record] app_name: {}, stream_name: {}", 516 self.app_name, 517 self.stream_name 518 ); 519 self.stream_name = stream_name.clone().unwrap(); 520 self.common 521 .subscribe_from_channels( 522 self.app_name.clone(), 523 stream_name.unwrap(), 524 self.subscriber_id, 525 ) 526 .await?; 527 528 self.state = ServerSessionState::Play; 529 530 Ok(()) 531 } 532 533 pub async fn on_publish( 534 &mut self, 535 transaction_id: &f64, 536 stream_id: &u32, 537 other_values: &mut Vec<Amf0ValueType>, 538 ) -> Result<(), SessionError> { 539 let length = other_values.len(); 540 541 if length < 2 { 542 return Err(SessionError { 543 value: SessionErrorValue::Amf0ValueCountNotCorrect, 544 }); 545 } 546 547 let stream_name = match other_values.remove(0) { 548 Amf0ValueType::UTF8String(val) => val, 549 _ => { 550 return Err(SessionError { 551 value: SessionErrorValue::Amf0ValueCountNotCorrect, 552 }); 553 } 554 }; 555 556 self.stream_name = stream_name; 557 558 let _ = match other_values.remove(0) { 559 Amf0ValueType::UTF8String(val) => val, 560 _ => { 561 return Err(SessionError { 562 value: SessionErrorValue::Amf0ValueCountNotCorrect, 563 }); 564 } 565 }; 566 567 log::info!( 568 "[ S<-C ] [publish] app_name: {}, stream_name: {}", 569 self.app_name, 570 self.stream_name 571 ); 572 573 log::info!( 574 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 575 self.app_name, 576 self.stream_name 577 ); 578 579 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 580 event_messages.write_stream_begin(*stream_id).await?; 581 582 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 583 netstream 584 .write_on_status(transaction_id, "status", "NetStream.Publish.Start", "") 585 .await?; 586 log::info!( 587 "[ S->C ] [NetStream.Publish.Start] app_name: {}, stream_name: {}", 588 self.app_name, 589 self.stream_name 590 ); 591 592 self.common 593 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 594 .await?; 595 596 Ok(()) 597 } 598 } 599