1 use { 2 super::{ 3 common::Common, 4 define, 5 errors::{SessionError, SessionErrorValue}, 6 }, 7 crate::{ 8 amf0::Amf0ValueType, 9 channels::define::{ 10 ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent, 11 ChannelEventProducer, 12 }, 13 chunk::{ 14 define::{chunk_type, csid_type, CHUNK_SIZE}, 15 packetizer::ChunkPacketizer, 16 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 17 ChunkInfo, 18 }, 19 config, 20 handshake::handshake::{ServerHandshakeState, SimpleHandshakeServer}, 21 messages::{ 22 define::{msg_type_id, RtmpMessageData}, 23 parser::MessageParser, 24 }, 25 netconnection::commands::NetConnection, 26 netstream::writer::NetStreamWriter, 27 protocol_control_messages::writer::ProtocolControlMessagesWriter, 28 user_control_messages::writer::EventMessagesWriter, 29 }, 30 bytes::BytesMut, 31 networkio::{ 32 bytes_writer::{AsyncBytesWriter, BytesWriter}, 33 networkio::NetworkIO, 34 }, 35 std::{collections::HashMap, sync::Arc}, 36 tokio::{ 37 net::TcpStream, 38 sync::{mpsc, oneshot, Mutex}, 39 }, 40 }; 41 42 enum ServerSessionState { 43 Handshake, 44 ReadChunk, 45 // OnConnect, 46 // OnCreateStream, 47 //Publish, 48 Play, 49 } 50 51 pub struct ServerSession { 52 app_name: String, 53 stream_name: String, 54 55 io: Arc<Mutex<NetworkIO>>, 56 simple_handshaker: SimpleHandshakeServer, 57 //complex_handshaker: ComplexHandshakeServer, 58 packetizer: ChunkPacketizer, 59 unpacketizer: ChunkUnpacketizer, 60 61 state: ServerSessionState, 62 63 common: Common, 64 65 netio_data: BytesMut, 66 need_process: bool, 67 68 pub session_id: u64, 69 pub session_type: u8, 70 } 71 72 impl ServerSession { 73 pub fn new(stream: TcpStream, event_producer: ChannelEventProducer, session_id: u64) -> Self { 74 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream))); 75 76 Self { 77 app_name: String::from(""), 78 stream_name: String::from(""), 79 80 io: Arc::clone(&net_io), 81 simple_handshaker: SimpleHandshakeServer::new(Arc::clone(&net_io)), 82 //complex_handshaker: ComplexHandshakeServer::new(Arc::clone(&net_io)), 83 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 84 unpacketizer: ChunkUnpacketizer::new(), 85 86 state: ServerSessionState::Handshake, 87 88 common: Common::new(Arc::clone(&net_io), event_producer), 89 90 session_id: session_id, 91 netio_data: BytesMut::new(), 92 need_process: false, 93 session_type: 0, 94 } 95 } 96 97 pub async fn run(&mut self) -> Result<(), SessionError> { 98 loop { 99 match self.state { 100 ServerSessionState::Handshake => { 101 self.handshake().await?; 102 } 103 ServerSessionState::ReadChunk => { 104 self.read_parse_chunks().await?; 105 } 106 ServerSessionState::Play => { 107 self.play().await?; 108 } 109 } 110 } 111 112 //Ok(()) 113 } 114 115 async fn handshake(&mut self) -> Result<(), SessionError> { 116 self.netio_data = self.io.lock().await.read().await?; 117 self.simple_handshaker.extend_data(&self.netio_data[..]); 118 self.simple_handshaker.handshake().await?; 119 120 match self.simple_handshaker.state { 121 ServerHandshakeState::Finish => { 122 self.state = ServerSessionState::ReadChunk; 123 124 let left_bytes = self.simple_handshaker.get_remaining_bytes(); 125 if left_bytes.len() > 0 { 126 self.unpacketizer.extend_data(&left_bytes[..]); 127 self.need_process = true; 128 } 129 130 return Ok(()); 131 } 132 _ => {} 133 } 134 135 Ok(()) 136 } 137 138 async fn read_parse_chunks(&mut self) -> Result<(), SessionError> { 139 if !self.need_process { 140 self.netio_data = self.io.lock().await.read().await?; 141 self.unpacketizer.extend_data(&self.netio_data[..]); 142 } 143 144 self.need_process = false; 145 146 loop { 147 let result = self.unpacketizer.read_chunks(); 148 149 if let Ok(rv) = result { 150 match rv { 151 UnpackResult::Chunks(chunks) => { 152 for chunk_info in chunks.iter() { 153 let mut msg = MessageParser::new(chunk_info.clone(), self.session_type) 154 .parse()?; 155 156 let msg_stream_id = chunk_info.message_header.msg_streamd_id; 157 let timestamp = chunk_info.message_header.timestamp; 158 self.process_messages(&mut msg, &msg_stream_id, ×tamp) 159 .await?; 160 } 161 } 162 _ => {} 163 } 164 } else { 165 break; 166 } 167 } 168 Ok(()) 169 } 170 171 async fn play(&mut self) -> Result<(), SessionError> { 172 match self.common.send_channel_data().await { 173 Ok(_) => {} 174 175 Err(err) => { 176 self.common 177 .unsubscribe_from_channels( 178 self.app_name.clone(), 179 self.stream_name.clone(), 180 self.session_id, 181 ) 182 .await?; 183 return Err(err); 184 } 185 } 186 187 Ok(()) 188 } 189 190 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 191 let mut controlmessage = 192 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 193 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 194 195 Ok(()) 196 } 197 198 pub async fn process_messages( 199 &mut self, 200 rtmp_msg: &mut RtmpMessageData, 201 msg_stream_id: &u32, 202 timestamp: &u32, 203 ) -> Result<(), SessionError> { 204 match rtmp_msg { 205 RtmpMessageData::Amf0Command { 206 command_name, 207 transaction_id, 208 command_object, 209 others, 210 } => { 211 self.on_amf0_command_message( 212 msg_stream_id, 213 command_name, 214 transaction_id, 215 command_object, 216 others, 217 ) 218 .await? 219 } 220 RtmpMessageData::SetChunkSize { chunk_size } => { 221 self.on_set_chunk_size(chunk_size.clone() as usize)?; 222 } 223 RtmpMessageData::AudioData { data } => { 224 self.common.on_audio_data(data, timestamp)?; 225 } 226 RtmpMessageData::VideoData { data } => { 227 self.common.on_video_data(data, timestamp)?; 228 } 229 RtmpMessageData::AmfData { raw_data } => { 230 self.common.on_amf_data(raw_data)?; 231 } 232 233 _ => {} 234 } 235 Ok(()) 236 } 237 238 pub async fn on_amf0_command_message( 239 &mut self, 240 stream_id: &u32, 241 command_name: &Amf0ValueType, 242 transaction_id: &Amf0ValueType, 243 command_object: &Amf0ValueType, 244 others: &mut Vec<Amf0ValueType>, 245 ) -> Result<(), SessionError> { 246 let empty_cmd_name = &String::new(); 247 let cmd_name = match command_name { 248 Amf0ValueType::UTF8String(str) => str, 249 _ => empty_cmd_name, 250 }; 251 252 let transaction_id = match transaction_id { 253 Amf0ValueType::Number(number) => number, 254 _ => &0.0, 255 }; 256 257 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 258 let obj = match command_object { 259 Amf0ValueType::Object(obj) => obj, 260 _ => &empty_cmd_obj, 261 }; 262 263 match cmd_name.as_str() { 264 "connect" => { 265 print!("connect ......."); 266 self.on_connect(&transaction_id, &obj).await?; 267 } 268 "createStream" => { 269 self.on_create_stream(transaction_id).await?; 270 } 271 "deleteStream" => { 272 print!("deletestream....\n"); 273 if others.len() > 0 { 274 let stream_id = match others.pop() { 275 Some(val) => match val { 276 Amf0ValueType::Number(streamid) => streamid, 277 _ => 0.0, 278 }, 279 _ => 0.0, 280 }; 281 282 print!("deletestream....{}\n", stream_id); 283 284 self.on_delete_stream(transaction_id, &stream_id).await?; 285 } 286 } 287 "play" => { 288 self.session_type = config::SERVER_PULL; 289 self.unpacketizer.session_type = config::SERVER_PULL; 290 self.on_play(transaction_id, stream_id, others).await?; 291 } 292 "publish" => { 293 self.session_type = config::SERVER_PUSH; 294 self.unpacketizer.session_type = config::SERVER_PUSH; 295 self.on_publish(transaction_id, stream_id, others).await?; 296 } 297 _ => {} 298 } 299 300 Ok(()) 301 } 302 303 fn on_set_chunk_size(&mut self, chunk_size: usize) -> Result<(), SessionError> { 304 self.unpacketizer.update_max_chunk_size(chunk_size); 305 Ok(()) 306 } 307 308 async fn on_connect( 309 &mut self, 310 transaction_id: &f64, 311 command_obj: &HashMap<String, Amf0ValueType>, 312 ) -> Result<(), SessionError> { 313 let mut control_message = 314 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 315 control_message 316 .write_window_acknowledgement_size(define::WINDOW_ACKNOWLEDGEMENT_SIZE) 317 .await?; 318 control_message 319 .write_set_peer_bandwidth( 320 define::PEER_BANDWIDTH, 321 define::peer_bandwidth_limit_type::DYNAMIC, 322 ) 323 .await?; 324 control_message.write_set_chunk_size(CHUNK_SIZE).await?; 325 326 let obj_encoding = command_obj.get("objectEncoding"); 327 let encoding = match obj_encoding { 328 Some(Amf0ValueType::Number(encoding)) => encoding, 329 _ => &define::OBJENCODING_AMF0, 330 }; 331 332 let app_name = command_obj.get("app"); 333 self.app_name = match app_name { 334 Some(Amf0ValueType::UTF8String(app)) => app.clone(), 335 _ => { 336 return Err(SessionError { 337 value: SessionErrorValue::NoAppName, 338 }); 339 } 340 }; 341 342 let mut netconnection = NetConnection::new(BytesWriter::new()); 343 let data = netconnection.connect_response( 344 &transaction_id, 345 &define::FMSVER.to_string(), 346 &define::CAPABILITIES, 347 &String::from("NetConnection.Connect.Success"), 348 &define::LEVEL.to_string(), 349 &String::from("Connection Succeeded."), 350 encoding, 351 )?; 352 353 let mut chunk_info = ChunkInfo::new( 354 csid_type::COMMAND_AMF0_AMF3, 355 chunk_type::TYPE_0, 356 0, 357 data.len() as u32, 358 msg_type_id::COMMAND_AMF0, 359 0, 360 data, 361 ); 362 363 self.packetizer.write_chunk(&mut chunk_info).await?; 364 365 Ok(()) 366 } 367 368 pub async fn on_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 369 let mut netconnection = NetConnection::new(BytesWriter::new()); 370 let data = netconnection.create_stream_response(transaction_id, &define::STREAM_ID)?; 371 372 let mut chunk_info = ChunkInfo::new( 373 csid_type::COMMAND_AMF0_AMF3, 374 chunk_type::TYPE_0, 375 0, 376 data.len() as u32, 377 msg_type_id::COMMAND_AMF0, 378 0, 379 data, 380 ); 381 382 self.packetizer.write_chunk(&mut chunk_info).await?; 383 384 Ok(()) 385 } 386 387 pub async fn on_delete_stream( 388 &mut self, 389 transaction_id: &f64, 390 stream_id: &f64, 391 ) -> Result<(), SessionError> { 392 self.common 393 .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) 394 .await?; 395 396 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 397 netstream 398 .on_status( 399 transaction_id, 400 &"status".to_string(), 401 &"NetStream.DeleteStream.Suceess".to_string(), 402 &"".to_string(), 403 ) 404 .await?; 405 406 print!("stream id{}", stream_id); 407 408 //self.unsubscribe_from_channels().await?; 409 410 Ok(()) 411 } 412 pub async fn on_play( 413 &mut self, 414 transaction_id: &f64, 415 stream_id: &u32, 416 other_values: &mut Vec<Amf0ValueType>, 417 ) -> Result<(), SessionError> { 418 let length = other_values.len() as u8; 419 let mut index: u8 = 0; 420 421 let mut stream_name: Option<String> = None; 422 let mut start: Option<f64> = None; 423 let mut duration: Option<f64> = None; 424 let mut reset: Option<bool> = None; 425 426 loop { 427 if index >= length { 428 break; 429 } 430 index = index + 1; 431 stream_name = match other_values.remove(0) { 432 Amf0ValueType::UTF8String(val) => Some(val), 433 _ => None, 434 }; 435 436 if index >= length { 437 break; 438 } 439 index = index + 1; 440 start = match other_values.remove(0) { 441 Amf0ValueType::Number(val) => Some(val), 442 _ => None, 443 }; 444 445 if index >= length { 446 break; 447 } 448 index = index + 1; 449 duration = match other_values.remove(0) { 450 Amf0ValueType::Number(val) => Some(val), 451 _ => None, 452 }; 453 454 if index >= length { 455 break; 456 } 457 //index = index + 1; 458 reset = match other_values.remove(0) { 459 Amf0ValueType::Boolean(val) => Some(val), 460 _ => None, 461 }; 462 break; 463 } 464 print!("start {}", start.is_some()); 465 print!("druation {}", duration.is_some()); 466 print!("reset {}", reset.is_some()); 467 468 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 469 event_messages.write_stream_begin(stream_id.clone()).await?; 470 471 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 472 netstream 473 .on_status( 474 transaction_id, 475 &"status".to_string(), 476 &"NetStream.Play.Reset".to_string(), 477 &"reset".to_string(), 478 ) 479 .await?; 480 481 netstream 482 .on_status( 483 transaction_id, 484 &"status".to_string(), 485 &"NetStream.Play.Start".to_string(), 486 &"play start".to_string(), 487 ) 488 .await?; 489 490 netstream 491 .on_status( 492 transaction_id, 493 &"status".to_string(), 494 &"NetStream.Data.Start".to_string(), 495 &"data start.".to_string(), 496 ) 497 .await?; 498 499 netstream 500 .on_status( 501 transaction_id, 502 &"status".to_string(), 503 &"NetStream.Play.PublishNotify".to_string(), 504 &"play publish notify.".to_string(), 505 ) 506 .await?; 507 508 event_messages 509 .write_stream_is_record(stream_id.clone()) 510 .await?; 511 512 self.common 513 .subscribe_from_channels(self.app_name.clone(), stream_name.unwrap(), self.session_id) 514 .await?; 515 self.state = ServerSessionState::Play; 516 517 Ok(()) 518 } 519 520 pub async fn on_publish( 521 &mut self, 522 transaction_id: &f64, 523 stream_id: &u32, 524 other_values: &mut Vec<Amf0ValueType>, 525 ) -> Result<(), SessionError> { 526 let length = other_values.len(); 527 528 if length < 2 { 529 return Err(SessionError { 530 value: SessionErrorValue::Amf0ValueCountNotCorrect, 531 }); 532 } 533 534 let stream_name = match other_values.remove(0) { 535 Amf0ValueType::UTF8String(val) => val, 536 _ => { 537 return Err(SessionError { 538 value: SessionErrorValue::Amf0ValueCountNotCorrect, 539 }); 540 } 541 }; 542 543 self.stream_name = stream_name; 544 545 let _ = match other_values.remove(0) { 546 Amf0ValueType::UTF8String(val) => val, 547 _ => { 548 return Err(SessionError { 549 value: SessionErrorValue::Amf0ValueCountNotCorrect, 550 }); 551 } 552 }; 553 554 let mut event_messages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 555 event_messages.write_stream_begin(stream_id.clone()).await?; 556 557 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 558 netstream 559 .on_status( 560 transaction_id, 561 &"status".to_string(), 562 &"NetStream.Publish.Start".to_string(), 563 &"".to_string(), 564 ) 565 .await?; 566 567 print!("before publish_to_channels\n"); 568 self.common 569 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 570 .await?; 571 print!("after publish_to_channels\n"); 572 573 Ok(()) 574 } 575 } 576