1 use uuid::Uuid; 2 3 use { 4 super::{ 5 common::Common, 6 define, 7 define::SessionType, 8 errors::{SessionError, SessionErrorValue}, 9 }, 10 crate::utils::print::print, 11 crate::{ 12 amf0::Amf0ValueType, 13 channels::define::ChannelEventProducer, 14 chunk::{ 15 define::{chunk_type, csid_type, CHUNK_SIZE}, 16 packetizer::ChunkPacketizer, 17 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 18 ChunkInfo, 19 }, 20 handshake::handshake::{ClientHandshakeState, SimpleHandshakeClient}, 21 messages::{ 22 define::{msg_type_id, RtmpMessageData}, 23 parser::MessageParser, 24 }, 25 netconnection::commands::{ConnectProperties, NetConnection}, 26 netstream::writer::NetStreamWriter, 27 protocol_control_messages::writer::ProtocolControlMessagesWriter, 28 user_control_messages::writer::EventMessagesWriter, 29 }, 30 bytesio::{ 31 bytes_writer::{AsyncBytesWriter, BytesWriter}, 32 bytesio::BytesIO, 33 }, 34 std::{collections::HashMap, sync::Arc}, 35 tokio::{net::TcpStream, sync::Mutex}, 36 }; 37 38 #[allow(dead_code)] 39 enum ClientSessionState { 40 Handshake, 41 Connect, 42 CreateStream, 43 Play, 44 PublishingContent, 45 StartPublish, 46 WaitStateChange, 47 } 48 49 #[allow(dead_code)] 50 enum ClientSessionPlayState { 51 Handshake, 52 Connect, 53 CreateStream, 54 Play, 55 } 56 57 #[allow(dead_code)] 58 enum ClientSessionPublishState { 59 Handshake, 60 Connect, 61 CreateStream, 62 PublishingContent, 63 } 64 #[allow(dead_code)] 65 pub enum ClientType { 66 Play, 67 Publish, 68 } 69 pub struct ClientSession { 70 io: Arc<Mutex<BytesIO>>, 71 common: Common, 72 73 handshaker: SimpleHandshakeClient, 74 75 packetizer: ChunkPacketizer, 76 unpacketizer: ChunkUnpacketizer, 77 78 app_name: String, 79 stream_name: String, 80 81 /* Used to mark the subscriber's the data producer 82 in channels and delete it from map when unsubscribe 83 is called. */ 84 subscriber_id: Uuid, 85 86 state: ClientSessionState, 87 client_type: ClientType, 88 } 89 90 impl ClientSession { 91 #[allow(dead_code)] 92 pub fn new( 93 stream: TcpStream, 94 client_type: ClientType, 95 app_name: String, 96 stream_name: String, 97 event_producer: ChannelEventProducer, 98 ) -> Self { 99 let net_io = Arc::new(Mutex::new(BytesIO::new(stream))); 100 let subscriber_id = Uuid::new_v4(); 101 102 Self { 103 io: Arc::clone(&net_io), 104 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Client), 105 106 handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)), 107 108 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 109 unpacketizer: ChunkUnpacketizer::new(), 110 111 app_name, 112 stream_name, 113 client_type, 114 115 state: ClientSessionState::Handshake, 116 subscriber_id, 117 } 118 } 119 120 pub async fn run(&mut self) -> Result<(), SessionError> { 121 loop { 122 match self.state { 123 ClientSessionState::Handshake => { 124 log::info!("[C -> S] handshake..."); 125 self.handshake().await?; 126 continue; 127 } 128 ClientSessionState::Connect => { 129 log::info!("[C -> S] connect..."); 130 self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64)) 131 .await?; 132 self.state = ClientSessionState::WaitStateChange; 133 } 134 ClientSessionState::CreateStream => { 135 log::info!("[C -> S] CreateStream..."); 136 self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64)) 137 .await?; 138 self.state = ClientSessionState::WaitStateChange; 139 } 140 ClientSessionState::Play => { 141 log::info!("[C -> S] Play..."); 142 self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false) 143 .await?; 144 self.state = ClientSessionState::WaitStateChange; 145 } 146 ClientSessionState::PublishingContent => { 147 log::info!("[C -> S] PublishingContent..."); 148 self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string()) 149 .await?; 150 self.state = ClientSessionState::WaitStateChange; 151 } 152 ClientSessionState::StartPublish => { 153 log::info!("[C -> S] StartPublish..."); 154 self.common.send_channel_data().await?; 155 } 156 ClientSessionState::WaitStateChange => {} 157 } 158 159 let data = self.io.lock().await.read().await?; 160 self.unpacketizer.extend_data(&data[..]); 161 let result = self.unpacketizer.read_chunk()?; 162 163 match result { 164 UnpackResult::ChunkInfo(chunk_info) => { 165 let mut message_parser = MessageParser::new(chunk_info.clone()); 166 let mut msg = message_parser.parse()?; 167 let timestamp = chunk_info.message_header.timestamp; 168 169 self.process_messages(&mut msg, ×tamp).await?; 170 } 171 _ => {} 172 } 173 } 174 175 // Ok(()) 176 } 177 178 async fn handshake(&mut self) -> Result<(), SessionError> { 179 loop { 180 self.handshaker.handshake().await?; 181 if self.handshaker.state == ClientHandshakeState::Finish { 182 log::info!("handshake finish"); 183 break; 184 } 185 186 let data = self.io.lock().await.read().await?; 187 print(data.clone()); 188 self.handshaker.extend_data(&data[..]); 189 } 190 191 self.state = ClientSessionState::Connect; 192 193 Ok(()) 194 } 195 196 pub async fn process_messages( 197 &mut self, 198 msg: &mut RtmpMessageData, 199 timestamp: &u32, 200 ) -> Result<(), SessionError> { 201 match msg { 202 RtmpMessageData::Amf0Command { 203 command_name, 204 transaction_id, 205 command_object, 206 others, 207 } => { 208 self.on_amf0_command_message(command_name, transaction_id, command_object, others) 209 .await? 210 } 211 RtmpMessageData::SetPeerBandwidth { properties } => { 212 log::trace!( 213 "process_messages SetPeerBandwidth windows size: {}", 214 properties.window_size 215 ); 216 self.on_set_peer_bandwidth().await? 217 } 218 RtmpMessageData::SetChunkSize { chunk_size } => self.on_set_chunk_size(chunk_size)?, 219 220 RtmpMessageData::StreamBegin { stream_id } => self.on_stream_begin(stream_id)?, 221 222 RtmpMessageData::StreamIsRecorded { stream_id } => { 223 self.on_stream_is_recorded(stream_id)? 224 } 225 226 RtmpMessageData::AudioData { data } => self.common.on_audio_data(data, timestamp)?, 227 228 RtmpMessageData::VideoData { data } => self.common.on_video_data(data, timestamp)?, 229 230 _ => {} 231 } 232 Ok(()) 233 } 234 235 pub async fn on_amf0_command_message( 236 &mut self, 237 command_name: &Amf0ValueType, 238 transaction_id: &Amf0ValueType, 239 command_object: &Amf0ValueType, 240 others: &mut Vec<Amf0ValueType>, 241 ) -> Result<(), SessionError> { 242 let empty_cmd_name = &String::new(); 243 let cmd_name = match command_name { 244 Amf0ValueType::UTF8String(str) => str, 245 _ => empty_cmd_name, 246 }; 247 248 let transaction_id = match transaction_id { 249 Amf0ValueType::Number(number) => number.clone() as u8, 250 _ => 0, 251 }; 252 253 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 254 let _ = match command_object { 255 Amf0ValueType::Object(obj) => obj, 256 // Amf0ValueType::Null => 257 _ => &empty_cmd_obj, 258 }; 259 260 match cmd_name.as_str() { 261 "_result" => match transaction_id { 262 define::TRANSACTION_ID_CONNECT => { 263 self.on_result_connect().await?; 264 } 265 define::TRANSACTION_ID_CREATE_STREAM => { 266 self.on_result_create_stream()?; 267 } 268 _ => {} 269 }, 270 "_error" => { 271 self.on_error()?; 272 } 273 "onStatus" => { 274 match others.remove(0) { 275 Amf0ValueType::Object(obj) => self.on_status(&obj).await?, 276 _ => { 277 return Err(SessionError { 278 value: SessionErrorValue::Amf0ValueCountNotCorrect, 279 }) 280 } 281 }; 282 } 283 284 _ => {} 285 } 286 287 Ok(()) 288 } 289 290 pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 291 self.send_set_chunk_size().await?; 292 293 let mut netconnection = NetConnection::new(BytesWriter::new()); 294 295 let mut properties = ConnectProperties::new_none(); 296 297 let url = format!("rtmp://localhost:1935/{app_name}", app_name = self.app_name); 298 properties.app = Some(self.app_name.clone()); 299 properties.tc_url = Some(url.clone()); 300 301 match self.client_type { 302 ClientType::Play => { 303 properties.flash_ver = Some("flashVerFMLE/3.0 (compatible; FMSc/1.0)".to_string()); 304 properties.swf_url = Some(url.clone()); 305 } 306 ClientType::Publish => { 307 properties.fpad = Some(false); 308 properties.capabilities = Some(15_f64); 309 properties.audio_codecs = Some(3191_f64); 310 properties.video_codecs = Some(252_f64); 311 properties.video_function = Some(1_f64); 312 } 313 } 314 315 let data = netconnection.connect(transaction_id, &properties)?; 316 317 let mut chunk_info = ChunkInfo::new( 318 csid_type::COMMAND_AMF0_AMF3, 319 chunk_type::TYPE_0, 320 0, 321 data.len() as u32, 322 msg_type_id::COMMAND_AMF0, 323 0, 324 data, 325 ); 326 327 self.packetizer.write_chunk(&mut chunk_info).await?; 328 Ok(()) 329 } 330 331 pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 332 let mut netconnection = NetConnection::new(BytesWriter::new()); 333 let data = netconnection.create_stream(transaction_id)?; 334 335 let mut chunk_info = ChunkInfo::new( 336 csid_type::COMMAND_AMF0_AMF3, 337 chunk_type::TYPE_0, 338 0, 339 data.len() as u32, 340 msg_type_id::COMMAND_AMF0, 341 0, 342 data, 343 ); 344 345 self.packetizer.write_chunk(&mut chunk_info).await?; 346 347 Ok(()) 348 } 349 350 pub async fn send_delete_stream( 351 &mut self, 352 transaction_id: &f64, 353 stream_id: &f64, 354 ) -> Result<(), SessionError> { 355 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 356 netstream.delete_stream(transaction_id, stream_id).await?; 357 358 Ok(()) 359 } 360 361 pub async fn send_publish( 362 &mut self, 363 transaction_id: &f64, 364 stream_name: &String, 365 stream_type: &String, 366 ) -> Result<(), SessionError> { 367 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 368 netstream 369 .publish(transaction_id, stream_name, stream_type) 370 .await?; 371 372 Ok(()) 373 } 374 375 pub async fn send_play( 376 &mut self, 377 transaction_id: &f64, 378 stream_name: &String, 379 start: &f64, 380 duration: &f64, 381 reset: &bool, 382 ) -> Result<(), SessionError> { 383 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 384 netstream 385 .play(transaction_id, stream_name, start, duration, reset) 386 .await?; 387 388 Ok(()) 389 } 390 391 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 392 let mut controlmessage = 393 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 394 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 395 Ok(()) 396 } 397 398 pub async fn send_window_acknowledgement_size( 399 &mut self, 400 window_size: u32, 401 ) -> Result<(), SessionError> { 402 let mut controlmessage = 403 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 404 controlmessage 405 .write_window_acknowledgement_size(window_size) 406 .await?; 407 Ok(()) 408 } 409 410 pub async fn send_set_buffer_length( 411 &mut self, 412 stream_id: u32, 413 ms: u32, 414 ) -> Result<(), SessionError> { 415 let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 416 eventmessages.write_set_buffer_length(stream_id, ms).await?; 417 418 Ok(()) 419 } 420 421 pub async fn on_result_connect(&mut self) -> Result<(), SessionError> { 422 let mut controlmessage = 423 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 424 controlmessage.write_acknowledgement(3107).await?; 425 426 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 427 netstream 428 .release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 429 .await?; 430 netstream 431 .fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 432 .await?; 433 434 self.state = ClientSessionState::CreateStream; 435 436 Ok(()) 437 } 438 439 pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> { 440 match self.client_type { 441 ClientType::Play => { 442 self.state = ClientSessionState::Play; 443 } 444 ClientType::Publish => { 445 self.state = ClientSessionState::PublishingContent; 446 } 447 } 448 Ok(()) 449 } 450 451 pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> { 452 self.unpacketizer 453 .update_max_chunk_size(chunk_size.clone() as usize); 454 Ok(()) 455 } 456 457 pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 458 log::trace!("stream is recorded stream_id is {}", stream_id); 459 Ok(()) 460 } 461 462 pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 463 log::trace!("stream is begin stream_id is {}", stream_id); 464 Ok(()) 465 } 466 467 pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> { 468 self.send_window_acknowledgement_size(250000).await?; 469 Ok(()) 470 } 471 472 pub fn on_error(&mut self) -> Result<(), SessionError> { 473 Ok(()) 474 } 475 476 pub async fn on_status( 477 &mut self, 478 obj: &HashMap<String, Amf0ValueType>, 479 ) -> Result<(), SessionError> { 480 if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") { 481 match &code_info[..] { 482 "NetStream.Publish.Start" => { 483 self.state = ClientSessionState::StartPublish; 484 self.common 485 .subscribe_from_channels( 486 self.app_name.clone(), 487 self.stream_name.clone(), 488 self.subscriber_id, 489 ) 490 .await?; 491 } 492 "NetStream.Publish.Reset" => {} 493 494 "NetStream.Play.Start" => { 495 self.common 496 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 497 .await? 498 } 499 _ => {} 500 } 501 } 502 log::trace!("{}", obj.len()); 503 Ok(()) 504 } 505 } 506