1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 //crate::utils::print::print, 9 crate::{ 10 amf0::Amf0ValueType, 11 channels::define::ChannelEventProducer, 12 chunk::{ 13 define::CHUNK_SIZE, 14 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 15 }, 16 handshake, 17 handshake::{define::ClientHandshakeState, handshake_client::SimpleHandshakeClient}, 18 messages::{define::RtmpMessageData, parser::MessageParser}, 19 netconnection::writer::{ConnectProperties, NetConnection}, 20 netstream::writer::NetStreamWriter, 21 protocol_control_messages::writer::ProtocolControlMessagesWriter, 22 user_control_messages::writer::EventMessagesWriter, 23 utils::RtmpUrlParser, 24 }, 25 bytesio::{bytes_writer::AsyncBytesWriter, bytesio::BytesIO}, 26 indexmap::IndexMap, 27 std::sync::Arc, 28 tokio::{net::TcpStream, sync::Mutex}, 29 uuid::Uuid, 30 }; 31 32 #[allow(dead_code)] 33 enum ClientSessionState { 34 Handshake, 35 Connect, 36 CreateStream, 37 Play, 38 PublishingContent, 39 StartPublish, 40 WaitStateChange, 41 } 42 43 #[allow(dead_code)] 44 enum ClientSessionPlayState { 45 Handshake, 46 Connect, 47 CreateStream, 48 Play, 49 } 50 51 #[allow(dead_code)] 52 enum ClientSessionPublishState { 53 Handshake, 54 Connect, 55 CreateStream, 56 PublishingContent, 57 } 58 #[allow(dead_code)] 59 pub enum ClientType { 60 Play, 61 Publish, 62 } 63 pub struct ClientSession { 64 io: Arc<Mutex<BytesIO>>, 65 common: Common, 66 handshaker: SimpleHandshakeClient, 67 unpacketizer: ChunkUnpacketizer, 68 //domain name with port 69 raw_domain_name: String, 70 app_name: String, 71 //stream name with parameters 72 raw_stream_name: String, 73 stream_name: String, 74 /* Used to mark the subscriber's the data producer 75 in channels and delete it from map when unsubscribe 76 is called. */ 77 session_id: Uuid, 78 state: ClientSessionState, 79 client_type: ClientType, 80 sub_app_name: Option<String>, 81 sub_stream_name: Option<String>, 82 } 83 84 impl ClientSession { 85 pub fn new( 86 stream: TcpStream, 87 client_type: ClientType, 88 raw_domain_name: String, 89 app_name: String, 90 raw_stream_name: String, 91 event_producer: ChannelEventProducer, 92 ) -> Self { 93 let remote_addr = if let Ok(addr) = stream.peer_addr() { 94 log::info!("server session: {}", addr.to_string()); 95 Some(addr) 96 } else { 97 None 98 }; 99 100 let net_io = Arc::new(Mutex::new(BytesIO::new(stream))); 101 let subscriber_id = Uuid::new_v4(); 102 103 let common = Common::new( 104 Arc::clone(&net_io), 105 event_producer, 106 SessionType::Client, 107 remote_addr, 108 ); 109 110 let (stream_name, _) = RtmpUrlParser::default() 111 .set_raw_stream_name(raw_stream_name.clone()) 112 .parse_raw_stream_name(); 113 114 Self { 115 io: Arc::clone(&net_io), 116 common, 117 handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)), 118 unpacketizer: ChunkUnpacketizer::new(), 119 raw_domain_name, 120 app_name, 121 raw_stream_name, 122 stream_name, 123 session_id: subscriber_id, 124 state: ClientSessionState::Handshake, 125 client_type, 126 sub_app_name: None, 127 sub_stream_name: None, 128 } 129 } 130 131 pub async fn run(&mut self) -> Result<(), SessionError> { 132 loop { 133 match self.state { 134 ClientSessionState::Handshake => { 135 log::info!("[C -> S] handshake..."); 136 self.handshake().await?; 137 continue; 138 } 139 ClientSessionState::Connect => { 140 log::info!("[C -> S] connect..."); 141 self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64)) 142 .await?; 143 self.state = ClientSessionState::WaitStateChange; 144 } 145 ClientSessionState::CreateStream => { 146 log::info!("[C -> S] CreateStream..."); 147 self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64)) 148 .await?; 149 self.state = ClientSessionState::WaitStateChange; 150 } 151 ClientSessionState::Play => { 152 log::info!("[C -> S] Play..."); 153 self.send_play(&0.0, &self.raw_stream_name.clone(), &0.0, &0.0, &false) 154 .await?; 155 self.state = ClientSessionState::WaitStateChange; 156 } 157 ClientSessionState::PublishingContent => { 158 log::info!("[C -> S] PublishingContent..."); 159 self.send_publish(&3.0, &self.raw_stream_name.clone(), &"live".to_string()) 160 .await?; 161 self.state = ClientSessionState::WaitStateChange; 162 } 163 ClientSessionState::StartPublish => { 164 log::info!("[C -> S] StartPublish..."); 165 self.common.send_channel_data().await?; 166 } 167 ClientSessionState::WaitStateChange => {} 168 } 169 170 let data = self.io.lock().await.read().await?; 171 self.unpacketizer.extend_data(&data[..]); 172 173 loop { 174 match self.unpacketizer.read_chunks() { 175 Ok(rv) => { 176 if let UnpackResult::Chunks(chunks) = rv { 177 for chunk_info in chunks.iter() { 178 let mut msg = MessageParser::new(chunk_info.clone()).parse()?; 179 let timestamp = chunk_info.message_header.timestamp; 180 self.process_messages(&mut msg, ×tamp).await?; 181 } 182 } 183 } 184 Err(err) => { 185 log::trace!("read trunks error: {}", err); 186 break; 187 } 188 } 189 } 190 } 191 } 192 193 async fn handshake(&mut self) -> Result<(), SessionError> { 194 loop { 195 self.handshaker.handshake().await?; 196 if self.handshaker.state == ClientHandshakeState::Finish { 197 log::info!("handshake finish"); 198 break; 199 } 200 201 let mut bytes_len = 0; 202 while bytes_len < handshake::define::RTMP_HANDSHAKE_SIZE * 2 { 203 let data = self.io.lock().await.read().await?; 204 bytes_len += data.len(); 205 self.handshaker.extend_data(&data[..]); 206 } 207 } 208 209 self.state = ClientSessionState::Connect; 210 211 Ok(()) 212 } 213 214 pub async fn process_messages( 215 &mut self, 216 msg: &mut RtmpMessageData, 217 timestamp: &u32, 218 ) -> Result<(), SessionError> { 219 match msg { 220 RtmpMessageData::Amf0Command { 221 command_name, 222 transaction_id, 223 command_object, 224 others, 225 } => { 226 log::info!("[C <- S] on_amf0_command_message..."); 227 self.on_amf0_command_message(command_name, transaction_id, command_object, others) 228 .await? 229 } 230 RtmpMessageData::SetPeerBandwidth { .. } => { 231 log::info!("[C <- S] on_set_peer_bandwidth..."); 232 self.on_set_peer_bandwidth().await? 233 } 234 RtmpMessageData::WindowAcknowledgementSize { .. } => { 235 log::info!("[C <- S] on_windows_acknowledgement_size..."); 236 } 237 RtmpMessageData::SetChunkSize { chunk_size } => { 238 log::info!("[C <- S] on_set_chunk_size..."); 239 self.on_set_chunk_size(chunk_size)?; 240 } 241 RtmpMessageData::StreamBegin { stream_id } => { 242 log::info!("[C <- S] on_stream_begin..."); 243 self.on_stream_begin(stream_id)?; 244 } 245 RtmpMessageData::StreamIsRecorded { stream_id } => { 246 log::info!("[C <- S] on_stream_is_recorded..."); 247 self.on_stream_is_recorded(stream_id)?; 248 } 249 RtmpMessageData::AudioData { data } => self.common.on_audio_data(data, timestamp)?, 250 RtmpMessageData::VideoData { data } => self.common.on_video_data(data, timestamp)?, 251 RtmpMessageData::AmfData { raw_data } => { 252 self.common.on_meta_data(raw_data, timestamp)?; 253 } 254 255 _ => {} 256 } 257 Ok(()) 258 } 259 260 pub async fn on_amf0_command_message( 261 &mut self, 262 command_name: &Amf0ValueType, 263 transaction_id: &Amf0ValueType, 264 command_object: &Amf0ValueType, 265 others: &mut Vec<Amf0ValueType>, 266 ) -> Result<(), SessionError> { 267 log::info!("[C <- S] on_amf0_command_message..."); 268 let empty_cmd_name = &String::new(); 269 let cmd_name = match command_name { 270 Amf0ValueType::UTF8String(str) => str, 271 _ => empty_cmd_name, 272 }; 273 274 let transaction_id = match transaction_id { 275 Amf0ValueType::Number(number) => *number as u8, 276 _ => 0, 277 }; 278 279 let empty_cmd_obj: IndexMap<String, Amf0ValueType> = IndexMap::new(); 280 let _ = match command_object { 281 Amf0ValueType::Object(obj) => obj, 282 // Amf0ValueType::Null => 283 _ => &empty_cmd_obj, 284 }; 285 286 match cmd_name.as_str() { 287 "_result" => match transaction_id { 288 define::TRANSACTION_ID_CONNECT => { 289 log::info!("[C <- S] on_result_connect..."); 290 self.on_result_connect().await?; 291 } 292 define::TRANSACTION_ID_CREATE_STREAM => { 293 log::info!("[C <- S] on_result_create_stream..."); 294 self.on_result_create_stream()?; 295 } 296 _ => {} 297 }, 298 "_error" => { 299 self.on_error()?; 300 } 301 "onStatus" => { 302 match others.remove(0) { 303 Amf0ValueType::Object(obj) => self.on_status(&obj).await?, 304 _ => { 305 return Err(SessionError { 306 value: SessionErrorValue::Amf0ValueCountNotCorrect, 307 }) 308 } 309 }; 310 } 311 312 _ => {} 313 } 314 315 Ok(()) 316 } 317 318 pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 319 self.send_set_chunk_size().await?; 320 321 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 322 let mut properties = ConnectProperties::new_none(); 323 324 let url = format!( 325 "rtmp://{domain_name}/{app_name}", 326 domain_name = self.raw_domain_name, 327 app_name = self.app_name 328 ); 329 properties.app = Some(self.app_name.clone()); 330 331 match self.client_type { 332 ClientType::Play => { 333 properties.flash_ver = Some("LNX 9,0,124,2".to_string()); 334 properties.tc_url = Some(url.clone()); 335 properties.fpad = Some(false); 336 properties.capabilities = Some(15_f64); 337 properties.audio_codecs = Some(4071_f64); 338 properties.video_codecs = Some(252_f64); 339 properties.video_function = Some(1_f64); 340 } 341 ClientType::Publish => { 342 properties.pub_type = Some("nonprivate".to_string()); 343 properties.flash_ver = Some("FMLE/3.0 (compatible; xiu)".to_string()); 344 properties.fpad = Some(false); 345 properties.tc_url = Some(url.clone()); 346 } 347 } 348 349 netconnection 350 .write_connect(transaction_id, &properties) 351 .await?; 352 353 Ok(()) 354 } 355 356 pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 357 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 358 netconnection.write_create_stream(transaction_id).await?; 359 360 Ok(()) 361 } 362 363 pub async fn send_delete_stream( 364 &mut self, 365 transaction_id: &f64, 366 stream_id: &f64, 367 ) -> Result<(), SessionError> { 368 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 369 netstream 370 .write_delete_stream(transaction_id, stream_id) 371 .await?; 372 373 Ok(()) 374 } 375 376 pub async fn send_publish( 377 &mut self, 378 transaction_id: &f64, 379 stream_name: &String, 380 stream_type: &String, 381 ) -> Result<(), SessionError> { 382 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 383 netstream 384 .write_publish(transaction_id, stream_name, stream_type) 385 .await?; 386 387 Ok(()) 388 } 389 390 pub async fn send_play( 391 &mut self, 392 transaction_id: &f64, 393 stream_name: &String, 394 start: &f64, 395 duration: &f64, 396 reset: &bool, 397 ) -> Result<(), SessionError> { 398 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 399 netstream 400 .write_play(transaction_id, stream_name, start, duration, reset) 401 .await?; 402 403 let mut netconnection = NetConnection::new(Arc::clone(&self.io)); 404 netconnection 405 .write_get_stream_length(transaction_id, stream_name) 406 .await?; 407 408 self.send_set_buffer_length(1, 1300).await?; 409 410 Ok(()) 411 } 412 413 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 414 let mut controlmessage = 415 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 416 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 417 Ok(()) 418 } 419 420 pub async fn send_window_acknowledgement_size( 421 &mut self, 422 window_size: u32, 423 ) -> Result<(), SessionError> { 424 let mut controlmessage = 425 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 426 controlmessage 427 .write_window_acknowledgement_size(window_size) 428 .await?; 429 Ok(()) 430 } 431 432 pub async fn send_set_buffer_length( 433 &mut self, 434 stream_id: u32, 435 ms: u32, 436 ) -> Result<(), SessionError> { 437 let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 438 eventmessages.write_set_buffer_length(stream_id, ms).await?; 439 440 Ok(()) 441 } 442 443 pub async fn on_result_connect(&mut self) -> Result<(), SessionError> { 444 let mut controlmessage = 445 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 446 controlmessage.write_acknowledgement(3107).await?; 447 448 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 449 netstream 450 .write_release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 451 .await?; 452 netstream 453 .write_fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 454 .await?; 455 456 self.state = ClientSessionState::CreateStream; 457 458 Ok(()) 459 } 460 461 pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> { 462 match self.client_type { 463 ClientType::Play => { 464 self.state = ClientSessionState::Play; 465 } 466 ClientType::Publish => { 467 self.state = ClientSessionState::PublishingContent; 468 } 469 } 470 Ok(()) 471 } 472 473 pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> { 474 self.unpacketizer 475 .update_max_chunk_size(*chunk_size as usize); 476 Ok(()) 477 } 478 479 pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 480 log::trace!("stream is recorded stream_id is {}", stream_id); 481 Ok(()) 482 } 483 484 pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 485 log::trace!("stream is begin stream_id is {}", stream_id); 486 Ok(()) 487 } 488 489 pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> { 490 self.send_window_acknowledgement_size(5000000).await?; 491 492 Ok(()) 493 } 494 495 pub fn on_error(&mut self) -> Result<(), SessionError> { 496 Ok(()) 497 } 498 499 pub async fn on_status( 500 &mut self, 501 obj: &IndexMap<String, Amf0ValueType>, 502 ) -> Result<(), SessionError> { 503 if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") { 504 match &code_info[..] { 505 "NetStream.Publish.Start" => { 506 self.state = ClientSessionState::StartPublish; 507 //subscribe from local session and publish to remote rtmp server 508 if let (Some(app_name), Some(stream_name)) = 509 (&self.sub_app_name, &self.sub_stream_name) 510 { 511 self.common 512 .subscribe_from_channels( 513 app_name.clone(), 514 stream_name.clone(), 515 self.session_id, 516 ) 517 .await?; 518 } else { 519 self.common 520 .subscribe_from_channels( 521 self.app_name.clone(), 522 self.stream_name.clone(), 523 self.session_id, 524 ) 525 .await?; 526 } 527 } 528 "NetStream.Publish.Reset" => {} 529 "NetStream.Play.Start" => { 530 //pull from remote rtmp server and publish to local session 531 self.common 532 .publish_to_channels( 533 self.app_name.clone(), 534 self.stream_name.clone(), 535 self.session_id, 536 ) 537 .await? 538 } 539 _ => {} 540 } 541 } 542 log::trace!("{}", obj.len()); 543 Ok(()) 544 } 545 546 pub fn subscribe(&mut self, app_name: String, stream_name: String) { 547 self.sub_app_name = Some(app_name); 548 self.sub_stream_name = Some(stream_name); 549 } 550 } 551