1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::ChannelEventProducer, 11 chunk::{ 12 define::{chunk_type, csid_type, CHUNK_SIZE}, 13 packetizer::ChunkPacketizer, 14 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 15 ChunkInfo, 16 }, 17 config, 18 handshake::handshake::{HandshakeServer, ServerHandshakeState}, 19 messages::{ 20 define::{msg_type_id, RtmpMessageData}, 21 parser::MessageParser, 22 }, 23 netconnection::commands::NetConnection, 24 netstream::writer::NetStreamWriter, 25 protocol_control_messages::writer::ProtocolControlMessagesWriter, 26 user_control_messages::writer::EventMessagesWriter, 27 }, 28 bytes::BytesMut, 29 networkio::{ 30 bytes_writer::{AsyncBytesWriter, BytesWriter}, 31 networkio::NetworkIO, 32 }, 33 std::{collections::HashMap, sync::Arc}, 34 tokio::{net::TcpStream, sync::Mutex}, 35 }; 36 37 enum ServerSessionState { 38 Handshake, 39 ReadChunk, 40 // OnConnect, 41 // OnCreateStream, 42 //Publish, 43 Play, 44 } 45 46 pub struct ServerSession { 47 app_name: String, 48 stream_name: String, 49 50 io: Arc<Mutex<NetworkIO>>, 51 handshaker: HandshakeServer, 52 53 packetizer: ChunkPacketizer, 54 unpacketizer: ChunkUnpacketizer, 55 56 state: ServerSessionState, 57 58 common: Common, 59 60 netio_data: BytesMut, 61 need_process: bool, 62 63 pub session_id: u64, 64 pub session_type: u8, 65 66 connect_command_object: Option<HashMap<String, Amf0ValueType>>, 67 } 68 69 impl ServerSession { 70 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer, session_id: u64) -> Self { 71 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream))); 72 73 Self { 74 app_name: String::from(""), 75 stream_name: String::from(""), 76 77 io: Arc::clone(&net_io), 78 handshaker: HandshakeServer::new(Arc::clone(&net_io)), 79 80 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 81 unpacketizer: ChunkUnpacketizer::new(), 82 83 state: ServerSessionState::Handshake, 84 85 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server), 86 87 session_id: session_id, 88 netio_data: BytesMut::new(), 89 need_process: false, 90 session_type: 0, 91 connect_command_object: None, 92 } 93 } 94 95 pub async fn run(&mut self) -> Result<(), SessionError> { 96 loop { 97 match self.state { 98 ServerSessionState::Handshake => { 99 self.handshake().await?; 100 } 101 ServerSessionState::ReadChunk => { 102 self.read_parse_chunks().await?; 103 } 104 ServerSessionState::Play => { 105 self.play().await?; 106 } 107 } 108 } 109 110 //Ok(()) 111 } 112 113 async fn handshake(&mut self) -> Result<(), SessionError> { 114 self.netio_data = self.io.lock().await.read().await?; 115 self.handshaker.extend_data(&self.netio_data[..]); 116 self.handshaker.handshake().await?; 117 118 match self.handshaker.state() { 119 ServerHandshakeState::Finish => { 120 self.state = ServerSessionState::ReadChunk; 121 122 let left_bytes = self.handshaker.get_remaining_bytes(); 123 if left_bytes.len() > 0 { 124 self.unpacketizer.extend_data(&left_bytes[..]); 125 self.need_process = true; 126 } 127 self.send_set_chunk_size().await?; 128 return Ok(()); 129 } 130 _ => {} 131 } 132 133 Ok(()) 134 } 135 136 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 137 if !self.need_process { 138 self.netio_data = self.io.lock().await.read().await?; 139 self.unpacketizer.extend_data(&self.netio_data[..]); 140 } 141 142 self.need_process = false; 143 144 loop { 145 let result = self.unpacketizer.read_chunks(); 146 147 if let Ok(rv) = result { 148 match rv { 149 UnpackResult::Chunks(chunks) => { 150 for chunk_info in chunks.iter() { 151 let mut msg = MessageParser::new(chunk_info.clone(), self.session_type) 152 .parse()?; 153 154 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 155 let timestamp = chunk_info.message_header.timestamp; 156 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 157 .await?; 158 } 159 } 160 _ => {} 161 } 162 } else { 163 break; 164 } 165 } 166 Ok(()) 167 } 168 169 async fn play(&mut self) -> Result<(), SessionError> { 170 match self.common.send_channel_data().await { 171 Ok(_) => {} 172 173 Err(err) => { 174 self.common 175 .unsubscribe_from_channels( 176 self.app_name.clone(), 177 self.stream_name.clone(), 178 self.session_id, 179 ) 180 .await?; 181 return Err(err); 182 } 183 } 184 185 Ok(()) 186 } 187 188 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 189 let mut controlmessage = 190 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 191 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 192 193 Ok(()) 194 } 195 196 pub async fn process_messages( 197 &mut self, 198 rtmp_msg: &mut RtmpMessageData, 199 msg_stream_id: &u32, 200 timestamp: &u32, 201 ) -> Result<(), SessionError> { 202 match rtmp_msg { 203 RtmpMessageData::Amf0Command { 204 command_name, 205 transaction_id, 206 command_object, 207 others, 208 } => { 209 self.on_amf0_command_message( 210 msg_stream_id, 211 command_name, 212 transaction_id, 213 command_object, 214 others, 215 ) 216 .await? 217 } 218 RtmpMessageData::SetChunkSize { chunk_size } => { 219 self.on_set_chunk_size(chunk_size.clone() as usize)?; 220 } 221 RtmpMessageData::AudioData { data } => { 222 self.common.on_audio_data(data, timestamp)?; 223 } 224 RtmpMessageData::VideoData { data } => { 225 self.common.on_video_data(data, timestamp)?; 226 } 227 RtmpMessageData::AmfData { raw_data } => { 228 self.common.on_meta_data(raw_data, timestamp)?; 229 } 230 231 _ => {} 232 } 233 Ok(()) 234 } 235 236 pub async fn on_amf0_command_message( 237 &mut self, 238 stream_id: &u32, 239 command_name: &Amf0ValueType, 240 transaction_id: &Amf0ValueType, 241 command_object: &Amf0ValueType, 242 others: &mut Vec<Amf0ValueType>, 243 ) -> Result<(), SessionError> { 244 let empty_cmd_name = &String::new(); 245 let cmd_name = match command_name { 246 Amf0ValueType::UTF8String(str) => str, 247 _ => empty_cmd_name, 248 }; 249 250 let transaction_id = match transaction_id { 251 Amf0ValueType::Number(number) => number, 252 _ => &0.0, 253 }; 254 255 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 256 let obj = match command_object { 257 Amf0ValueType::Object(obj) => obj, 258 _ => &empty_cmd_obj, 259 }; 260 261 match cmd_name.as_str() { 262 "connect" => { 263 print!("connect ......."); 264 self.on_connect(&transaction_id, &obj).await?; 265 } 266 "createStream" => { 267 self.on_create_stream(transaction_id).await?; 268 } 269 "deleteStream" => { 270 print!("deletestream....\n"); 271 if others.len() > 0 { 272 let stream_id = match others.pop() { 273 Some(val) => match val { 274 Amf0ValueType::Number(streamid) => streamid, 275 _ => 0.0, 276 }, 277 _ => 0.0, 278 }; 279 print!("deletestream....{}\n", stream_id); 280 self.on_delete_stream(transaction_id, &stream_id).await?; 281 } 282 } 283 "play" => { 284 self.session_type = config::SERVER_PULL; 285 self.unpacketizer.session_type = config::SERVER_PULL; 286 self.on_play(transaction_id, stream_id, others).await?; 287 } 288 "publish" => { 289 self.session_type = config::SERVER_PUSH; 290 self.unpacketizer.session_type = config::SERVER_PUSH; 291 self.on_publish(transaction_id, stream_id, others).await?; 292 } 293 _ => {} 294 } 295 296 Ok(()) 297 } 298 299 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 300 self.unpacketizer.update_max_chunk_size(chunk_size); 301 Ok(()) 302 } 303 304 async fn on_connect( 305 &mut self, 306 transaction_id: &f64, 307 command_obj: &HashMap<String, Amf0ValueType>, 308 ) -> Result<(), SessionError> { 309 self.connect_command_object = Some(command_obj.clone()); 310 let mut control_message = 311 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 312 control_message 313 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 314 .await?; 315 control_message 316 .write_set_peer_bandwidth( 317 define::PEER_BANDWIDTH, 318 define::peer_bandwidth_limit_type::DYNAMIC, 319 ) 320 .await?; 321 //control_message.write_set_chunk_size(CHUNK_SIZE).await?; 322 323 let obj_encoding = command_obj.get("objectEncoding"); 324 let encoding = match obj_encoding { 325 Some(Amf0ValueType::Number(encoding)) => encoding, 326 _ => &define::OBJENCODING_AMF0, 327 }; 328 329 let app_name = command_obj.get("app"); 330 self.app_name = match app_name { 331 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 332 _ => { 333 return Err(SessionError { 334 value: SessionErrorValue::NoAppName, 335 }); 336 } 337 }; 338 339 let mut netconnection = NetConnection::new(BytesWriter::new()); 340 let data = netconnection.connect_response( 341 &transaction_id, 342 &define::FMSVER.to_string(), 343 &define::CAPABILITIES, 344 &String::from("NetConnection.Connect.Success"), 345 &define::LEVEL.to_string(), 346 &String::from("Connection Succeeded."), 347 encoding, 348 )?; 349 350 let mut chunk_info = ChunkInfo::new( 351 csid_type::COMMAND_AMF0_AMF3, 352 chunk_type::TYPE_0, 353 0, 354 data.len() as u32, 355 msg_type_id::COMMAND_AMF0, 356 0, 357 data, 358 ); 359 360 self.packetizer.write_chunk(&mut chunk_info).await?; 361 362 Ok(()) 363 } 364 365 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 366 let mut netconnection = NetConnection::new(BytesWriter::new()); 367 let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?; 368 369 let mut chunk_info = ChunkInfo::new( 370 csid_type::COMMAND_AMF0_AMF3, 371 chunk_type::TYPE_0, 372 0, 373 data.len() as u32, 374 msg_type_id::COMMAND_AMF0, 375 0, 376 data, 377 ); 378 379 self.packetizer.write_chunk(&mut chunk_info).await?; 380 381 Ok(()) 382 } 383 384 pub async fn on_delete_stream( 385 &mut self, 386 transaction_id: &f64, 387 stream_id: &f64, 388 ) -> Result<(), SessionError> { 389 self.common 390 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 391 .await?; 392 393 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 394 netstream 395 .on_status( 396 transaction_id, 397 &"status".to_string(), 398 &"NetStream.DeleteStream.Suceess".to_string(), 399 &"".to_string(), 400 ) 401 .await?; 402 403 print!("stream id{}", stream_id); 404 405 //self.unsubscribe_from_channels().await?; 406 407 Ok(()) 408 } 409 pub async fn on_play( 410 &mut self, 411 transaction_id: &f64, 412 stream_id: &u32, 413 other_values: &mut Vec<Amf0ValueType>, 414 ) -> Result<(), SessionError> { 415 let length = other_values.len() as u8; 416 let mut index: u8 = 0; 417 418 let mut stream_name: Option<String> = None; 419 let mut start: Option<f64> = None; 420 let mut duration: Option<f64> = None; 421 let mut reset: Option<bool> = None; 422 423 loop { 424 if index >= length { 425 break; 426 } 427 index = index + 1; 428 stream_name = match other_values.remove(0) { 429 Amf0ValueType::UTF8String(val) => Some(val), 430 _ => None, 431 }; 432 433 if index >= length { 434 break; 435 } 436 index = index + 1; 437 start = match other_values.remove(0) { 438 Amf0ValueType::Number(val) => Some(val), 439 _ => None, 440 }; 441 442 if index >= length { 443 break; 444 } 445 index = index + 1; 446 duration = match other_values.remove(0) { 447 Amf0ValueType::Number(val) => Some(val), 448 _ => None, 449 }; 450 451 if index >= length { 452 break; 453 } 454 //index = index + 1; 455 reset = match other_values.remove(0) { 456 Amf0ValueType::Boolean(val) => Some(val), 457 _ => None, 458 }; 459 break; 460 } 461 print!("start {}", start.is_some()); 462 print!("druation {}", duration.is_some()); 463 print!("reset {}", reset.is_some()); 464 465 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 466 event_messages.write_stream_begin(stream_id.clone()).await?; 467 468 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 469 netstream 470 .on_status( 471 transaction_id, 472 &"status".to_string(), 473 &"NetStream.Play.Reset".to_string(), 474 &"reset".to_string(), 475 ) 476 .await?; 477 478 netstream 479 .on_status( 480 transaction_id, 481 &"status".to_string(), 482 &"NetStream.Play.Start".to_string(), 483 &"play start".to_string(), 484 ) 485 .await?; 486 487 netstream 488 .on_status( 489 transaction_id, 490 &"status".to_string(), 491 &"NetStream.Data.Start".to_string(), 492 &"data start.".to_string(), 493 ) 494 .await?; 495 496 netstream 497 .on_status( 498 transaction_id, 499 &"status".to_string(), 500 &"NetStream.Play.PublishNotify".to_string(), 501 &"play publish notify.".to_string(), 502 ) 503 .await?; 504 505 event_messages 506 .write_stream_is_record(stream_id.clone()) 507 .await?; 508 509 self.stream_name = stream_name.clone().unwrap(); 510 self.common 511 .subscribe_from_channels(self.app_name.clone(), stream_name.unwrap(), self.session_id) 512 .await?; 513 514 self.state = ServerSessionState::Play; 515 516 Ok(()) 517 } 518 519 pub async fn on_publish( 520 &mut self, 521 transaction_id: &f64, 522 stream_id: &u32, 523 other_values: &mut Vec<Amf0ValueType>, 524 ) -> Result<(), SessionError> { 525 let length = other_values.len(); 526 527 if length < 2 { 528 return Err(SessionError { 529 value: SessionErrorValue::Amf0ValueCountNotCorrect, 530 }); 531 } 532 533 let stream_name = match other_values.remove(0) { 534 Amf0ValueType::UTF8String(val) => val, 535 _ => { 536 return Err(SessionError { 537 value: SessionErrorValue::Amf0ValueCountNotCorrect, 538 }); 539 } 540 }; 541 542 self.stream_name = stream_name; 543 544 let _ = match other_values.remove(0) { 545 Amf0ValueType::UTF8String(val) => val, 546 _ => { 547 return Err(SessionError { 548 value: SessionErrorValue::Amf0ValueCountNotCorrect, 549 }); 550 } 551 }; 552 553 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 554 event_messages.write_stream_begin(stream_id.clone()).await?; 555 556 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 557 netstream 558 .on_status( 559 transaction_id, 560 &"status".to_string(), 561 &"NetStream.Publish.Start".to_string(), 562 &"".to_string(), 563 ) 564 .await?; 565 566 self.common 567 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 568 .await?; 569 570 Ok(()) 571 } 572 } 573