1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::ChannelEventProducer, 11 chunk::{ 12 define::{chunk_type, csid_type, CHUNK_SIZE}, 13 packetizer::ChunkPacketizer, 14 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 15 ChunkInfo, 16 }, 17 config, 18 handshake::handshake::{HandshakeServer, ServerHandshakeState}, 19 messages::{ 20 define::{msg_type_id, RtmpMessageData}, 21 parser::MessageParser, 22 }, 23 netconnection::commands::NetConnection, 24 netstream::writer::NetStreamWriter, 25 protocol_control_messages::writer::ProtocolControlMessagesWriter, 26 user_control_messages::writer::EventMessagesWriter, 27 }, 28 bytes::BytesMut, 29 networkio::{ 30 bytes_writer::{AsyncBytesWriter, BytesWriter}, 31 networkio::NetworkIO, 32 }, 33 std::{collections::HashMap, sync::Arc}, 34 tokio::{net::TcpStream, sync::Mutex}, 35 }; 36 37 enum ServerSessionState { 38 Handshake, 39 ReadChunk, 40 // OnConnect, 41 // OnCreateStream, 42 //Publish, 43 Play, 44 } 45 46 pub struct ServerSession { 47 app_name: String, 48 stream_name: String, 49 50 io: Arc<Mutex<NetworkIO>>, 51 handshaker: HandshakeServer, 52 53 packetizer: ChunkPacketizer, 54 unpacketizer: ChunkUnpacketizer, 55 56 state: ServerSessionState, 57 58 common: Common, 59 60 netio_data: BytesMut, 61 need_process: bool, 62 63 pub session_id: u64, 64 pub session_type: u8, 65 66 connect_command_object: Option<HashMap<String, Amf0ValueType>>, 67 } 68 69 impl ServerSession { 70 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer, session_id: u64) -> Self { 71 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream))); 72 73 Self { 74 app_name: String::from(""), 75 stream_name: String::from(""), 76 77 io: Arc::clone(&net_io), 78 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 79 80 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 81 unpacketizer: ChunkUnpacketizer::new(), 82 83 state: ServerSessionState::Handshake, 84 85 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server), 86 87 session_id: session_id, 88 netio_data: BytesMut::new(), 89 need_process: false, 90 session_type: 0, 91 connect_command_object: None, 92 } 93 } 94 95 pub async fn run(&mut self) -> Result<(), SessionError> { 96 loop { 97 match self.state { 98 ServerSessionState::Handshake => { 99 self.handshake().await?; 100 } 101 ServerSessionState::ReadChunk => { 102 self.read_parse_chunks().await?; 103 } 104 ServerSessionState::Play => { 105 self.play().await?; 106 } 107 } 108 } 109 110 //Ok(()) 111 } 112 113 async fn handshake(&mut self) -> Result<(), SessionError> { 114 self.netio_data = self.io.lock().await.read().await?; 115 self.handshaker.extend_data(&self.netio_data[..]); 116 self.handshaker.handshake().await?; 117 118 match self.handshaker.state() { 119 ServerHandshakeState::Finish => { 120 self.state = ServerSessionState::ReadChunk; 121 122 let left_bytes = self.handshaker.get_remaining_bytes(); 123 if left_bytes.len() > 0 { 124 self.unpacketizer.extend_data(&left_bytes[..]); 125 self.need_process = true; 126 } 127 self.send_set_chunk_size().await?; 128 return Ok(()); 129 } 130 _ => {} 131 } 132 133 Ok(()) 134 } 135 136 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 137 if !self.need_process { 138 self.netio_data = self.io.lock().await.read().await?; 139 self.unpacketizer.extend_data(&self.netio_data[..]); 140 } 141 142 self.need_process = false; 143 144 loop { 145 let result = self.unpacketizer.read_chunks(); 146 147 if let Ok(rv) = result { 148 match rv { 149 UnpackResult::Chunks(chunks) => { 150 for chunk_info in chunks.iter() { 151 let mut msg = MessageParser::new(chunk_info.clone()).parse()?; 152 153 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 154 let timestamp = chunk_info.message_header.timestamp; 155 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 156 .await?; 157 } 158 } 159 _ => {} 160 } 161 } else { 162 break; 163 } 164 } 165 Ok(()) 166 } 167 168 async fn play(&mut self) -> Result<(), SessionError> { 169 match self.common.send_channel_data().await { 170 Ok(_) => {} 171 172 Err(err) => { 173 self.common 174 .unsubscribe_from_channels( 175 self.app_name.clone(), 176 self.stream_name.clone(), 177 self.session_id, 178 ) 179 .await?; 180 return Err(err); 181 } 182 } 183 184 Ok(()) 185 } 186 187 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 188 let mut controlmessage = 189 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 190 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 191 192 Ok(()) 193 } 194 195 pub async fn process_messages( 196 &mut self, 197 rtmp_msg: &mut RtmpMessageData, 198 msg_stream_id: &u32, 199 timestamp: &u32, 200 ) -> Result<(), SessionError> { 201 match rtmp_msg { 202 RtmpMessageData::Amf0Command { 203 command_name, 204 transaction_id, 205 command_object, 206 others, 207 } => { 208 self.on_amf0_command_message( 209 msg_stream_id, 210 command_name, 211 transaction_id, 212 command_object, 213 others, 214 ) 215 .await? 216 } 217 RtmpMessageData::SetChunkSize { chunk_size } => { 218 self.on_set_chunk_size(chunk_size.clone() as usize)?; 219 } 220 RtmpMessageData::AudioData { data } => { 221 self.common.on_audio_data(data, timestamp)?; 222 } 223 RtmpMessageData::VideoData { data } => { 224 self.common.on_video_data(data, timestamp)?; 225 } 226 RtmpMessageData::AmfData { raw_data } => { 227 self.common.on_meta_data(raw_data, timestamp)?; 228 } 229 230 _ => {} 231 } 232 Ok(()) 233 } 234 235 pub async fn on_amf0_command_message( 236 &mut self, 237 stream_id: &u32, 238 command_name: &Amf0ValueType, 239 transaction_id: &Amf0ValueType, 240 command_object: &Amf0ValueType, 241 others: &mut Vec<Amf0ValueType>, 242 ) -> Result<(), SessionError> { 243 let empty_cmd_name = &String::new(); 244 let cmd_name = match command_name { 245 Amf0ValueType::UTF8String(str) => str, 246 _ => empty_cmd_name, 247 }; 248 249 let transaction_id = match transaction_id { 250 Amf0ValueType::Number(number) => number, 251 _ => &0.0, 252 }; 253 254 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 255 let obj = match command_object { 256 Amf0ValueType::Object(obj) => obj, 257 _ => &empty_cmd_obj, 258 }; 259 260 match cmd_name.as_str() { 261 "connect" => { 262 self.on_connect(&transaction_id, &obj).await?; 263 } 264 "createStream" => { 265 self.on_create_stream(transaction_id).await?; 266 } 267 "deleteStream" => { 268 if others.len() > 0 { 269 let stream_id = match others.pop() { 270 Some(val) => match val { 271 Amf0ValueType::Number(streamid) => streamid, 272 _ => 0.0, 273 }, 274 _ => 0.0, 275 }; 276 self.on_delete_stream(transaction_id, &stream_id).await?; 277 } 278 } 279 "play" => { 280 self.session_type = config::SERVER_PULL; 281 self.unpacketizer.session_type = config::SERVER_PULL; 282 self.on_play(transaction_id, stream_id, others).await?; 283 } 284 "publish" => { 285 self.session_type = config::SERVER_PUSH; 286 self.unpacketizer.session_type = config::SERVER_PUSH; 287 self.on_publish(transaction_id, stream_id, others).await?; 288 } 289 _ => {} 290 } 291 292 Ok(()) 293 } 294 295 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 296 self.unpacketizer.update_max_chunk_size(chunk_size); 297 Ok(()) 298 } 299 300 async fn on_connect( 301 &mut self, 302 transaction_id: &f64, 303 command_obj: &HashMap<String, Amf0ValueType>, 304 ) -> Result<(), SessionError> { 305 log::info!( 306 "[ C->S ] connect and the transaction id: {}", 307 transaction_id 308 ); 309 310 self.connect_command_object = Some(command_obj.clone()); 311 let mut control_message = 312 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 313 control_message 314 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 315 .await?; 316 control_message 317 .write_set_peer_bandwidth( 318 define::PEER_BANDWIDTH, 319 define::peer_bandwidth_limit_type::DYNAMIC, 320 ) 321 .await?; 322 //control_message.write_set_chunk_size(CHUNK_SIZE).await?; 323 324 let obj_encoding = command_obj.get("objectEncoding"); 325 let encoding = match obj_encoding { 326 Some(Amf0ValueType::Number(encoding)) => encoding, 327 _ => &define::OBJENCODING_AMF0, 328 }; 329 330 let app_name = command_obj.get("app"); 331 self.app_name = match app_name { 332 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 333 _ => { 334 return Err(SessionError { 335 value: SessionErrorValue::NoAppName, 336 }); 337 } 338 }; 339 340 let mut netconnection = NetConnection::new(BytesWriter::new()); 341 let data = netconnection.connect_response( 342 &transaction_id, 343 &define::FMSVER.to_string(), 344 &define::CAPABILITIES, 345 &String::from("NetConnection.Connect.Success"), 346 &define::LEVEL.to_string(), 347 &String::from("Connection Succeeded."), 348 encoding, 349 )?; 350 351 let mut chunk_info = ChunkInfo::new( 352 csid_type::COMMAND_AMF0_AMF3, 353 chunk_type::TYPE_0, 354 0, 355 data.len() as u32, 356 msg_type_id::COMMAND_AMF0, 357 0, 358 data, 359 ); 360 361 self.packetizer.write_chunk(&mut chunk_info).await?; 362 363 Ok(()) 364 } 365 366 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 367 log::info!( 368 "[ C->S ] create stream and the transaction id: {}", 369 transaction_id 370 ); 371 372 let mut netconnection = NetConnection::new(BytesWriter::new()); 373 let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?; 374 375 let mut chunk_info = ChunkInfo::new( 376 csid_type::COMMAND_AMF0_AMF3, 377 chunk_type::TYPE_0, 378 0, 379 data.len() as u32, 380 msg_type_id::COMMAND_AMF0, 381 0, 382 data, 383 ); 384 385 self.packetizer.write_chunk(&mut chunk_info).await?; 386 387 Ok(()) 388 } 389 390 pub async fn on_delete_stream( 391 &mut self, 392 transaction_id: &f64, 393 stream_id: &f64, 394 ) -> Result<(), SessionError> { 395 log::info!( 396 "[ C->S ] delete stream and the transaction id :{}, the stream id : {}", 397 transaction_id, 398 stream_id 399 ); 400 401 self.common 402 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 403 .await?; 404 405 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 406 netstream 407 .on_status( 408 transaction_id, 409 &"status".to_string(), 410 &"NetStream.DeleteStream.Suceess".to_string(), 411 &"".to_string(), 412 ) 413 .await?; 414 415 //self.unsubscribe_from_channels().await?; 416 417 Ok(()) 418 } 419 pub async fn on_play( 420 &mut self, 421 transaction_id: &f64, 422 stream_id: &u32, 423 other_values: &mut Vec<Amf0ValueType>, 424 ) -> Result<(), SessionError> { 425 let length = other_values.len() as u8; 426 let mut index: u8 = 0; 427 428 let mut stream_name: Option<String> = None; 429 let mut start: Option<f64> = None; 430 let mut duration: Option<f64> = None; 431 let mut reset: Option<bool> = None; 432 433 loop { 434 if index >= length { 435 break; 436 } 437 index = index + 1; 438 stream_name = match other_values.remove(0) { 439 Amf0ValueType::UTF8String(val) => Some(val), 440 _ => None, 441 }; 442 443 if index >= length { 444 break; 445 } 446 index = index + 1; 447 start = match other_values.remove(0) { 448 Amf0ValueType::Number(val) => Some(val), 449 _ => None, 450 }; 451 452 if index >= length { 453 break; 454 } 455 index = index + 1; 456 duration = match other_values.remove(0) { 457 Amf0ValueType::Number(val) => Some(val), 458 _ => None, 459 }; 460 461 if index >= length { 462 break; 463 } 464 //index = index + 1; 465 reset = match other_values.remove(0) { 466 Amf0ValueType::Boolean(val) => Some(val), 467 _ => None, 468 }; 469 break; 470 } 471 log::info!( 472 "on_play, start: {}, duration: {}, reset: {}", 473 start.is_some(), 474 duration.is_some(), 475 reset.is_some() 476 ); 477 478 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 479 event_messages.write_stream_begin(stream_id.clone()).await?; 480 481 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 482 netstream 483 .on_status( 484 transaction_id, 485 &"status".to_string(), 486 &"NetStream.Play.Reset".to_string(), 487 &"reset".to_string(), 488 ) 489 .await?; 490 491 netstream 492 .on_status( 493 transaction_id, 494 &"status".to_string(), 495 &"NetStream.Play.Start".to_string(), 496 &"play start".to_string(), 497 ) 498 .await?; 499 500 netstream 501 .on_status( 502 transaction_id, 503 &"status".to_string(), 504 &"NetStream.Data.Start".to_string(), 505 &"data start.".to_string(), 506 ) 507 .await?; 508 509 netstream 510 .on_status( 511 transaction_id, 512 &"status".to_string(), 513 &"NetStream.Play.PublishNotify".to_string(), 514 &"play publish notify.".to_string(), 515 ) 516 .await?; 517 518 event_messages 519 .write_stream_is_record(stream_id.clone()) 520 .await?; 521 522 self.stream_name = stream_name.clone().unwrap(); 523 self.common 524 .subscribe_from_channels(self.app_name.clone(), stream_name.unwrap(), self.session_id) 525 .await?; 526 527 self.state = ServerSessionState::Play; 528 529 Ok(()) 530 } 531 532 pub async fn on_publish( 533 &mut self, 534 transaction_id: &f64, 535 stream_id: &u32, 536 other_values: &mut Vec<Amf0ValueType>, 537 ) -> Result<(), SessionError> { 538 let length = other_values.len(); 539 540 if length < 2 { 541 return Err(SessionError { 542 value: SessionErrorValue::Amf0ValueCountNotCorrect, 543 }); 544 } 545 546 let stream_name = match other_values.remove(0) { 547 Amf0ValueType::UTF8String(val) => val, 548 _ => { 549 return Err(SessionError { 550 value: SessionErrorValue::Amf0ValueCountNotCorrect, 551 }); 552 } 553 }; 554 555 self.stream_name = stream_name; 556 557 let _ = match other_values.remove(0) { 558 Amf0ValueType::UTF8String(val) => val, 559 _ => { 560 return Err(SessionError { 561 value: SessionErrorValue::Amf0ValueCountNotCorrect, 562 }); 563 } 564 }; 565 566 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 567 event_messages.write_stream_begin(stream_id.clone()).await?; 568 569 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 570 netstream 571 .on_status( 572 transaction_id, 573 &"status".to_string(), 574 &"NetStream.Publish.Start".to_string(), 575 &"".to_string(), 576 ) 577 .await?; 578 579 self.common 580 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 581 .await?; 582 583 Ok(()) 584 } 585 } 586