1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::ChannelEventProducer, 11 chunk::{ 12 define::{chunk_type, csid_type, CHUNK_SIZE}, 13 packetizer::ChunkPacketizer, 14 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 15 ChunkInfo, 16 }, 17 config, 18 handshake::handshake::{ServerHandshakeState, SimpleHandshakeServer}, 19 messages::{ 20 define::{msg_type_id, RtmpMessageData}, 21 parser::MessageParser, 22 }, 23 netconnection::commands::NetConnection, 24 netstream::writer::NetStreamWriter, 25 protocol_control_messages::writer::ProtocolControlMessagesWriter, 26 user_control_messages::writer::EventMessagesWriter, 27 }, 28 bytes::BytesMut, 29 networkio::{ 30 bytes_writer::{AsyncBytesWriter, BytesWriter}, 31 networkio::NetworkIO, 32 }, 33 std::{collections::HashMap, sync::Arc}, 34 tokio::{net::TcpStream, sync::Mutex}, 35 }; 36 37 enum ServerSessionState { 38 Handshake, 39 ReadChunk, 40 // OnConnect, 41 // OnCreateStream, 42 //Publish, 43 Play, 44 } 45 46 pub struct ServerSession { 47 app_name: String, 48 stream_name: String, 49 50 io: Arc<Mutex<NetworkIO>>, 51 simple_handshaker: SimpleHandshakeServer, 52 //complex_handshaker: ComplexHandshakeServer, 53 packetizer: ChunkPacketizer, 54 unpacketizer: ChunkUnpacketizer, 55 56 state: ServerSessionState, 57 58 common: Common, 59 60 netio_data: BytesMut, 61 need_process: bool, 62 63 pub session_id: u64, 64 pub session_type: u8, 65 66 connect_command_object: Option<HashMap<String, Amf0ValueType>>, 67 } 68 69 impl ServerSession { 70 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer, session_id: u64) -> Self { 71 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream))); 72 73 Self { 74 app_name: String::from(""), 75 stream_name: String::from(""), 76 77 io: Arc::clone(&net_io), 78 simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)), 79 //complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)), 80 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 81 unpacketizer: ChunkUnpacketizer::new(), 82 83 state: ServerSessionState::Handshake, 84 85 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server), 86 87 session_id: session_id, 88 netio_data: BytesMut::new(), 89 need_process: false, 90 session_type: 0, 91 connect_command_object: None, 92 } 93 } 94 95 pub async fn run(&mut self) -> Result<(), SessionError> { 96 loop { 97 match self.state { 98 ServerSessionState::Handshake => { 99 self.handshake().await?; 100 } 101 ServerSessionState::ReadChunk => { 102 self.read_parse_chunks().await?; 103 } 104 ServerSessionState::Play => { 105 self.play().await?; 106 } 107 } 108 } 109 110 //Ok(()) 111 } 112 113 async fn handshake(&mut self) -> Result<(), SessionError> { 114 self.netio_data = self.io.lock().await.read().await?; 115 self.simple_handshaker.extend_data(&self.netio_data[..]); 116 self.simple_handshaker.handshake().await?; 117 118 match self.simple_handshaker.state { 119 ServerHandshakeState::Finish => { 120 self.state = ServerSessionState::ReadChunk; 121 122 let left_bytes = self.simple_handshaker.get_remaining_bytes(); 123 if left_bytes.len() > 0 { 124 self.unpacketizer.extend_data(&left_bytes[..]); 125 self.need_process = true; 126 } 127 128 return Ok(()); 129 } 130 _ => {} 131 } 132 133 Ok(()) 134 } 135 136 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 137 self.send_set_chunk_size().await?; 138 if !self.need_process { 139 self.netio_data = self.io.lock().await.read().await?; 140 self.unpacketizer.extend_data(&self.netio_data[..]); 141 } 142 143 self.need_process = false; 144 145 loop { 146 let result = self.unpacketizer.read_chunks(); 147 148 if let Ok(rv) = result { 149 match rv { 150 UnpackResult::Chunks(chunks) => { 151 for chunk_info in chunks.iter() { 152 let mut msg = MessageParser::new(chunk_info.clone(), self.session_type) 153 .parse()?; 154 155 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 156 let timestamp = chunk_info.message_header.timestamp; 157 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 158 .await?; 159 } 160 } 161 _ => {} 162 } 163 } else { 164 break; 165 } 166 } 167 Ok(()) 168 } 169 170 async fn play(&mut self) -> Result<(), SessionError> { 171 match self.common.send_channel_data().await { 172 Ok(_) => {} 173 174 Err(err) => { 175 self.common 176 .unsubscribe_from_channels( 177 self.app_name.clone(), 178 self.stream_name.clone(), 179 self.session_id, 180 ) 181 .await?; 182 return Err(err); 183 } 184 } 185 186 Ok(()) 187 } 188 189 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 190 let mut controlmessage = 191 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 192 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 193 194 Ok(()) 195 } 196 197 pub async fn process_messages( 198 &mut self, 199 rtmp_msg: &mut RtmpMessageData, 200 msg_stream_id: &u32, 201 timestamp: &u32, 202 ) -> Result<(), SessionError> { 203 match rtmp_msg { 204 RtmpMessageData::Amf0Command { 205 command_name, 206 transaction_id, 207 command_object, 208 others, 209 } => { 210 self.on_amf0_command_message( 211 msg_stream_id, 212 command_name, 213 transaction_id, 214 command_object, 215 others, 216 ) 217 .await? 218 } 219 RtmpMessageData::SetChunkSize { chunk_size } => { 220 self.on_set_chunk_size(chunk_size.clone() as usize)?; 221 } 222 RtmpMessageData::AudioData { data } => { 223 self.common.on_audio_data(data, timestamp)?; 224 } 225 RtmpMessageData::VideoData { data } => { 226 self.common.on_video_data(data, timestamp)?; 227 } 228 RtmpMessageData::AmfData { raw_data } => { 229 self.common.on_meta_data(raw_data, timestamp)?; 230 } 231 232 _ => {} 233 } 234 Ok(()) 235 } 236 237 pub async fn on_amf0_command_message( 238 &mut self, 239 stream_id: &u32, 240 command_name: &Amf0ValueType, 241 transaction_id: &Amf0ValueType, 242 command_object: &Amf0ValueType, 243 others: &mut Vec<Amf0ValueType>, 244 ) -> Result<(), SessionError> { 245 let empty_cmd_name = &String::new(); 246 let cmd_name = match command_name { 247 Amf0ValueType::UTF8String(str) => str, 248 _ => empty_cmd_name, 249 }; 250 251 let transaction_id = match transaction_id { 252 Amf0ValueType::Number(number) => number, 253 _ => &0.0, 254 }; 255 256 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 257 let obj = match command_object { 258 Amf0ValueType::Object(obj) => obj, 259 _ => &empty_cmd_obj, 260 }; 261 262 match cmd_name.as_str() { 263 "connect" => { 264 print!("connect ......."); 265 self.on_connect(&transaction_id, &obj).await?; 266 } 267 "createStream" => { 268 self.on_create_stream(transaction_id).await?; 269 } 270 "deleteStream" => { 271 print!("deletestream....\n"); 272 if others.len() > 0 { 273 let stream_id = match others.pop() { 274 Some(val) => match val { 275 Amf0ValueType::Number(streamid) => streamid, 276 _ => 0.0, 277 }, 278 _ => 0.0, 279 }; 280 print!("deletestream....{}\n", stream_id); 281 self.on_delete_stream(transaction_id, &stream_id).await?; 282 } 283 } 284 "play" => { 285 self.session_type = config::SERVER_PULL; 286 self.unpacketizer.session_type = config::SERVER_PULL; 287 self.on_play(transaction_id, stream_id, others).await?; 288 } 289 "publish" => { 290 self.session_type = config::SERVER_PUSH; 291 self.unpacketizer.session_type = config::SERVER_PUSH; 292 self.on_publish(transaction_id, stream_id, others).await?; 293 } 294 _ => {} 295 } 296 297 Ok(()) 298 } 299 300 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 301 self.unpacketizer.update_max_chunk_size(chunk_size); 302 Ok(()) 303 } 304 305 async fn on_connect( 306 &mut self, 307 transaction_id: &f64, 308 command_obj: &HashMap<String, Amf0ValueType>, 309 ) -> Result<(), SessionError> { 310 self.connect_command_object = Some(command_obj.clone()); 311 let mut control_message = 312 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 313 control_message 314 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 315 .await?; 316 control_message 317 .write_set_peer_bandwidth( 318 define::PEER_BANDWIDTH, 319 define::peer_bandwidth_limit_type::DYNAMIC, 320 ) 321 .await?; 322 //control_message.write_set_chunk_size(CHUNK_SIZE).await?; 323 324 let obj_encoding = command_obj.get("objectEncoding"); 325 let encoding = match obj_encoding { 326 Some(Amf0ValueType::Number(encoding)) => encoding, 327 _ => &define::OBJENCODING_AMF0, 328 }; 329 330 let app_name = command_obj.get("app"); 331 self.app_name = match app_name { 332 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 333 _ => { 334 return Err(SessionError { 335 value: SessionErrorValue::NoAppName, 336 }); 337 } 338 }; 339 340 let mut netconnection = NetConnection::new(BytesWriter::new()); 341 let data = netconnection.connect_response( 342 &transaction_id, 343 &define::FMSVER.to_string(), 344 &define::CAPABILITIES, 345 &String::from("NetConnection.Connect.Success"), 346 &define::LEVEL.to_string(), 347 &String::from("Connection Succeeded."), 348 encoding, 349 )?; 350 351 let mut chunk_info = ChunkInfo::new( 352 csid_type::COMMAND_AMF0_AMF3, 353 chunk_type::TYPE_0, 354 0, 355 data.len() as u32, 356 msg_type_id::COMMAND_AMF0, 357 0, 358 data, 359 ); 360 361 self.packetizer.write_chunk(&mut chunk_info).await?; 362 363 Ok(()) 364 } 365 366 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 367 let mut netconnection = NetConnection::new(BytesWriter::new()); 368 let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?; 369 370 let mut chunk_info = ChunkInfo::new( 371 csid_type::COMMAND_AMF0_AMF3, 372 chunk_type::TYPE_0, 373 0, 374 data.len() as u32, 375 msg_type_id::COMMAND_AMF0, 376 0, 377 data, 378 ); 379 380 self.packetizer.write_chunk(&mut chunk_info).await?; 381 382 Ok(()) 383 } 384 385 pub async fn on_delete_stream( 386 &mut self, 387 transaction_id: &f64, 388 stream_id: &f64, 389 ) -> Result<(), SessionError> { 390 self.common 391 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 392 .await?; 393 394 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 395 netstream 396 .on_status( 397 transaction_id, 398 &"status".to_string(), 399 &"NetStream.DeleteStream.Suceess".to_string(), 400 &"".to_string(), 401 ) 402 .await?; 403 404 print!("stream id{}", stream_id); 405 406 //self.unsubscribe_from_channels().await?; 407 408 Ok(()) 409 } 410 pub async fn on_play( 411 &mut self, 412 transaction_id: &f64, 413 stream_id: &u32, 414 other_values: &mut Vec<Amf0ValueType>, 415 ) -> Result<(), SessionError> { 416 let length = other_values.len() as u8; 417 let mut index: u8 = 0; 418 419 let mut stream_name: Option<String> = None; 420 let mut start: Option<f64> = None; 421 let mut duration: Option<f64> = None; 422 let mut reset: Option<bool> = None; 423 424 loop { 425 if index >= length { 426 break; 427 } 428 index = index + 1; 429 stream_name = match other_values.remove(0) { 430 Amf0ValueType::UTF8String(val) => Some(val), 431 _ => None, 432 }; 433 434 if index >= length { 435 break; 436 } 437 index = index + 1; 438 start = match other_values.remove(0) { 439 Amf0ValueType::Number(val) => Some(val), 440 _ => None, 441 }; 442 443 if index >= length { 444 break; 445 } 446 index = index + 1; 447 duration = match other_values.remove(0) { 448 Amf0ValueType::Number(val) => Some(val), 449 _ => None, 450 }; 451 452 if index >= length { 453 break; 454 } 455 //index = index + 1; 456 reset = match other_values.remove(0) { 457 Amf0ValueType::Boolean(val) => Some(val), 458 _ => None, 459 }; 460 break; 461 } 462 print!("start {}", start.is_some()); 463 print!("druation {}", duration.is_some()); 464 print!("reset {}", reset.is_some()); 465 466 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 467 event_messages.write_stream_begin(stream_id.clone()).await?; 468 469 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 470 netstream 471 .on_status( 472 transaction_id, 473 &"status".to_string(), 474 &"NetStream.Play.Reset".to_string(), 475 &"reset".to_string(), 476 ) 477 .await?; 478 479 netstream 480 .on_status( 481 transaction_id, 482 &"status".to_string(), 483 &"NetStream.Play.Start".to_string(), 484 &"play start".to_string(), 485 ) 486 .await?; 487 488 netstream 489 .on_status( 490 transaction_id, 491 &"status".to_string(), 492 &"NetStream.Data.Start".to_string(), 493 &"data start.".to_string(), 494 ) 495 .await?; 496 497 netstream 498 .on_status( 499 transaction_id, 500 &"status".to_string(), 501 &"NetStream.Play.PublishNotify".to_string(), 502 &"play publish notify.".to_string(), 503 ) 504 .await?; 505 506 event_messages 507 .write_stream_is_record(stream_id.clone()) 508 .await?; 509 510 self.stream_name = stream_name.clone().unwrap(); 511 self.common 512 .subscribe_from_channels(self.app_name.clone(), stream_name.unwrap(), self.session_id) 513 .await?; 514 515 self.state = ServerSessionState::Play; 516 517 Ok(()) 518 } 519 520 pub async fn on_publish( 521 &mut self, 522 transaction_id: &f64, 523 stream_id: &u32, 524 other_values: &mut Vec<Amf0ValueType>, 525 ) -> Result<(), SessionError> { 526 let length = other_values.len(); 527 528 if length < 2 { 529 return Err(SessionError { 530 value: SessionErrorValue::Amf0ValueCountNotCorrect, 531 }); 532 } 533 534 let stream_name = match other_values.remove(0) { 535 Amf0ValueType::UTF8String(val) => val, 536 _ => { 537 return Err(SessionError { 538 value: SessionErrorValue::Amf0ValueCountNotCorrect, 539 }); 540 } 541 }; 542 543 self.stream_name = stream_name; 544 545 let _ = match other_values.remove(0) { 546 Amf0ValueType::UTF8String(val) => val, 547 _ => { 548 return Err(SessionError { 549 value: SessionErrorValue::Amf0ValueCountNotCorrect, 550 }); 551 } 552 }; 553 554 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 555 event_messages.write_stream_begin(stream_id.clone()).await?; 556 557 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 558 netstream 559 .on_status( 560 transaction_id, 561 &"status".to_string(), 562 &"NetStream.Publish.Start".to_string(), 563 &"".to_string(), 564 ) 565 .await?; 566 567 self.common 568 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 569 .await?; 570 571 Ok(()) 572 } 573 } 574