1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::ChannelEventProducer, 11 chunk::{ 12 define::CHUNK_SIZE, 13 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 14 }, 15 config, 16 handshake::handshake::{HandshakeServer, ServerHandshakeState}, 17 messages::{define::RtmpMessageData, parser::MessageParser}, 18 netconnection::writer::NetConnection, 19 netstream::writer::NetStreamWriter, 20 protocol_control_messages::writer::ProtocolControlMessagesWriter, 21 user_control_messages::writer::EventMessagesWriter, 22 }, 23 bytes::BytesMut, 24 bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO}, 25 std::{collections::HashMap, sync::Arc}, 26 tokio::{net::TcpStream, sync::Mutex}, 27 uuid::Uuid, 28 }; 29 30 enum ServerSessionState { 31 Handshake, 32 ReadChunk, 33 // OnConnect, 34 // OnCreateStream, 35 //Publish, 36 Play, 37 } 38 39 pub struct ServerSession { 40 pub app_name: String, 41 pub stream_name: String, 42 43 io: Arc<Mutex<BytesIO>>, 44 handshaker: HandshakeServer, 45 46 unpacketizer: ChunkUnpacketizer, 47 48 state: ServerSessionState, 49 50 pub common: Common, 51 52 bytesio_data: BytesMut, 53 need_process: bool, 54 55 /* Used to mark the subscriber's the data producer 56 in channels and delete it from map when unsubscribe 57 is called. */ 58 pub subscriber_id: Uuid, 59 60 connect_command_object: Option<HashMap<String, Amf0ValueType>>, 61 } 62 63 impl ServerSession { 64 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self { 65 let net_io = Arc::new(Mutex::new(BytesIO::new(stream))); 66 let subscriber_id = Uuid::new_v4(); 67 Self { 68 app_name: String::from(""), 69 stream_name: String::from(""), 70 71 io: Arc::clone(&net_io), 72 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 73 74 unpacketizer: ChunkUnpacketizer::new(), 75 76 state: ServerSessionState::Handshake, 77 78 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server), 79 80 subscriber_id, 81 bytesio_data: BytesMut::new(), 82 need_process: false, 83 84 connect_command_object: None, 85 } 86 } 87 88 pub async fn run(&mut self) -> Result<(), SessionError> { 89 loop { 90 match self.state { 91 ServerSessionState::Handshake => { 92 self.handshake().await?; 93 } 94 ServerSessionState::ReadChunk => { 95 self.read_parse_chunks().await?; 96 } 97 ServerSessionState::Play => { 98 self.play().await?; 99 } 100 } 101 } 102 103 //Ok(()) 104 } 105 106 async fn handshake(&mut self) -> Result<(), SessionError> { 107 self.bytesio_data = self.io.lock().await.read().await?; 108 self.handshaker.extend_data(&self.bytesio_data[..]); 109 self.handshaker.handshake().await?; 110 111 match self.handshaker.state() { 112 ServerHandshakeState::Finish => { 113 self.state = ServerSessionState::ReadChunk; 114 115 let left_bytes = self.handshaker.get_remaining_bytes(); 116 if left_bytes.len() > 0 { 117 self.unpacketizer.extend_data(&left_bytes[..]); 118 self.need_process = true; 119 } 120 self.send_set_chunk_size().await?; 121 return Ok(()); 122 } 123 _ => {} 124 } 125 126 Ok(()) 127 } 128 129 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 130 if !self.need_process { 131 self.bytesio_data = self.io.lock().await.read().await?; 132 self.unpacketizer.extend_data(&self.bytesio_data[..]); 133 } 134 135 self.need_process = false; 136 137 loop { 138 let result = self.unpacketizer.read_chunks(); 139 140 if let Ok(rv) = result { 141 match rv { 142 UnpackResult::Chunks(chunks) => { 143 for chunk_info in chunks.iter() { 144 let mut msg = MessageParser::new(chunk_info.clone()).parse()?; 145 146 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 147 let timestamp = chunk_info.message_header.timestamp; 148 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 149 .await?; 150 } 151 } 152 _ => {} 153 } 154 } else { 155 break; 156 } 157 } 158 Ok(()) 159 } 160 161 async fn play(&mut self) -> Result<(), SessionError> { 162 match self.common.send_channel_data().await { 163 Ok(_) => {} 164 165 Err(err) => { 166 self.common 167 .unsubscribe_from_channels( 168 self.app_name.clone(), 169 self.stream_name.clone(), 170 self.subscriber_id, 171 ) 172 .await?; 173 return Err(err); 174 } 175 } 176 177 Ok(()) 178 } 179 180 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 181 let mut controlmessage = 182 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 183 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 184 185 Ok(()) 186 } 187 188 pub async fn process_messages( 189 &mut self, 190 rtmp_msg: &mut RtmpMessageData, 191 msg_stream_id: &u32, 192 timestamp: &u32, 193 ) -> Result<(), SessionError> { 194 match rtmp_msg { 195 RtmpMessageData::Amf0Command { 196 command_name, 197 transaction_id, 198 command_object, 199 others, 200 } => { 201 self.on_amf0_command_message( 202 msg_stream_id, 203 command_name, 204 transaction_id, 205 command_object, 206 others, 207 ) 208 .await? 209 } 210 RtmpMessageData::SetChunkSize { chunk_size } => { 211 self.on_set_chunk_size(chunk_size.clone() as usize)?; 212 } 213 RtmpMessageData::AudioData { data } => { 214 self.common.on_audio_data(data, timestamp)?; 215 } 216 RtmpMessageData::VideoData { data } => { 217 self.common.on_video_data(data, timestamp)?; 218 } 219 RtmpMessageData::AmfData { raw_data } => { 220 self.common.on_meta_data(raw_data, timestamp)?; 221 } 222 223 _ => {} 224 } 225 Ok(()) 226 } 227 228 pub async fn on_amf0_command_message( 229 &mut self, 230 stream_id: &u32, 231 command_name: &Amf0ValueType, 232 transaction_id: &Amf0ValueType, 233 command_object: &Amf0ValueType, 234 others: &mut Vec<Amf0ValueType>, 235 ) -> Result<(), SessionError> { 236 let empty_cmd_name = &String::new(); 237 let cmd_name = match command_name { 238 Amf0ValueType::UTF8String(str) => str, 239 _ => empty_cmd_name, 240 }; 241 242 let transaction_id = match transaction_id { 243 Amf0ValueType::Number(number) => number, 244 _ => &0.0, 245 }; 246 247 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 248 let obj = match command_object { 249 Amf0ValueType::Object(obj) => obj, 250 _ => &empty_cmd_obj, 251 }; 252 253 match cmd_name.as_str() { 254 "connect" => { 255 log::info!("[ S<-C ] [connect] "); 256 self.on_connect(&transaction_id, &obj).await?; 257 } 258 "createStream" => { 259 log::info!("[ S<-C ] [create stream] "); 260 self.on_create_stream(transaction_id).await?; 261 } 262 "deleteStream" => { 263 if others.len() > 0 { 264 let stream_id = match others.pop() { 265 Some(val) => match val { 266 Amf0ValueType::Number(streamid) => streamid, 267 _ => 0.0, 268 }, 269 _ => 0.0, 270 }; 271 log::info!( 272 "[ S<-C ] [delete stream] app_name: {}, stream_name: {}", 273 self.app_name, 274 self.stream_name 275 ); 276 277 self.on_delete_stream(transaction_id, &stream_id).await?; 278 } 279 } 280 "play" => { 281 log::info!( 282 "[ S<-C ] [play] app_name: {}, stream_name: {}", 283 self.app_name, 284 self.stream_name 285 ); 286 self.unpacketizer.session_type = config::SERVER_PULL; 287 self.on_play(transaction_id, stream_id, others).await?; 288 } 289 "publish" => { 290 self.unpacketizer.session_type = config::SERVER_PUSH; 291 self.on_publish(transaction_id, stream_id, others).await?; 292 } 293 _ => {} 294 } 295 296 Ok(()) 297 } 298 299 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 300 self.unpacketizer.update_max_chunk_size(chunk_size); 301 Ok(()) 302 } 303 304 async fn on_connect( 305 &mut self, 306 transaction_id: &f64, 307 command_obj: &HashMap<String, Amf0ValueType>, 308 ) -> Result<(), SessionError> { 309 self.connect_command_object = Some(command_obj.clone()); 310 let mut control_message = 311 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 312 log::info!("[ S->C ] [set window_acknowledgement_size]"); 313 control_message 314 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 315 .await?; 316 log::info!("[ S->C ] [set set_peer_bandwidth]",); 317 control_message 318 .write_set_peer_bandwidth( 319 define::PEER_BANDWIDTH, 320 define::peer_bandwidth_limit_type::DYNAMIC, 321 ) 322 .await?; 323 324 let obj_encoding = command_obj.get("objectEncoding"); 325 let encoding = match obj_encoding { 326 Some(Amf0ValueType::Number(encoding)) => encoding, 327 _ => &define::OBJENCODING_AMF0, 328 }; 329 330 let app_name = command_obj.get("app"); 331 self.app_name = match app_name { 332 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 333 _ => { 334 return Err(SessionError { 335 value: SessionErrorValue::NoAppName, 336 }); 337 } 338 }; 339 340 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 341 netconnection 342 .write_connect_response( 343 &transaction_id, 344 &define::FMSVER.to_string(), 345 &define::CAPABILITIES, 346 &String::from("NetConnection.Connect.Success"), 347 &define::LEVEL.to_string(), 348 &String::from("Connection Succeeded."), 349 encoding, 350 ) 351 .await?; 352 353 Ok(()) 354 } 355 356 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 357 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 358 netconnection 359 .write_create_stream_response(transaction_id, &define::STREAM_ID) 360 .await?; 361 362 log::info!( 363 "[ S->C ] [create_stream_response] app_name: {}", 364 self.app_name, 365 ); 366 367 Ok(()) 368 } 369 370 pub async fn on_delete_stream( 371 &mut self, 372 transaction_id: &f64, 373 stream_id: &f64, 374 ) -> Result<(), SessionError> { 375 self.common 376 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 377 .await?; 378 379 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 380 netstream 381 .write_on_status( 382 transaction_id, 383 &"status".to_string(), 384 &"NetStream.DeleteStream.Suceess".to_string(), 385 &"".to_string(), 386 ) 387 .await?; 388 389 //self.unsubscribe_from_channels().await?; 390 log::info!( 391 "[ S->C ] [delete stream success] app_name: {}, stream_name: {}", 392 self.app_name, 393 self.stream_name 394 ); 395 log::trace!("{}", stream_id); 396 397 Ok(()) 398 } 399 pub async fn on_play( 400 &mut self, 401 transaction_id: &f64, 402 stream_id: &u32, 403 other_values: &mut Vec<Amf0ValueType>, 404 ) -> Result<(), SessionError> { 405 let length = other_values.len() as u8; 406 let mut index: u8 = 0; 407 408 let mut stream_name: Option<String> = None; 409 let mut start: Option<f64> = None; 410 let mut duration: Option<f64> = None; 411 let mut reset: Option<bool> = None; 412 413 loop { 414 if index >= length { 415 break; 416 } 417 index = index + 1; 418 stream_name = match other_values.remove(0) { 419 Amf0ValueType::UTF8String(val) => Some(val), 420 _ => None, 421 }; 422 423 if index >= length { 424 break; 425 } 426 index = index + 1; 427 start = match other_values.remove(0) { 428 Amf0ValueType::Number(val) => Some(val), 429 _ => None, 430 }; 431 432 if index >= length { 433 break; 434 } 435 index = index + 1; 436 duration = match other_values.remove(0) { 437 Amf0ValueType::Number(val) => Some(val), 438 _ => None, 439 }; 440 441 if index >= length { 442 break; 443 } 444 //index = index + 1; 445 reset = match other_values.remove(0) { 446 Amf0ValueType::Boolean(val) => Some(val), 447 _ => None, 448 }; 449 break; 450 } 451 452 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 453 event_messages.write_stream_begin(stream_id.clone()).await?; 454 log::info!( 455 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 456 self.app_name, 457 self.stream_name 458 ); 459 log::trace!( 460 "{} {} {}", 461 start.is_some(), 462 duration.is_some(), 463 reset.is_some() 464 ); 465 466 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 467 netstream 468 .write_on_status( 469 transaction_id, 470 &"status".to_string(), 471 &"NetStream.Play.Reset".to_string(), 472 &"reset".to_string(), 473 ) 474 .await?; 475 476 netstream 477 .write_on_status( 478 transaction_id, 479 &"status".to_string(), 480 &"NetStream.Play.Start".to_string(), 481 &"play start".to_string(), 482 ) 483 .await?; 484 485 netstream 486 .write_on_status( 487 transaction_id, 488 &"status".to_string(), 489 &"NetStream.Data.Start".to_string(), 490 &"data start.".to_string(), 491 ) 492 .await?; 493 494 netstream 495 .write_on_status( 496 transaction_id, 497 &"status".to_string(), 498 &"NetStream.Play.PublishNotify".to_string(), 499 &"play publish notify.".to_string(), 500 ) 501 .await?; 502 503 event_messages 504 .write_stream_is_record(stream_id.clone()) 505 .await?; 506 log::info!( 507 "[ S->C ] [stream is record] app_name: {}, stream_name: {}", 508 self.app_name, 509 self.stream_name 510 ); 511 self.stream_name = stream_name.clone().unwrap(); 512 self.common 513 .subscribe_from_channels( 514 self.app_name.clone(), 515 stream_name.unwrap(), 516 self.subscriber_id, 517 ) 518 .await?; 519 520 self.state = ServerSessionState::Play; 521 522 Ok(()) 523 } 524 525 pub async fn on_publish( 526 &mut self, 527 transaction_id: &f64, 528 stream_id: &u32, 529 other_values: &mut Vec<Amf0ValueType>, 530 ) -> Result<(), SessionError> { 531 let length = other_values.len(); 532 533 if length < 2 { 534 return Err(SessionError { 535 value: SessionErrorValue::Amf0ValueCountNotCorrect, 536 }); 537 } 538 539 let stream_name = match other_values.remove(0) { 540 Amf0ValueType::UTF8String(val) => val, 541 _ => { 542 return Err(SessionError { 543 value: SessionErrorValue::Amf0ValueCountNotCorrect, 544 }); 545 } 546 }; 547 548 self.stream_name = stream_name; 549 550 let _ = match other_values.remove(0) { 551 Amf0ValueType::UTF8String(val) => val, 552 _ => { 553 return Err(SessionError { 554 value: SessionErrorValue::Amf0ValueCountNotCorrect, 555 }); 556 } 557 }; 558 559 log::info!( 560 "[ S<-C ] [publish] app_name: {}, stream_name: {}", 561 self.app_name, 562 self.stream_name 563 ); 564 565 log::info!( 566 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 567 self.app_name, 568 self.stream_name 569 ); 570 571 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 572 event_messages.write_stream_begin(stream_id.clone()).await?; 573 574 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 575 netstream 576 .write_on_status( 577 transaction_id, 578 &"status".to_string(), 579 &"NetStream.Publish.Start".to_string(), 580 &"".to_string(), 581 ) 582 .await?; 583 log::info!( 584 "[ S->C ] [NetStream.Publish.Start] app_name: {}, stream_name: {}", 585 self.app_name, 586 self.stream_name 587 ); 588 589 self.common 590 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 591 .await?; 592 593 Ok(()) 594 } 595 } 596