1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::ChannelEventProducer, 11 chunk::{ 12 define::{chunk_type, csid_type, CHUNK_SIZE}, 13 packetizer::ChunkPacketizer, 14 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 15 ChunkInfo, 16 }, 17 config, 18 handshake::handshake::{HandshakeServer, ServerHandshakeState}, 19 messages::{ 20 define::{msg_type_id, RtmpMessageData}, 21 parser::MessageParser, 22 }, 23 netconnection::commands::NetConnection, 24 netstream::writer::NetStreamWriter, 25 protocol_control_messages::writer::ProtocolControlMessagesWriter, 26 user_control_messages::writer::EventMessagesWriter, 27 }, 28 bytes::BytesMut, 29 bytesio::{ 30 bytes_writer::{AsyncBytesWriter, BytesWriter}, 31 bytesio::BytesIO, 32 }, 33 std::{collections::HashMap, sync::Arc}, 34 tokio::{net::TcpStream, sync::Mutex}, 35 uuid::Uuid, 36 }; 37 38 enum ServerSessionState { 39 Handshake, 40 ReadChunk, 41 // OnConnect, 42 // OnCreateStream, 43 //Publish, 44 Play, 45 } 46 47 pub struct ServerSession { 48 pub app_name: String, 49 pub stream_name: String, 50 51 io: Arc<Mutex<BytesIO>>, 52 handshaker: HandshakeServer, 53 54 packetizer: ChunkPacketizer, 55 unpacketizer: ChunkUnpacketizer, 56 57 state: ServerSessionState, 58 59 pub common: Common, 60 61 bytesio_data: BytesMut, 62 need_process: bool, 63 64 /* Used to mark the subscriber's the data producer 65 in channels and delete it from map when unsubscribe 66 is called. */ 67 pub subscriber_id: Uuid, 68 69 connect_command_object: Option<HashMap<String, Amf0ValueType>>, 70 } 71 72 impl ServerSession { 73 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self { 74 let net_io = Arc::new(Mutex::new(BytesIO::new(stream))); 75 let subscriber_id = Uuid::new_v4(); 76 Self { 77 app_name: String::from(""), 78 stream_name: String::from(""), 79 80 io: Arc::clone(&net_io), 81 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 82 83 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 84 unpacketizer: ChunkUnpacketizer::new(), 85 86 state: ServerSessionState::Handshake, 87 88 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server), 89 90 subscriber_id, 91 bytesio_data: BytesMut::new(), 92 need_process: false, 93 94 connect_command_object: None, 95 } 96 } 97 98 pub async fn run(&mut self) -> Result<(), SessionError> { 99 loop { 100 match self.state { 101 ServerSessionState::Handshake => { 102 self.handshake().await?; 103 } 104 ServerSessionState::ReadChunk => { 105 self.read_parse_chunks().await?; 106 } 107 ServerSessionState::Play => { 108 self.play().await?; 109 } 110 } 111 } 112 113 //Ok(()) 114 } 115 116 async fn handshake(&mut self) -> Result<(), SessionError> { 117 self.bytesio_data = self.io.lock().await.read().await?; 118 self.handshaker.extend_data(&self.bytesio_data[..]); 119 self.handshaker.handshake().await?; 120 121 match self.handshaker.state() { 122 ServerHandshakeState::Finish => { 123 self.state = ServerSessionState::ReadChunk; 124 125 let left_bytes = self.handshaker.get_remaining_bytes(); 126 if left_bytes.len() > 0 { 127 self.unpacketizer.extend_data(&left_bytes[..]); 128 self.need_process = true; 129 } 130 self.send_set_chunk_size().await?; 131 return Ok(()); 132 } 133 _ => {} 134 } 135 136 Ok(()) 137 } 138 139 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 140 if !self.need_process { 141 self.bytesio_data = self.io.lock().await.read().await?; 142 self.unpacketizer.extend_data(&self.bytesio_data[..]); 143 } 144 145 self.need_process = false; 146 147 loop { 148 let result = self.unpacketizer.read_chunks(); 149 150 if let Ok(rv) = result { 151 match rv { 152 UnpackResult::Chunks(chunks) => { 153 for chunk_info in chunks.iter() { 154 let mut msg = MessageParser::new(chunk_info.clone()).parse()?; 155 156 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 157 let timestamp = chunk_info.message_header.timestamp; 158 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 159 .await?; 160 } 161 } 162 _ => {} 163 } 164 } else { 165 break; 166 } 167 } 168 Ok(()) 169 } 170 171 async fn play(&mut self) -> Result<(), SessionError> { 172 match self.common.send_channel_data().await { 173 Ok(_) => {} 174 175 Err(err) => { 176 self.common 177 .unsubscribe_from_channels( 178 self.app_name.clone(), 179 self.stream_name.clone(), 180 self.subscriber_id, 181 ) 182 .await?; 183 return Err(err); 184 } 185 } 186 187 Ok(()) 188 } 189 190 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 191 let mut controlmessage = 192 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 193 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 194 195 Ok(()) 196 } 197 198 pub async fn process_messages( 199 &mut self, 200 rtmp_msg: &mut RtmpMessageData, 201 msg_stream_id: &u32, 202 timestamp: &u32, 203 ) -> Result<(), SessionError> { 204 match rtmp_msg { 205 RtmpMessageData::Amf0Command { 206 command_name, 207 transaction_id, 208 command_object, 209 others, 210 } => { 211 self.on_amf0_command_message( 212 msg_stream_id, 213 command_name, 214 transaction_id, 215 command_object, 216 others, 217 ) 218 .await? 219 } 220 RtmpMessageData::SetChunkSize { chunk_size } => { 221 self.on_set_chunk_size(chunk_size.clone() as usize)?; 222 } 223 RtmpMessageData::AudioData { data } => { 224 self.common.on_audio_data(data, timestamp)?; 225 } 226 RtmpMessageData::VideoData { data } => { 227 self.common.on_video_data(data, timestamp)?; 228 } 229 RtmpMessageData::AmfData { raw_data } => { 230 self.common.on_meta_data(raw_data, timestamp)?; 231 } 232 233 _ => {} 234 } 235 Ok(()) 236 } 237 238 pub async fn on_amf0_command_message( 239 &mut self, 240 stream_id: &u32, 241 command_name: &Amf0ValueType, 242 transaction_id: &Amf0ValueType, 243 command_object: &Amf0ValueType, 244 others: &mut Vec<Amf0ValueType>, 245 ) -> Result<(), SessionError> { 246 let empty_cmd_name = &String::new(); 247 let cmd_name = match command_name { 248 Amf0ValueType::UTF8String(str) => str, 249 _ => empty_cmd_name, 250 }; 251 252 let transaction_id = match transaction_id { 253 Amf0ValueType::Number(number) => number, 254 _ => &0.0, 255 }; 256 257 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 258 let obj = match command_object { 259 Amf0ValueType::Object(obj) => obj, 260 _ => &empty_cmd_obj, 261 }; 262 263 match cmd_name.as_str() { 264 "connect" => { 265 log::info!("[ S<-C ] [connect] "); 266 self.on_connect(&transaction_id, &obj).await?; 267 } 268 "createStream" => { 269 log::info!("[ S<-C ] [create stream] "); 270 self.on_create_stream(transaction_id).await?; 271 } 272 "deleteStream" => { 273 if others.len() > 0 { 274 let stream_id = match others.pop() { 275 Some(val) => match val { 276 Amf0ValueType::Number(streamid) => streamid, 277 _ => 0.0, 278 }, 279 _ => 0.0, 280 }; 281 log::info!( 282 "[ S<-C ] [delete stream] app_name: {}, stream_name: {}", 283 self.app_name, 284 self.stream_name 285 ); 286 287 self.on_delete_stream(transaction_id, &stream_id).await?; 288 } 289 } 290 "play" => { 291 log::info!( 292 "[ S<-C ] [play] app_name: {}, stream_name: {}", 293 self.app_name, 294 self.stream_name 295 ); 296 self.unpacketizer.session_type = config::SERVER_PULL; 297 self.on_play(transaction_id, stream_id, others).await?; 298 } 299 "publish" => { 300 self.unpacketizer.session_type = config::SERVER_PUSH; 301 self.on_publish(transaction_id, stream_id, others).await?; 302 } 303 _ => {} 304 } 305 306 Ok(()) 307 } 308 309 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 310 self.unpacketizer.update_max_chunk_size(chunk_size); 311 Ok(()) 312 } 313 314 async fn on_connect( 315 &mut self, 316 transaction_id: &f64, 317 command_obj: &HashMap<String, Amf0ValueType>, 318 ) -> Result<(), SessionError> { 319 self.connect_command_object = Some(command_obj.clone()); 320 let mut control_message = 321 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 322 log::info!("[ S->C ] [set window_acknowledgement_size]"); 323 control_message 324 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 325 .await?; 326 log::info!("[ S->C ] [set set_peer_bandwidth]",); 327 control_message 328 .write_set_peer_bandwidth( 329 define::PEER_BANDWIDTH, 330 define::peer_bandwidth_limit_type::DYNAMIC, 331 ) 332 .await?; 333 334 let obj_encoding = command_obj.get("objectEncoding"); 335 let encoding = match obj_encoding { 336 Some(Amf0ValueType::Number(encoding)) => encoding, 337 _ => &define::OBJENCODING_AMF0, 338 }; 339 340 let app_name = command_obj.get("app"); 341 self.app_name = match app_name { 342 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 343 _ => { 344 return Err(SessionError { 345 value: SessionErrorValue::NoAppName, 346 }); 347 } 348 }; 349 350 let mut netconnection = NetConnection::new(BytesWriter::new()); 351 let data = netconnection.connect_response( 352 &transaction_id, 353 &define::FMSVER.to_string(), 354 &define::CAPABILITIES, 355 &String::from("NetConnection.Connect.Success"), 356 &define::LEVEL.to_string(), 357 &String::from("Connection Succeeded."), 358 encoding, 359 )?; 360 361 let mut chunk_info = ChunkInfo::new( 362 csid_type::COMMAND_AMF0_AMF3, 363 chunk_type::TYPE_0, 364 0, 365 data.len() as u32, 366 msg_type_id::COMMAND_AMF0, 367 0, 368 data, 369 ); 370 371 self.packetizer.write_chunk(&mut chunk_info).await?; 372 373 Ok(()) 374 } 375 376 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 377 let mut netconnection = NetConnection::new(BytesWriter::new()); 378 let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?; 379 380 let mut chunk_info = ChunkInfo::new( 381 csid_type::COMMAND_AMF0_AMF3, 382 chunk_type::TYPE_0, 383 0, 384 data.len() as u32, 385 msg_type_id::COMMAND_AMF0, 386 0, 387 data, 388 ); 389 390 self.packetizer.write_chunk(&mut chunk_info).await?; 391 392 log::info!( 393 "[ S->C ] [create_stream_response] app_name: {}", 394 self.app_name, 395 ); 396 397 Ok(()) 398 } 399 400 pub async fn on_delete_stream( 401 &mut self, 402 transaction_id: &f64, 403 stream_id: &f64, 404 ) -> Result<(), SessionError> { 405 self.common 406 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 407 .await?; 408 409 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 410 netstream 411 .on_status( 412 transaction_id, 413 &"status".to_string(), 414 &"NetStream.DeleteStream.Suceess".to_string(), 415 &"".to_string(), 416 ) 417 .await?; 418 419 //self.unsubscribe_from_channels().await?; 420 log::info!( 421 "[ S->C ] [delete stream success] app_name: {}, stream_name: {}", 422 self.app_name, 423 self.stream_name 424 ); 425 log::trace!("{}", stream_id); 426 427 Ok(()) 428 } 429 pub async fn on_play( 430 &mut self, 431 transaction_id: &f64, 432 stream_id: &u32, 433 other_values: &mut Vec<Amf0ValueType>, 434 ) -> Result<(), SessionError> { 435 let length = other_values.len() as u8; 436 let mut index: u8 = 0; 437 438 let mut stream_name: Option<String> = None; 439 let mut start: Option<f64> = None; 440 let mut duration: Option<f64> = None; 441 let mut reset: Option<bool> = None; 442 443 loop { 444 if index >= length { 445 break; 446 } 447 index = index + 1; 448 stream_name = match other_values.remove(0) { 449 Amf0ValueType::UTF8String(val) => Some(val), 450 _ => None, 451 }; 452 453 if index >= length { 454 break; 455 } 456 index = index + 1; 457 start = match other_values.remove(0) { 458 Amf0ValueType::Number(val) => Some(val), 459 _ => None, 460 }; 461 462 if index >= length { 463 break; 464 } 465 index = index + 1; 466 duration = match other_values.remove(0) { 467 Amf0ValueType::Number(val) => Some(val), 468 _ => None, 469 }; 470 471 if index >= length { 472 break; 473 } 474 //index = index + 1; 475 reset = match other_values.remove(0) { 476 Amf0ValueType::Boolean(val) => Some(val), 477 _ => None, 478 }; 479 break; 480 } 481 482 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 483 event_messages.write_stream_begin(stream_id.clone()).await?; 484 log::info!( 485 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 486 self.app_name, 487 self.stream_name 488 ); 489 log::trace!( 490 "{} {} {}", 491 start.is_some(), 492 duration.is_some(), 493 reset.is_some() 494 ); 495 496 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 497 netstream 498 .on_status( 499 transaction_id, 500 &"status".to_string(), 501 &"NetStream.Play.Reset".to_string(), 502 &"reset".to_string(), 503 ) 504 .await?; 505 506 netstream 507 .on_status( 508 transaction_id, 509 &"status".to_string(), 510 &"NetStream.Play.Start".to_string(), 511 &"play start".to_string(), 512 ) 513 .await?; 514 515 netstream 516 .on_status( 517 transaction_id, 518 &"status".to_string(), 519 &"NetStream.Data.Start".to_string(), 520 &"data start.".to_string(), 521 ) 522 .await?; 523 524 netstream 525 .on_status( 526 transaction_id, 527 &"status".to_string(), 528 &"NetStream.Play.PublishNotify".to_string(), 529 &"play publish notify.".to_string(), 530 ) 531 .await?; 532 533 event_messages 534 .write_stream_is_record(stream_id.clone()) 535 .await?; 536 log::info!( 537 "[ S->C ] [stream is record] app_name: {}, stream_name: {}", 538 self.app_name, 539 self.stream_name 540 ); 541 self.stream_name = stream_name.clone().unwrap(); 542 self.common 543 .subscribe_from_channels( 544 self.app_name.clone(), 545 stream_name.unwrap(), 546 self.subscriber_id, 547 ) 548 .await?; 549 550 self.state = ServerSessionState::Play; 551 552 Ok(()) 553 } 554 555 pub async fn on_publish( 556 &mut self, 557 transaction_id: &f64, 558 stream_id: &u32, 559 other_values: &mut Vec<Amf0ValueType>, 560 ) -> Result<(), SessionError> { 561 let length = other_values.len(); 562 563 if length < 2 { 564 return Err(SessionError { 565 value: SessionErrorValue::Amf0ValueCountNotCorrect, 566 }); 567 } 568 569 let stream_name = match other_values.remove(0) { 570 Amf0ValueType::UTF8String(val) => val, 571 _ => { 572 return Err(SessionError { 573 value: SessionErrorValue::Amf0ValueCountNotCorrect, 574 }); 575 } 576 }; 577 578 self.stream_name = stream_name; 579 580 let _ = match other_values.remove(0) { 581 Amf0ValueType::UTF8String(val) => val, 582 _ => { 583 return Err(SessionError { 584 value: SessionErrorValue::Amf0ValueCountNotCorrect, 585 }); 586 } 587 }; 588 589 log::info!( 590 "[ S<-C ] [publish] app_name: {}, stream_name: {}", 591 self.app_name, 592 self.stream_name 593 ); 594 595 log::info!( 596 "[ S->C ] [stream begin] app_name: {}, stream_name: {}", 597 self.app_name, 598 self.stream_name 599 ); 600 601 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 602 event_messages.write_stream_begin(stream_id.clone()).await?; 603 604 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 605 netstream 606 .on_status( 607 transaction_id, 608 &"status".to_string(), 609 &"NetStream.Publish.Start".to_string(), 610 &"".to_string(), 611 ) 612 .await?; 613 log::info!( 614 "[ S->C ] [NetStream.Publish.Start] app_name: {}, stream_name: {}", 615 self.app_name, 616 self.stream_name 617 ); 618 619 self.common 620 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 621 .await?; 622 623 Ok(()) 624 } 625 } 626