1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::utils::print::print, 9 crate::{ 10 amf0::Amf0ValueType, 11 channels::define::ChannelEventProducer, 12 chunk::{ 13 define::{chunk_type, csid_type, CHUNK_SIZE}, 14 packetizer::ChunkPacketizer, 15 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 16 ChunkInfo, 17 }, 18 handshake::handshake::{ClientHandshakeState, SimpleHandshakeClient}, 19 messages::{ 20 define::{msg_type_id, RtmpMessageData}, 21 parser::MessageParser, 22 }, 23 netconnection::commands::{ConnectProperties, NetConnection}, 24 netstream::writer::NetStreamWriter, 25 protocol_control_messages::writer::ProtocolControlMessagesWriter, 26 user_control_messages::writer::EventMessagesWriter, 27 }, 28 networkio::{ 29 bytes_writer::{AsyncBytesWriter, BytesWriter}, 30 networkio::NetworkIO, 31 }, 32 std::{collections::HashMap, sync::Arc}, 33 tokio::{net::TcpStream, sync::Mutex}, 34 }; 35 36 #[allow(dead_code)] 37 enum ClientSessionState { 38 Handshake, 39 Connect, 40 CreateStream, 41 Play, 42 PublishingContent, 43 StartPublish, 44 WaitStateChange, 45 } 46 47 #[allow(dead_code)] 48 enum ClientSessionPlayState { 49 Handshake, 50 Connect, 51 CreateStream, 52 Play, 53 } 54 55 #[allow(dead_code)] 56 enum ClientSessionPublishState { 57 Handshake, 58 Connect, 59 CreateStream, 60 PublishingContent, 61 } 62 #[allow(dead_code)] 63 pub enum ClientType { 64 Play, 65 Publish, 66 } 67 pub struct ClientSession { 68 io: Arc<Mutex<NetworkIO>>, 69 common: Common, 70 71 handshaker: SimpleHandshakeClient, 72 73 packetizer: ChunkPacketizer, 74 unpacketizer: ChunkUnpacketizer, 75 76 app_name: String, 77 stream_name: String, 78 session_type: u8, 79 session_id: u64, 80 81 state: ClientSessionState, 82 client_type: ClientType, 83 } 84 85 impl ClientSession { 86 #[allow(dead_code)] 87 pub fn new( 88 stream: TcpStream, 89 client_type: ClientType, 90 app_name: String, 91 stream_name: String, 92 event_producer: ChannelEventProducer, 93 session_id: u64, 94 ) -> Self { 95 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream))); 96 97 Self { 98 io: Arc::clone(&net_io), 99 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Client), 100 101 handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)), 102 103 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 104 unpacketizer: ChunkUnpacketizer::new(), 105 106 app_name: app_name, 107 stream_name: stream_name, 108 client_type: client_type, 109 110 state: ClientSessionState::Handshake, 111 session_type: 0, 112 session_id: session_id, 113 } 114 } 115 116 pub async fn run(&mut self) -> Result<(), SessionError> { 117 loop { 118 match self.state { 119 ClientSessionState::Handshake => { 120 println!("handshake"); 121 self.handshake().await?; 122 continue; 123 } 124 ClientSessionState::Connect => { 125 println!("connect"); 126 self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64)) 127 .await?; 128 self.state = ClientSessionState::WaitStateChange; 129 } 130 ClientSessionState::CreateStream => { 131 println!("CreateStream"); 132 self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64)) 133 .await?; 134 self.state = ClientSessionState::WaitStateChange; 135 } 136 ClientSessionState::Play => { 137 self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false) 138 .await?; 139 self.state = ClientSessionState::WaitStateChange; 140 } 141 ClientSessionState::PublishingContent => { 142 println!("PublishingContent"); 143 self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string()) 144 .await?; 145 self.state = ClientSessionState::WaitStateChange; 146 } 147 ClientSessionState::StartPublish => { 148 println!("StartPublish"); 149 self.common.send_channel_data().await?; 150 } 151 ClientSessionState::WaitStateChange => {} 152 } 153 154 let data = self.io.lock().await.read().await?; 155 self.unpacketizer.extend_data(&data[..]); 156 let result = self.unpacketizer.read_chunk()?; 157 158 match result { 159 UnpackResult::ChunkInfo(chunk_info) => { 160 let mut message_parser = 161 MessageParser::new(chunk_info.clone(), self.session_type); 162 let mut msg = message_parser.parse()?; 163 let timestamp = chunk_info.message_header.timestamp; 164 165 self.process_messages(&mut msg, ×tamp).await?; 166 } 167 _ => {} 168 } 169 } 170 171 // Ok(()) 172 } 173 174 async fn handshake(&mut self) -> Result<(), SessionError> { 175 loop { 176 self.handshaker.handshake().await?; 177 if self.handshaker.state == ClientHandshakeState::Finish { 178 println!("handshake finish"); 179 break; 180 } 181 182 let data = self.io.lock().await.read().await?; 183 print(data.clone()); 184 self.handshaker.extend_data(&data[..]); 185 } 186 187 self.state = ClientSessionState::Connect; 188 189 Ok(()) 190 } 191 192 pub async fn process_messages( 193 &mut self, 194 msg: &mut RtmpMessageData, 195 timestamp: &u32, 196 ) -> Result<(), SessionError> { 197 match msg { 198 RtmpMessageData::Amf0Command { 199 command_name, 200 transaction_id, 201 command_object, 202 others, 203 } => { 204 self.on_amf0_command_message(command_name, transaction_id, command_object, others) 205 .await? 206 } 207 RtmpMessageData::SetPeerBandwidth { properties } => { 208 print!("{}", properties.window_size); 209 self.on_set_peer_bandwidth().await? 210 } 211 RtmpMessageData::SetChunkSize { chunk_size } => self.on_set_chunk_size(chunk_size)?, 212 213 RtmpMessageData::StreamBegin { stream_id } => self.on_stream_begin(stream_id)?, 214 215 RtmpMessageData::StreamIsRecorded { stream_id } => { 216 self.on_stream_is_recorded(stream_id)? 217 } 218 219 RtmpMessageData::AudioData { data } => self.common.on_audio_data(data, timestamp)?, 220 221 RtmpMessageData::VideoData { data } => self.common.on_video_data(data, timestamp)?, 222 223 _ => {} 224 } 225 Ok(()) 226 } 227 228 pub async fn on_amf0_command_message( 229 &mut self, 230 command_name: &Amf0ValueType, 231 transaction_id: &Amf0ValueType, 232 command_object: &Amf0ValueType, 233 others: &mut Vec<Amf0ValueType>, 234 ) -> Result<(), SessionError> { 235 let empty_cmd_name = &String::new(); 236 let cmd_name = match command_name { 237 Amf0ValueType::UTF8String(str) => str, 238 _ => empty_cmd_name, 239 }; 240 241 let transaction_id = match transaction_id { 242 Amf0ValueType::Number(number) => number.clone() as u8, 243 _ => 0, 244 }; 245 246 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 247 let _ = match command_object { 248 Amf0ValueType::Object(obj) => obj, 249 // Amf0ValueType::Null => 250 _ => &empty_cmd_obj, 251 }; 252 253 match cmd_name.as_str() { 254 "_result" => match transaction_id { 255 define::TRANSACTION_ID_CONNECT => { 256 self.on_result_connect().await?; 257 } 258 define::TRANSACTION_ID_CREATE_STREAM => { 259 self.on_result_create_stream()?; 260 } 261 _ => {} 262 }, 263 "_error" => { 264 self.on_error()?; 265 } 266 "onStatus" => { 267 match others.remove(0) { 268 Amf0ValueType::Object(obj) => self.on_status(&obj).await?, 269 _ => { 270 return Err(SessionError { 271 value: SessionErrorValue::Amf0ValueCountNotCorrect, 272 }) 273 } 274 }; 275 } 276 277 _ => {} 278 } 279 280 Ok(()) 281 } 282 283 pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 284 self.send_set_chunk_size().await?; 285 286 let mut netconnection = NetConnection::new(BytesWriter::new()); 287 288 let mut properties = ConnectProperties::new_none(); 289 290 let url = format!("rtmp://localhost:1935/{app_name}", app_name = self.app_name); 291 properties.app = Some(self.app_name.clone()); 292 properties.tc_url = Some(url.clone()); 293 294 match self.client_type { 295 ClientType::Play => { 296 properties.flash_ver = Some("flashVerFMLE/3.0 (compatible; FMSc/1.0)".to_string()); 297 properties.swf_url = Some(url.clone()); 298 } 299 ClientType::Publish => { 300 properties.fpad = Some(false); 301 properties.capabilities = Some(15_f64); 302 properties.audio_codecs = Some(3191_f64); 303 properties.video_codecs = Some(252_f64); 304 properties.video_function = Some(1_f64); 305 } 306 } 307 308 let data = netconnection.connect(transaction_id, &properties)?; 309 310 let mut chunk_info = ChunkInfo::new( 311 csid_type::COMMAND_AMF0_AMF3, 312 chunk_type::TYPE_0, 313 0, 314 data.len() as u32, 315 msg_type_id::COMMAND_AMF0, 316 0, 317 data, 318 ); 319 320 self.packetizer.write_chunk(&mut chunk_info).await?; 321 Ok(()) 322 } 323 324 pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 325 let mut netconnection = NetConnection::new(BytesWriter::new()); 326 let data = netconnection.create_stream(transaction_id)?; 327 328 let mut chunk_info = ChunkInfo::new( 329 csid_type::COMMAND_AMF0_AMF3, 330 chunk_type::TYPE_0, 331 0, 332 data.len() as u32, 333 msg_type_id::COMMAND_AMF0, 334 0, 335 data, 336 ); 337 338 self.packetizer.write_chunk(&mut chunk_info).await?; 339 340 Ok(()) 341 } 342 343 pub async fn send_delete_stream( 344 &mut self, 345 transaction_id: &f64, 346 stream_id: &f64, 347 ) -> Result<(), SessionError> { 348 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 349 netstream.delete_stream(transaction_id, stream_id).await?; 350 351 Ok(()) 352 } 353 354 pub async fn send_publish( 355 &mut self, 356 transaction_id: &f64, 357 stream_name: &String, 358 stream_type: &String, 359 ) -> Result<(), SessionError> { 360 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 361 netstream 362 .publish(transaction_id, stream_name, stream_type) 363 .await?; 364 365 Ok(()) 366 } 367 368 pub async fn send_play( 369 &mut self, 370 transaction_id: &f64, 371 stream_name: &String, 372 start: &f64, 373 duration: &f64, 374 reset: &bool, 375 ) -> Result<(), SessionError> { 376 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 377 netstream 378 .play(transaction_id, stream_name, start, duration, reset) 379 .await?; 380 381 Ok(()) 382 } 383 384 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 385 let mut controlmessage = 386 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 387 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 388 Ok(()) 389 } 390 391 pub async fn send_window_acknowledgement_size( 392 &mut self, 393 window_size: u32, 394 ) -> Result<(), SessionError> { 395 let mut controlmessage = 396 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 397 controlmessage 398 .write_window_acknowledgement_size(window_size) 399 .await?; 400 Ok(()) 401 } 402 403 pub async fn send_set_buffer_length( 404 &mut self, 405 stream_id: u32, 406 ms: u32, 407 ) -> Result<(), SessionError> { 408 let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 409 eventmessages.write_set_buffer_length(stream_id, ms).await?; 410 411 Ok(()) 412 } 413 414 pub async fn on_result_connect(&mut self) -> Result<(), SessionError> { 415 let mut controlmessage = 416 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 417 controlmessage.write_acknowledgement(3107).await?; 418 419 let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); 420 netstream 421 .release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 422 .await?; 423 netstream 424 .fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 425 .await?; 426 427 self.state = ClientSessionState::CreateStream; 428 429 Ok(()) 430 } 431 432 pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> { 433 match self.client_type { 434 ClientType::Play => { 435 self.state = ClientSessionState::Play; 436 } 437 ClientType::Publish => { 438 self.state = ClientSessionState::PublishingContent; 439 } 440 } 441 Ok(()) 442 } 443 444 pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> { 445 self.unpacketizer 446 .update_max_chunk_size(chunk_size.clone() as usize); 447 Ok(()) 448 } 449 450 pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 451 println!("stream is recorded stream_id is {}", stream_id); 452 Ok(()) 453 } 454 455 pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 456 println!("stream is begin stream_id is {}", stream_id); 457 Ok(()) 458 } 459 460 pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> { 461 self.send_window_acknowledgement_size(250000).await?; 462 Ok(()) 463 } 464 465 pub fn on_error(&mut self) -> Result<(), SessionError> { 466 Ok(()) 467 } 468 469 pub async fn on_status( 470 &mut self, 471 obj: &HashMap<String, Amf0ValueType>, 472 ) -> Result<(), SessionError> { 473 println!("on_status==="); 474 if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") { 475 match &code_info[..] { 476 "NetStream.Publish.Start" => { 477 self.state = ClientSessionState::StartPublish; 478 self.common 479 .subscribe_from_channels( 480 self.app_name.clone(), 481 self.stream_name.clone(), 482 self.session_id, 483 ) 484 .await?; 485 } 486 "NetStream.Publish.Reset" => {} 487 488 "NetStream.Play.Start" => { 489 self.common 490 .publish_to_channels(self.app_name.clone(), self.stream_name.clone()) 491 .await? 492 } 493 _ => {} 494 } 495 } 496 println!("{}", obj.len()); 497 Ok(()) 498 } 499 } 500