1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::ChannelEventProducer, 11 chunk::{ 12 define::{chunk_type, csid_type, CHUNK_SIZE}, 13 packetizer::ChunkPacketizer, 14 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 15 ChunkInfo, 16 }, 17 config, 18 handshake::handshake::{HandshakeServer, ServerHandshakeState}, 19 messages::{ 20 define::{msg_type_id, RtmpMessageData}, 21 parser::MessageParser, 22 }, 23 netconnection::commands::NetConnection, 24 netstream::writer::NetStreamWriter, 25 protocol_control_messages::writer::ProtocolControlMessagesWriter, 26 user_control_messages::writer::EventMessagesWriter, 27 }, 28 bytes::BytesMut, 29 bytesio::{ 30 bytes_writer::{AsyncBytesWriter, BytesWriter}, 31 bytesio::BytesIO, 32 }, 33 std::{collections::HashMap, sync::Arc}, 34 tokio::{net::TcpStream, sync::Mutex}, 35 uuid::Uuid, 36 }; 37 38 enum ServerSessionState { 39 Handshake, 40 ReadChunk, 41 // OnConnect, 42 // OnCreateStream, 43 //Publish, 44 Play, 45 } 46 47 pub struct ServerSession { 48 pub app_name: String, 49 pub stream_name: String, 50 51 io: Arc<Mutex<BytesIO>>, 52 handshaker: HandshakeServer, 53 54 packetizer: ChunkPacketizer, 55 unpacketizer: ChunkUnpacketizer, 56 57 state: ServerSessionState, 58 59 pub common: Common, 60 61 bytesio_data: BytesMut, 62 need_process: bool, 63 64 /* Used to mark the subscriber's the data producer 65 in channels and delete it from map when unsubscribe 66 is called. */ 67 pub subscriber_id: Uuid, 68 69 connect_command_object: Option<HashMap<String, Amf0ValueType>>, 70 } 71 72 impl ServerSession { 73 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer) -> Self { 74 let net_io = Arc::new(Mutex::new(BytesIO::new(stream))); 75 let subscriber_id = Uuid::new_v4(); 76 Self { 77 app_name: String::from(""), 78 stream_name: String::from(""), 79 80 io: Arc::clone(&net_io), 81 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 82 83 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 84 unpacketizer: ChunkUnpacketizer::new(), 85 86 state: ServerSessionState::Handshake, 87 88 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server), 89 90 subscriber_id, 91 bytesio_data: BytesMut::new(), 92 need_process: false, 93 94 connect_command_object: None, 95 } 96 } 97 98 pub async fn run(&mut self) -> Result<(), SessionError> { 99 loop { 100 match self.state { 101 ServerSessionState::Handshake => { 102 self.handshake().await?; 103 } 104 ServerSessionState::ReadChunk => { 105 self.read_parse_chunks().await?; 106 } 107 ServerSessionState::Play => { 108 self.play().await?; 109 } 110 } 111 } 112 113 //Ok(()) 114 } 115 116 async fn handshake(&mut self) -> Result<(), SessionError> { 117 self.bytesio_data = self.io.lock().await.read().await?; 118 self.handshaker.extend_data(&self.bytesio_data[..]); 119 self.handshaker.handshake().await?; 120 121 match self.handshaker.state() { 122 ServerHandshakeState::Finish => { 123 self.state = ServerSessionState::ReadChunk; 124 125 let left_bytes = self.handshaker.get_remaining_bytes(); 126 if left_bytes.len() > 0 { 127 self.unpacketizer.extend_data(&left_bytes[..]); 128 self.need_process = true; 129 } 130 self.send_set_chunk_size().await?; 131 return Ok(()); 132 } 133 _ => {} 134 } 135 136 Ok(()) 137 } 138 139 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 140 if !self.need_process { 141 self.bytesio_data = self.io.lock().await.read().await?; 142 self.unpacketizer.extend_data(&self.bytesio_data[..]); 143 } 144 145 self.need_process = false; 146 147 loop { 148 let result = self.unpacketizer.read_chunks(); 149 150 if let Ok(rv) = result { 151 match rv { 152 UnpackResult::Chunks(chunks) => { 153 for chunk_info in chunks.iter() { 154 let mut msg = MessageParser::new(chunk_info.clone()).parse()?; 155 156 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 157 let timestamp = chunk_info.message_header.timestamp; 158 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 159 .await?; 160 } 161 } 162 _ => {} 163 } 164 } else { 165 break; 166 } 167 } 168 Ok(()) 169 } 170 171 async fn play(&mut self) -> Result<(), SessionError> { 172 match self.common.send_channel_data().await { 173 Ok(_) => {} 174 175 Err(err) => { 176 self.common 177 .unsubscribe_from_channels( 178 self.app_name.clone(), 179 self.stream_name.clone(), 180 self.subscriber_id, 181 ) 182 .await?; 183 return Err(err); 184 } 185 } 186 187 Ok(()) 188 } 189 190 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 191 let mut controlmessage = 192 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 193 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 194 195 Ok(()) 196 } 197 198 pub async fn process_messages( 199 &mut self, 200 rtmp_msg: &mut RtmpMessageData, 201 msg_stream_id: &u32, 202 timestamp: &u32, 203 ) -> Result<(), SessionError> { 204 match rtmp_msg { 205 RtmpMessageData::Amf0Command { 206 command_name, 207 transaction_id, 208 command_object, 209 others, 210 } => { 211 self.on_amf0_command_message( 212 msg_stream_id, 213 command_name, 214 transaction_id, 215 command_object, 216 others, 217 ) 218 .await? 219 } 220 RtmpMessageData::SetChunkSize { chunk_size } => { 221 self.on_set_chunk_size(chunk_size.clone() as usize)?; 222 } 223 RtmpMessageData::AudioData { data } => { 224 self.common.on_audio_data(data, timestamp)?; 225 } 226 RtmpMessageData::VideoData { data } => { 227 self.common.on_video_data(data, timestamp)?; 228 } 229 RtmpMessageData::AmfData { raw_data } => { 230 self.common.on_meta_data(raw_data, timestamp)?; 231 } 232 233 _ => {} 234 } 235 Ok(()) 236 } 237 238 pub async fn on_amf0_command_message( 239 &mut self, 240 stream_id: &u32, 241 command_name: &Amf0ValueType, 242 transaction_id: &Amf0ValueType, 243 command_object: &Amf0ValueType, 244 others: &mut Vec<Amf0ValueType>, 245 ) -> Result<(), SessionError> { 246 let empty_cmd_name = &String::new(); 247 let cmd_name = match command_name { 248 Amf0ValueType::UTF8String(str) => str, 249 _ => empty_cmd_name, 250 }; 251 252 let transaction_id = match transaction_id { 253 Amf0ValueType::Number(number) => number, 254 _ => &0.0, 255 }; 256 257 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 258 let obj = match command_object { 259 Amf0ValueType::Object(obj) => obj, 260 _ => &empty_cmd_obj, 261 }; 262 263 match cmd_name.as_str() { 264 "connect" => { 265 self.on_connect(&transaction_id, &obj).await?; 266 } 267 "createStream" => { 268 self.on_create_stream(transaction_id).await?; 269 } 270 "deleteStream" => { 271 if others.len() > 0 { 272 let stream_id = match others.pop() { 273 Some(val) => match val { 274 Amf0ValueType::Number(streamid) => streamid, 275 _ => 0.0, 276 }, 277 _ => 0.0, 278 }; 279 self.on_delete_stream(transaction_id, &stream_id).await?; 280 } 281 } 282 "play" => { 283 self.unpacketizer.session_type = config::SERVER_PULL; 284 self.on_play(transaction_id, stream_id, others).await?; 285 } 286 "publish" => { 287 self.unpacketizer.session_type = config::SERVER_PUSH; 288 self.on_publish(transaction_id, stream_id, others).await?; 289 } 290 _ => {} 291 } 292 293 Ok(()) 294 } 295 296 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 297 self.unpacketizer.update_max_chunk_size(chunk_size); 298 Ok(()) 299 } 300 301 async fn on_connect( 302 &mut self, 303 transaction_id: &f64, 304 command_obj: &HashMap<String, Amf0ValueType>, 305 ) -> Result<(), SessionError> { 306 log::info!( 307 "[ S<-C ] connect and the transaction id: {}", 308 transaction_id 309 ); 310 311 self.connect_command_object = Some(command_obj.clone()); 312 let mut control_message = 313 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 314 control_message 315 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 316 .await?; 317 control_message 318 .write_set_peer_bandwidth( 319 define::PEER_BANDWIDTH, 320 define::peer_bandwidth_limit_type::DYNAMIC, 321 ) 322 .await?; 323 //control_message.write_set_chunk_size(CHUNK_SIZE).await?; 324 325 let obj_encoding = command_obj.get("objectEncoding"); 326 let encoding = match obj_encoding { 327 Some(Amf0ValueType::Number(encoding)) => encoding, 328 _ => &define::OBJENCODING_AMF0, 329 }; 330 331 let app_name = command_obj.get("app"); 332 self.app_name = match app_name { 333 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 334 _ => { 335 return Err(SessionError { 336 value: SessionErrorValue::NoAppName, 337 }); 338 } 339 }; 340 341 let mut netconnection = NetConnection::new(BytesWriter::new()); 342 let data = netconnection.connect_response( 343 &transaction_id, 344 &define::FMSVER.to_string(), 345 &define::CAPABILITIES, 346 &String::from("NetConnection.Connect.Success"), 347 &define::LEVEL.to_string(), 348 &String::from("Connection Succeeded."), 349 encoding, 350 )?; 351 352 let mut chunk_info = ChunkInfo::new( 353 csid_type::COMMAND_AMF0_AMF3, 354 chunk_type::TYPE_0, 355 0, 356 data.len() as u32, 357 msg_type_id::COMMAND_AMF0, 358 0, 359 data, 360 ); 361 362 self.packetizer.write_chunk(&mut chunk_info).await?; 363 364 Ok(()) 365 } 366 367 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 368 log::info!( 369 "[ S<-C ] create stream and the transaction id: {}", 370 transaction_id 371 ); 372 373 let mut netconnection = NetConnection::new(BytesWriter::new()); 374 let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?; 375 376 let mut chunk_info = ChunkInfo::new( 377 csid_type::COMMAND_AMF0_AMF3, 378 chunk_type::TYPE_0, 379 0, 380 data.len() as u32, 381 msg_type_id::COMMAND_AMF0, 382 0, 383 data, 384 ); 385 386 self.packetizer.write_chunk(&mut chunk_info).await?; 387 388 Ok(()) 389 } 390 391 pub async fn on_delete_stream( 392 &mut self, 393 transaction_id: &f64, 394 stream_id: &f64, 395 ) -> Result<(), SessionError> { 396 log::info!( 397 "[ S<-C ] delete stream and the transaction id :{}, the stream id : {}", 398 transaction_id, 399 stream_id 400 ); 401 402 self.common 403 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 404 .await?; 405 406 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 407 netstream 408 .on_status( 409 transaction_id, 410 &"status".to_string(), 411 &"NetStream.DeleteStream.Suceess".to_string(), 412 &"".to_string(), 413 ) 414 .await?; 415 416 //self.unsubscribe_from_channels().await?; 417 418 Ok(()) 419 } 420 pub async fn on_play( 421 &mut self, 422 transaction_id: &f64, 423 stream_id: &u32, 424 other_values: &mut Vec<Amf0ValueType>, 425 ) -> Result<(), SessionError> { 426 let length = other_values.len() as u8; 427 let mut index: u8 = 0; 428 429 let mut stream_name: Option<String> = None; 430 let mut start: Option<f64> = None; 431 let mut duration: Option<f64> = None; 432 let mut reset: Option<bool> = None; 433 434 loop { 435 if index >= length { 436 break; 437 } 438 index = index + 1; 439 stream_name = match other_values.remove(0) { 440 Amf0ValueType::UTF8String(val) => Some(val), 441 _ => None, 442 }; 443 444 if index >= length { 445 break; 446 } 447 index = index + 1; 448 start = match other_values.remove(0) { 449 Amf0ValueType::Number(val) => Some(val), 450 _ => None, 451 }; 452 453 if index >= length { 454 break; 455 } 456 index = index + 1; 457 duration = match other_values.remove(0) { 458 Amf0ValueType::Number(val) => Some(val), 459 _ => None, 460 }; 461 462 if index >= length { 463 break; 464 } 465 //index = index + 1; 466 reset = match other_values.remove(0) { 467 Amf0ValueType::Boolean(val) => Some(val), 468 _ => None, 469 }; 470 break; 471 } 472 log::info!( 473 "on_play, start: {}, duration: {}, reset: {}", 474 start.is_some(), 475 duration.is_some(), 476 reset.is_some() 477 ); 478 479 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 480 event_messages.write_stream_begin(stream_id.clone()).await?; 481 482 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 483 netstream 484 .on_status( 485 transaction_id, 486 &"status".to_string(), 487 &"NetStream.Play.Reset".to_string(), 488 &"reset".to_string(), 489 ) 490 .await?; 491 492 netstream 493 .on_status( 494 transaction_id, 495 &"status".to_string(), 496 &"NetStream.Play.Start".to_string(), 497 &"play start".to_string(), 498 ) 499 .await?; 500 501 netstream 502 .on_status( 503 transaction_id, 504 &"status".to_string(), 505 &"NetStream.Data.Start".to_string(), 506 &"data start.".to_string(), 507 ) 508 .await?; 509 510 netstream 511 .on_status( 512 transaction_id, 513 &"status".to_string(), 514 &"NetStream.Play.PublishNotify".to_string(), 515 &"play publish notify.".to_string(), 516 ) 517 .await?; 518 519 event_messages 520 .write_stream_is_record(stream_id.clone()) 521 .await?; 522 523 self.stream_name = stream_name.clone().unwrap(); 524 self.common 525 .subscribe_from_channels( 526 self.app_name.clone(), 527 stream_name.unwrap(), 528 self.subscriber_id, 529 ) 530 .await?; 531 532 self.state = ServerSessionState::Play; 533 534 Ok(()) 535 } 536 537 pub async fn on_publish( 538 &mut self, 539 transaction_id: &f64, 540 stream_id: &u32, 541 other_values: &mut Vec<Amf0ValueType>, 542 ) -> Result<(), SessionError> { 543 let length = other_values.len(); 544 545 if length < 2 { 546 return Err(SessionError { 547 value: SessionErrorValue::Amf0ValueCountNotCorrect, 548 }); 549 } 550 551 let stream_name = match other_values.remove(0) { 552 Amf0ValueType::UTF8String(val) => val, 553 _ => { 554 return Err(SessionError { 555 value: SessionErrorValue::Amf0ValueCountNotCorrect, 556 }); 557 } 558 }; 559 560 self.stream_name = stream_name; 561 562 let _ = match other_values.remove(0) { 563 Amf0ValueType::UTF8String(val) => val, 564 _ => { 565 return Err(SessionError { 566 value: SessionErrorValue::Amf0ValueCountNotCorrect, 567 }); 568 } 569 }; 570 571 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 572 event_messages.write_stream_begin(stream_id.clone()).await?; 573 574 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 575 netstream 576 .on_status( 577 transaction_id, 578 &"status".to_string(), 579 &"NetStream.Publish.Start".to_string(), 580 &"".to_string(), 581 ) 582 .await?; 583 584 self.common 585 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 586 .await?; 587 588 Ok(()) 589 } 590 } 591