1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::{ 9 amf0::Amf0ValueType, 10 channels::define::{ 11 ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent, 12 ChannelEventProducer, 13 }, 14 chunk::{ 15 define::{chunk_type, csid_type, CHUNK_SIZE}, 16 packetizer::ChunkPacketizer, 17 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 18 ChunkInfo, 19 }, 20 config, 21 handshake::handshake::{ServerHandshakeState, SimpleHandshakeServer}, 22 messages::{ 23 define::{msg_type_id, RtmpMessageData}, 24 parser::MessageParser, 25 }, 26 netconnection::commands::NetConnection, 27 netstream::writer::NetStreamWriter, 28 protocol_control_messages::writer::ProtocolControlMessagesWriter, 29 user_control_messages::writer::EventMessagesWriter, 30 }, 31 bytes::BytesMut, 32 networkio::{ 33 bytes_writer::{AsyncBytesWriter, BytesWriter}, 34 networkio::NetworkIO, 35 }, 36 std::{collections::HashMap, sync::Arc}, 37 tokio::{ 38 net::TcpStream, 39 sync::{mpsc, oneshot, Mutex}, 40 }, 41 }; 42 43 enum ServerSessionState { 44 Handshake, 45 ReadChunk, 46 // OnConnect, 47 // OnCreateStream, 48 //Publish, 49 Play, 50 } 51 52 pub struct ServerSession { 53 app_name: String, 54 stream_name: String, 55 56 io: Arc<Mutex<NetworkIO>>, 57 simple_handshaker: SimpleHandshakeServer, 58 //complex_handshaker: ComplexHandshakeServer, 59 packetizer: ChunkPacketizer, 60 unpacketizer: ChunkUnpacketizer, 61 62 state: ServerSessionState, 63 64 common: Common, 65 66 netio_data: BytesMut, 67 need_process: bool, 68 69 pub session_id: u64, 70 pub session_type: u8, 71 72 connect_command_object: Option<HashMap<String, Amf0ValueType>>, 73 } 74 75 impl ServerSession { 76 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer, session_id: u64) -> Self { 77 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream))); 78 79 Self { 80 app_name: String::from(""), 81 stream_name: String::from(""), 82 83 io: Arc::clone(&net_io), 84 simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)), 85 //complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)), 86 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 87 unpacketizer: ChunkUnpacketizer::new(), 88 89 state: ServerSessionState::Handshake, 90 91 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Server), 92 93 session_id: session_id, 94 netio_data: BytesMut::new(), 95 need_process: false, 96 session_type: 0, 97 connect_command_object: None, 98 } 99 } 100 101 pub async fn run(&mut self) -> Result<(), SessionError> { 102 loop { 103 match self.state { 104 ServerSessionState::Handshake => { 105 self.handshake().await?; 106 } 107 ServerSessionState::ReadChunk => { 108 self.read_parse_chunks().await?; 109 } 110 ServerSessionState::Play => { 111 self.play().await?; 112 } 113 } 114 } 115 116 //Ok(()) 117 } 118 119 async fn handshake(&mut self) -> Result<(), SessionError> { 120 self.netio_data = self.io.lock().await.read().await?; 121 self.simple_handshaker.extend_data(&self.netio_data[..]); 122 self.simple_handshaker.handshake().await?; 123 124 match self.simple_handshaker.state { 125 ServerHandshakeState::Finish => { 126 self.state = ServerSessionState::ReadChunk; 127 128 let left_bytes = self.simple_handshaker.get_remaining_bytes(); 129 if left_bytes.len() > 0 { 130 self.unpacketizer.extend_data(&left_bytes[..]); 131 self.need_process = true; 132 } 133 134 return Ok(()); 135 } 136 _ => {} 137 } 138 139 Ok(()) 140 } 141 142 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 143 self.send_set_chunk_size().await?; 144 if !self.need_process { 145 self.netio_data = self.io.lock().await.read().await?; 146 self.unpacketizer.extend_data(&self.netio_data[..]); 147 } 148 149 self.need_process = false; 150 151 loop { 152 let result = self.unpacketizer.read_chunks(); 153 154 if let Ok(rv) = result { 155 match rv { 156 UnpackResult::Chunks(chunks) => { 157 for chunk_info in chunks.iter() { 158 let mut msg = MessageParser::new(chunk_info.clone(), self.session_type) 159 .parse()?; 160 161 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 162 let timestamp = chunk_info.message_header.timestamp; 163 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 164 .await?; 165 } 166 } 167 _ => {} 168 } 169 } else { 170 break; 171 } 172 } 173 Ok(()) 174 } 175 176 async fn play(&mut self) -> Result<(), SessionError> { 177 match self.common.send_channel_data().await { 178 Ok(_) => {} 179 180 Err(err) => { 181 self.common 182 .unsubscribe_from_channels( 183 self.app_name.clone(), 184 self.stream_name.clone(), 185 self.session_id, 186 ) 187 .await?; 188 return Err(err); 189 } 190 } 191 192 Ok(()) 193 } 194 195 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 196 let mut controlmessage = 197 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 198 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 199 200 Ok(()) 201 } 202 203 pub async fn process_messages( 204 &mut self, 205 rtmp_msg: &mut RtmpMessageData, 206 msg_stream_id: &u32, 207 timestamp: &u32, 208 ) -> Result<(), SessionError> { 209 match rtmp_msg { 210 RtmpMessageData::Amf0Command { 211 command_name, 212 transaction_id, 213 command_object, 214 others, 215 } => { 216 self.on_amf0_command_message( 217 msg_stream_id, 218 command_name, 219 transaction_id, 220 command_object, 221 others, 222 ) 223 .await? 224 } 225 RtmpMessageData::SetChunkSize { chunk_size } => { 226 self.on_set_chunk_size(chunk_size.clone() as usize)?; 227 } 228 RtmpMessageData::AudioData { data } => { 229 self.common.on_audio_data(data, timestamp)?; 230 } 231 RtmpMessageData::VideoData { data } => { 232 self.common.on_video_data(data, timestamp)?; 233 } 234 RtmpMessageData::AmfData { raw_data } => { 235 self.common.on_amf_data(raw_data)?; 236 } 237 238 _ => {} 239 } 240 Ok(()) 241 } 242 243 pub async fn on_amf0_command_message( 244 &mut self, 245 stream_id: &u32, 246 command_name: &Amf0ValueType, 247 transaction_id: &Amf0ValueType, 248 command_object: &Amf0ValueType, 249 others: &mut Vec<Amf0ValueType>, 250 ) -> Result<(), SessionError> { 251 let empty_cmd_name = &String::new(); 252 let cmd_name = match command_name { 253 Amf0ValueType::UTF8String(str) => str, 254 _ => empty_cmd_name, 255 }; 256 257 let transaction_id = match transaction_id { 258 Amf0ValueType::Number(number) => number, 259 _ => &0.0, 260 }; 261 262 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 263 let obj = match command_object { 264 Amf0ValueType::Object(obj) => obj, 265 _ => &empty_cmd_obj, 266 }; 267 268 match cmd_name.as_str() { 269 "connect" => { 270 print!("connect ......."); 271 self.on_connect(&transaction_id, &obj).await?; 272 } 273 "createStream" => { 274 self.on_create_stream(transaction_id).await?; 275 } 276 "deleteStream" => { 277 print!("deletestream....\n"); 278 if others.len() > 0 { 279 let stream_id = match others.pop() { 280 Some(val) => match val { 281 Amf0ValueType::Number(streamid) => streamid, 282 _ => 0.0, 283 }, 284 _ => 0.0, 285 }; 286 print!("deletestream....{}\n", stream_id); 287 self.on_delete_stream(transaction_id, &stream_id).await?; 288 } 289 } 290 "play" => { 291 self.session_type = config::SERVER_PULL; 292 self.unpacketizer.session_type = config::SERVER_PULL; 293 self.on_play(transaction_id, stream_id, others).await?; 294 } 295 "publish" => { 296 self.session_type = config::SERVER_PUSH; 297 self.unpacketizer.session_type = config::SERVER_PUSH; 298 self.on_publish(transaction_id, stream_id, others).await?; 299 } 300 _ => {} 301 } 302 303 Ok(()) 304 } 305 306 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 307 self.unpacketizer.update_max_chunk_size(chunk_size); 308 Ok(()) 309 } 310 311 async fn on_connect( 312 &mut self, 313 transaction_id: &f64, 314 command_obj: &HashMap<String, Amf0ValueType>, 315 ) -> Result<(), SessionError> { 316 self.connect_command_object = Some(command_obj.clone()); 317 let mut control_message = 318 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 319 control_message 320 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 321 .await?; 322 control_message 323 .write_set_peer_bandwidth( 324 define::PEER_BANDWIDTH, 325 define::peer_bandwidth_limit_type::DYNAMIC, 326 ) 327 .await?; 328 //control_message.write_set_chunk_size(CHUNK_SIZE).await?; 329 330 let obj_encoding = command_obj.get("objectEncoding"); 331 let encoding = match obj_encoding { 332 Some(Amf0ValueType::Number(encoding)) => encoding, 333 _ => &define::OBJENCODING_AMF0, 334 }; 335 336 let app_name = command_obj.get("app"); 337 self.app_name = match app_name { 338 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 339 _ => { 340 return Err(SessionError { 341 value: SessionErrorValue::NoAppName, 342 }); 343 } 344 }; 345 346 let mut netconnection = NetConnection::new(BytesWriter::new()); 347 let data = netconnection.connect_response( 348 &transaction_id, 349 &define::FMSVER.to_string(), 350 &define::CAPABILITIES, 351 &String::from("NetConnection.Connect.Success"), 352 &define::LEVEL.to_string(), 353 &String::from("Connection Succeeded."), 354 encoding, 355 )?; 356 357 let mut chunk_info = ChunkInfo::new( 358 csid_type::COMMAND_AMF0_AMF3, 359 chunk_type::TYPE_0, 360 0, 361 data.len() as u32, 362 msg_type_id::COMMAND_AMF0, 363 0, 364 data, 365 ); 366 367 self.packetizer.write_chunk(&mut chunk_info).await?; 368 369 Ok(()) 370 } 371 372 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 373 let mut netconnection = NetConnection::new(BytesWriter::new()); 374 let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?; 375 376 let mut chunk_info = ChunkInfo::new( 377 csid_type::COMMAND_AMF0_AMF3, 378 chunk_type::TYPE_0, 379 0, 380 data.len() as u32, 381 msg_type_id::COMMAND_AMF0, 382 0, 383 data, 384 ); 385 386 self.packetizer.write_chunk(&mut chunk_info).await?; 387 388 Ok(()) 389 } 390 391 pub async fn on_delete_stream( 392 &mut self, 393 transaction_id: &f64, 394 stream_id: &f64, 395 ) -> Result<(), SessionError> { 396 self.common 397 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 398 .await?; 399 400 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 401 netstream 402 .on_status( 403 transaction_id, 404 &"status".to_string(), 405 &"NetStream.DeleteStream.Suceess".to_string(), 406 &"".to_string(), 407 ) 408 .await?; 409 410 print!("stream id{}", stream_id); 411 412 //self.unsubscribe_from_channels().await?; 413 414 Ok(()) 415 } 416 pub async fn on_play( 417 &mut self, 418 transaction_id: &f64, 419 stream_id: &u32, 420 other_values: &mut Vec<Amf0ValueType>, 421 ) -> Result<(), SessionError> { 422 let length = other_values.len() as u8; 423 let mut index: u8 = 0; 424 425 let mut stream_name: Option<String> = None; 426 let mut start: Option<f64> = None; 427 let mut duration: Option<f64> = None; 428 let mut reset: Option<bool> = None; 429 430 loop { 431 if index >= length { 432 break; 433 } 434 index = index + 1; 435 stream_name = match other_values.remove(0) { 436 Amf0ValueType::UTF8String(val) => Some(val), 437 _ => None, 438 }; 439 440 if index >= length { 441 break; 442 } 443 index = index + 1; 444 start = match other_values.remove(0) { 445 Amf0ValueType::Number(val) => Some(val), 446 _ => None, 447 }; 448 449 if index >= length { 450 break; 451 } 452 index = index + 1; 453 duration = match other_values.remove(0) { 454 Amf0ValueType::Number(val) => Some(val), 455 _ => None, 456 }; 457 458 if index >= length { 459 break; 460 } 461 //index = index + 1; 462 reset = match other_values.remove(0) { 463 Amf0ValueType::Boolean(val) => Some(val), 464 _ => None, 465 }; 466 break; 467 } 468 print!("start {}", start.is_some()); 469 print!("druation {}", duration.is_some()); 470 print!("reset {}", reset.is_some()); 471 472 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 473 event_messages.write_stream_begin(stream_id.clone()).await?; 474 475 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 476 netstream 477 .on_status( 478 transaction_id, 479 &"status".to_string(), 480 &"NetStream.Play.Reset".to_string(), 481 &"reset".to_string(), 482 ) 483 .await?; 484 485 netstream 486 .on_status( 487 transaction_id, 488 &"status".to_string(), 489 &"NetStream.Play.Start".to_string(), 490 &"play start".to_string(), 491 ) 492 .await?; 493 494 netstream 495 .on_status( 496 transaction_id, 497 &"status".to_string(), 498 &"NetStream.Data.Start".to_string(), 499 &"data start.".to_string(), 500 ) 501 .await?; 502 503 netstream 504 .on_status( 505 transaction_id, 506 &"status".to_string(), 507 &"NetStream.Play.PublishNotify".to_string(), 508 &"play publish notify.".to_string(), 509 ) 510 .await?; 511 512 event_messages 513 .write_stream_is_record(stream_id.clone()) 514 .await?; 515 516 self.stream_name = stream_name.clone().unwrap(); 517 self.common 518 .subscribe_from_channels(self.app_name.clone(), stream_name.unwrap(), self.session_id) 519 .await?; 520 521 self.state = ServerSessionState::Play; 522 523 Ok(()) 524 } 525 526 pub async fn on_publish( 527 &mut self, 528 transaction_id: &f64, 529 stream_id: &u32, 530 other_values: &mut Vec<Amf0ValueType>, 531 ) -> Result<(), SessionError> { 532 let length = other_values.len(); 533 534 if length < 2 { 535 return Err(SessionError { 536 value: SessionErrorValue::Amf0ValueCountNotCorrect, 537 }); 538 } 539 540 let stream_name = match other_values.remove(0) { 541 Amf0ValueType::UTF8String(val) => val, 542 _ => { 543 return Err(SessionError { 544 value: SessionErrorValue::Amf0ValueCountNotCorrect, 545 }); 546 } 547 }; 548 549 self.stream_name = stream_name; 550 551 let _ = match other_values.remove(0) { 552 Amf0ValueType::UTF8String(val) => val, 553 _ => { 554 return Err(SessionError { 555 value: SessionErrorValue::Amf0ValueCountNotCorrect, 556 }); 557 } 558 }; 559 560 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 561 event_messages.write_stream_begin(stream_id.clone()).await?; 562 563 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 564 netstream 565 .on_status( 566 transaction_id, 567 &"status".to_string(), 568 &"NetStream.Publish.Start".to_string(), 569 &"".to_string(), 570 ) 571 .await?; 572 573 //print!("before publish_to_channels\n"); 574 self.common 575 .publish_to_channels( 576 self.app_name.clone(), 577 self.stream_name.clone(), 578 self.connect_command_object.clone().unwrap(), 579 ) 580 .await?; 581 //print!("after publish_to_channels\n"); 582 583 Ok(()) 584 } 585 } 586