1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::ChannelEventProducer, 11 chunk::{ 12 define::{chunk_type, csid_type, CHUNK_SIZE}, 13 packetizer::ChunkPacketizer, 14 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 15 ChunkInfo, 16 }, 17 config, 18 handshake::handshake::{HandshakeServer, ServerHandshakeState}, 19 messages::{ 20 define::{msg_type_id, RtmpMessageData}, 21 parser::MessageParser, 22 }, 23 netconnection::commands::NetConnection, 24 netstream::writer::NetStreamWriter, 25 protocol_control_messages::writer::ProtocolControlMessagesWriter, 26 user_control_messages::writer::EventMessagesWriter, 27 }, 28 bytes::BytesMut, 29 networkio::{ 30 bytes_writer::{AsyncBytesWriter, BytesWriter}, 31 networkio::NetworkIO, 32 }, 33 std::{collections::HashMap, sync::Arc}, 34 tokio::{net::TcpStream, sync::Mutex}, 35 }; 36 37 enum ServerSessionState { 38 Handshake, 39 ReadChunk, 40 // OnConnect, 41 // OnCreateStream, 42 //Publish, 43 Play, 44 } 45 46 pub struct ServerSession { 47 app_name: String, 48 stream_name: String, 49 50 io: Arc<Mutex<NetworkIO>>, 51 handshaker: HandshakeServer, 52 53 packetizer: ChunkPacketizer, 54 unpacketizer: ChunkUnpacketizer, 55 56 state: ServerSessionState, 57 58 common: Common, 59 60 netio_data: BytesMut, 61 need_process: bool, 62 63 pub session_id: u64, 64 pub session_type: u8, 65 66 connect_command_object: Option<HashMap<String, Amf0ValueType>>, 67 } 68 69 impl ServerSession { 70 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer, session_id: u64) -> Self { 71 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream))); 72 73 Self { 74 app_name: String::from(""), 75 stream_name: String::from(""), 76 77 io: Arc::clone(&net_io), 78 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 79 80 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 81 unpacketizer: ChunkUnpacketizer::new(), 82 83 state: ServerSessionState::Handshake, 84 85 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server), 86 87 session_id: session_id, 88 netio_data: BytesMut::new(), 89 need_process: false, 90 session_type: 0, 91 connect_command_object: None, 92 } 93 } 94 95 pub async fn run(&mut self) -> Result<(), SessionError> { 96 loop { 97 match self.state { 98 ServerSessionState::Handshake => { 99 self.handshake().await?; 100 } 101 ServerSessionState::ReadChunk => { 102 self.read_parse_chunks().await?; 103 } 104 ServerSessionState::Play => { 105 self.play().await?; 106 } 107 } 108 } 109 110 //Ok(()) 111 } 112 113 async fn handshake(&mut self) -> Result<(), SessionError> { 114 self.netio_data = self.io.lock().await.read().await?; 115 self.handshaker.extend_data(&self.netio_data[..]); 116 self.handshaker.handshake().await?; 117 118 match self.handshaker.state() { 119 ServerHandshakeState::Finish => { 120 self.state = ServerSessionState::ReadChunk; 121 122 let left_bytes = self.handshaker.get_remaining_bytes(); 123 if left_bytes.len() > 0 { 124 self.unpacketizer.extend_data(&left_bytes[..]); 125 self.need_process = true; 126 } 127 self.send_set_chunk_size().await?; 128 return Ok(()); 129 } 130 _ => {} 131 } 132 133 Ok(()) 134 } 135 136 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 137 if !self.need_process { 138 self.netio_data = self.io.lock().await.read().await?; 139 self.unpacketizer.extend_data(&self.netio_data[..]); 140 } 141 142 self.need_process = false; 143 144 loop { 145 let result = self.unpacketizer.read_chunks(); 146 147 if let Ok(rv) = result { 148 match rv { 149 UnpackResult::Chunks(chunks) => { 150 for chunk_info in chunks.iter() { 151 let mut msg = MessageParser::new(chunk_info.clone(), self.session_type) 152 .parse()?; 153 154 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 155 let timestamp = chunk_info.message_header.timestamp; 156 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 157 .await?; 158 } 159 } 160 _ => {} 161 } 162 } else { 163 break; 164 } 165 } 166 Ok(()) 167 } 168 169 async fn play(&mut self) -> Result<(), SessionError> { 170 match self.common.send_channel_data().await { 171 Ok(_) => {} 172 173 Err(err) => { 174 self.common 175 .unsubscribe_from_channels( 176 self.app_name.clone(), 177 self.stream_name.clone(), 178 self.session_id, 179 ) 180 .await?; 181 return Err(err); 182 } 183 } 184 185 Ok(()) 186 } 187 188 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 189 let mut controlmessage = 190 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 191 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 192 193 Ok(()) 194 } 195 196 pub async fn process_messages( 197 &mut self, 198 rtmp_msg: &mut RtmpMessageData, 199 msg_stream_id: &u32, 200 timestamp: &u32, 201 ) -> Result<(), SessionError> { 202 match rtmp_msg { 203 RtmpMessageData::Amf0Command { 204 command_name, 205 transaction_id, 206 command_object, 207 others, 208 } => { 209 self.on_amf0_command_message( 210 msg_stream_id, 211 command_name, 212 transaction_id, 213 command_object, 214 others, 215 ) 216 .await? 217 } 218 RtmpMessageData::SetChunkSize { chunk_size } => { 219 self.on_set_chunk_size(chunk_size.clone() as usize)?; 220 } 221 RtmpMessageData::AudioData { data } => { 222 self.common.on_audio_data(data, timestamp)?; 223 } 224 RtmpMessageData::VideoData { data } => { 225 self.common.on_video_data(data, timestamp)?; 226 } 227 RtmpMessageData::AmfData { raw_data } => { 228 self.common.on_meta_data(raw_data, timestamp)?; 229 } 230 231 _ => {} 232 } 233 Ok(()) 234 } 235 236 pub async fn on_amf0_command_message( 237 &mut self, 238 stream_id: &u32, 239 command_name: &Amf0ValueType, 240 transaction_id: &Amf0ValueType, 241 command_object: &Amf0ValueType, 242 others: &mut Vec<Amf0ValueType>, 243 ) -> Result<(), SessionError> { 244 let empty_cmd_name = &String::new(); 245 let cmd_name = match command_name { 246 Amf0ValueType::UTF8String(str) => str, 247 _ => empty_cmd_name, 248 }; 249 250 let transaction_id = match transaction_id { 251 Amf0ValueType::Number(number) => number, 252 _ => &0.0, 253 }; 254 255 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 256 let obj = match command_object { 257 Amf0ValueType::Object(obj) => obj, 258 _ => &empty_cmd_obj, 259 }; 260 261 match cmd_name.as_str() { 262 "connect" => { 263 self.on_connect(&transaction_id, &obj).await?; 264 } 265 "createStream" => { 266 self.on_create_stream(transaction_id).await?; 267 } 268 "deleteStream" => { 269 if others.len() > 0 { 270 let stream_id = match others.pop() { 271 Some(val) => match val { 272 Amf0ValueType::Number(streamid) => streamid, 273 _ => 0.0, 274 }, 275 _ => 0.0, 276 }; 277 self.on_delete_stream(transaction_id, &stream_id).await?; 278 } 279 } 280 "play" => { 281 self.session_type = config::SERVER_PULL; 282 self.unpacketizer.session_type = config::SERVER_PULL; 283 self.on_play(transaction_id, stream_id, others).await?; 284 } 285 "publish" => { 286 self.session_type = config::SERVER_PUSH; 287 self.unpacketizer.session_type = config::SERVER_PUSH; 288 self.on_publish(transaction_id, stream_id, others).await?; 289 } 290 _ => {} 291 } 292 293 Ok(()) 294 } 295 296 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 297 self.unpacketizer.update_max_chunk_size(chunk_size); 298 Ok(()) 299 } 300 301 async fn on_connect( 302 &mut self, 303 transaction_id: &f64, 304 command_obj: &HashMap<String, Amf0ValueType>, 305 ) -> Result<(), SessionError> { 306 log::info!( 307 "[ C->S ] connect and the transaction id: {}", 308 transaction_id 309 ); 310 311 self.connect_command_object = Some(command_obj.clone()); 312 let mut control_message = 313 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 314 control_message 315 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 316 .await?; 317 control_message 318 .write_set_peer_bandwidth( 319 define::PEER_BANDWIDTH, 320 define::peer_bandwidth_limit_type::DYNAMIC, 321 ) 322 .await?; 323 //control_message.write_set_chunk_size(CHUNK_SIZE).await?; 324 325 let obj_encoding = command_obj.get("objectEncoding"); 326 let encoding = match obj_encoding { 327 Some(Amf0ValueType::Number(encoding)) => encoding, 328 _ => &define::OBJENCODING_AMF0, 329 }; 330 331 let app_name = command_obj.get("app"); 332 self.app_name = match app_name { 333 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 334 _ => { 335 return Err(SessionError { 336 value: SessionErrorValue::NoAppName, 337 }); 338 } 339 }; 340 341 let mut netconnection = NetConnection::new(BytesWriter::new()); 342 let data = netconnection.connect_response( 343 &transaction_id, 344 &define::FMSVER.to_string(), 345 &define::CAPABILITIES, 346 &String::from("NetConnection.Connect.Success"), 347 &define::LEVEL.to_string(), 348 &String::from("Connection Succeeded."), 349 encoding, 350 )?; 351 352 let mut chunk_info = ChunkInfo::new( 353 csid_type::COMMAND_AMF0_AMF3, 354 chunk_type::TYPE_0, 355 0, 356 data.len() as u32, 357 msg_type_id::COMMAND_AMF0, 358 0, 359 data, 360 ); 361 362 self.packetizer.write_chunk(&mut chunk_info).await?; 363 364 Ok(()) 365 } 366 367 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 368 log::info!( 369 "[ C->S ] create stream and the transaction id: {}", 370 transaction_id 371 ); 372 373 let mut netconnection = NetConnection::new(BytesWriter::new()); 374 let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?; 375 376 let mut chunk_info = ChunkInfo::new( 377 csid_type::COMMAND_AMF0_AMF3, 378 chunk_type::TYPE_0, 379 0, 380 data.len() as u32, 381 msg_type_id::COMMAND_AMF0, 382 0, 383 data, 384 ); 385 386 self.packetizer.write_chunk(&mut chunk_info).await?; 387 388 Ok(()) 389 } 390 391 pub async fn on_delete_stream( 392 &mut self, 393 transaction_id: &f64, 394 stream_id: &f64, 395 ) -> Result<(), SessionError> { 396 log::info!( 397 "[ C->S ] delete stream and the transaction id :{}, the stream id : {}", 398 transaction_id, 399 stream_id 400 ); 401 402 self.common 403 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 404 .await?; 405 406 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 407 netstream 408 .on_status( 409 transaction_id, 410 &"status".to_string(), 411 &"NetStream.DeleteStream.Suceess".to_string(), 412 &"".to_string(), 413 ) 414 .await?; 415 416 //self.unsubscribe_from_channels().await?; 417 418 Ok(()) 419 } 420 pub async fn on_play( 421 &mut self, 422 transaction_id: &f64, 423 stream_id: &u32, 424 other_values: &mut Vec<Amf0ValueType>, 425 ) -> Result<(), SessionError> { 426 let length = other_values.len() as u8; 427 let mut index: u8 = 0; 428 429 let mut stream_name: Option<String> = None; 430 let mut start: Option<f64> = None; 431 let mut duration: Option<f64> = None; 432 let mut reset: Option<bool> = None; 433 434 loop { 435 if index >= length { 436 break; 437 } 438 index = index + 1; 439 stream_name = match other_values.remove(0) { 440 Amf0ValueType::UTF8String(val) => Some(val), 441 _ => None, 442 }; 443 444 if index >= length { 445 break; 446 } 447 index = index + 1; 448 start = match other_values.remove(0) { 449 Amf0ValueType::Number(val) => Some(val), 450 _ => None, 451 }; 452 453 if index >= length { 454 break; 455 } 456 index = index + 1; 457 duration = match other_values.remove(0) { 458 Amf0ValueType::Number(val) => Some(val), 459 _ => None, 460 }; 461 462 if index >= length { 463 break; 464 } 465 //index = index + 1; 466 reset = match other_values.remove(0) { 467 Amf0ValueType::Boolean(val) => Some(val), 468 _ => None, 469 }; 470 break; 471 } 472 log::info!( 473 "on_play, start: {}, duration: {}, reset: {}", 474 start.is_some(), 475 duration.is_some(), 476 reset.is_some() 477 ); 478 479 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 480 event_messages.write_stream_begin(stream_id.clone()).await?; 481 482 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 483 netstream 484 .on_status( 485 transaction_id, 486 &"status".to_string(), 487 &"NetStream.Play.Reset".to_string(), 488 &"reset".to_string(), 489 ) 490 .await?; 491 492 netstream 493 .on_status( 494 transaction_id, 495 &"status".to_string(), 496 &"NetStream.Play.Start".to_string(), 497 &"play start".to_string(), 498 ) 499 .await?; 500 501 netstream 502 .on_status( 503 transaction_id, 504 &"status".to_string(), 505 &"NetStream.Data.Start".to_string(), 506 &"data start.".to_string(), 507 ) 508 .await?; 509 510 netstream 511 .on_status( 512 transaction_id, 513 &"status".to_string(), 514 &"NetStream.Play.PublishNotify".to_string(), 515 &"play publish notify.".to_string(), 516 ) 517 .await?; 518 519 event_messages 520 .write_stream_is_record(stream_id.clone()) 521 .await?; 522 523 self.stream_name = stream_name.clone().unwrap(); 524 self.common 525 .subscribe_from_channels(self.app_name.clone(), stream_name.unwrap(), self.session_id) 526 .await?; 527 528 self.state = ServerSessionState::Play; 529 530 Ok(()) 531 } 532 533 pub async fn on_publish( 534 &mut self, 535 transaction_id: &f64, 536 stream_id: &u32, 537 other_values: &mut Vec<Amf0ValueType>, 538 ) -> Result<(), SessionError> { 539 let length = other_values.len(); 540 541 if length < 2 { 542 return Err(SessionError { 543 value: SessionErrorValue::Amf0ValueCountNotCorrect, 544 }); 545 } 546 547 let stream_name = match other_values.remove(0) { 548 Amf0ValueType::UTF8String(val) => val, 549 _ => { 550 return Err(SessionError { 551 value: SessionErrorValue::Amf0ValueCountNotCorrect, 552 }); 553 } 554 }; 555 556 self.stream_name = stream_name; 557 558 let _ = match other_values.remove(0) { 559 Amf0ValueType::UTF8String(val) => val, 560 _ => { 561 return Err(SessionError { 562 value: SessionErrorValue::Amf0ValueCountNotCorrect, 563 }); 564 } 565 }; 566 567 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 568 event_messages.write_stream_begin(stream_id.clone()).await?; 569 570 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 571 netstream 572 .on_status( 573 transaction_id, 574 &"status".to_string(), 575 &"NetStream.Publish.Start".to_string(), 576 &"".to_string(), 577 ) 578 .await?; 579 580 self.common 581 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 582 .await?; 583 584 Ok(()) 585 } 586 } 587