1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::utils::print::print, 9 crate::{ 10 amf0::Amf0ValueType, 11 channels::define::ChannelEventProducer, 12 chunk::{ 13 define::{chunk_type, csid_type, CHUNK_SIZE}, 14 packetizer::ChunkPacketizer, 15 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 16 ChunkInfo, 17 }, 18 handshake::handshake::{ClientHandshakeState, SimpleHandshakeClient}, 19 messages::{ 20 define::{msg_type_id, RtmpMessageData}, 21 parser::MessageParser, 22 }, 23 netconnection::commands::{ConnectProperties, NetConnection}, 24 netstream::writer::NetStreamWriter, 25 protocol_control_messages::writer::ProtocolControlMessagesWriter, 26 user_control_messages::writer::EventMessagesWriter, 27 }, 28 bytes::BytesMut, 29 networkio::{ 30 bytes_writer::{AsyncBytesWriter, BytesWriter}, 31 networkio::NetworkIO, 32 }, 33 std::{collections::HashMap, sync::Arc}, 34 tokio::{net::TcpStream, sync::Mutex}, 35 }; 36 37 #[allow(dead_code)] 38 enum ClientSessionState { 39 Handshake, 40 Connect, 41 CreateStream, 42 Play, 43 PublishingContent, 44 StartPublish, 45 WaitStateChange, 46 } 47 48 #[allow(dead_code)] 49 enum ClientSessionPlayState { 50 Handshake, 51 Connect, 52 CreateStream, 53 Play, 54 } 55 56 #[allow(dead_code)] 57 enum ClientSessionPublishState { 58 Handshake, 59 Connect, 60 CreateStream, 61 PublishingContent, 62 } 63 #[allow(dead_code)] 64 pub enum ClientType { 65 Play, 66 Publish, 67 } 68 pub struct ClientSession { 69 io: Arc<Mutex<NetworkIO>>, 70 common: Common, 71 72 handshaker: SimpleHandshakeClient, 73 74 packetizer: ChunkPacketizer, 75 unpacketizer: ChunkUnpacketizer, 76 77 app_name: String, 78 stream_name: String, 79 session_type: u8, 80 session_id: u64, 81 82 state: ClientSessionState, 83 client_type: ClientType, 84 connect_command_object: Option<HashMap<String, Amf0ValueType>>, 85 } 86 87 impl ClientSession { 88 #[allow(dead_code)] 89 pub fn new( 90 stream: TcpStream, 91 client_type: ClientType, 92 app_name: String, 93 stream_name: String, 94 event_producer: ChannelEventProducer, 95 session_id: u64, 96 ) -> Self { 97 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream))); 98 99 Self { 100 io: Arc::clone(&net_io), 101 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Client), 102 103 handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)), 104 105 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 106 unpacketizer: ChunkUnpacketizer::new(), 107 108 app_name: app_name, 109 stream_name: stream_name, 110 client_type: client_type, 111 112 state: ClientSessionState::Handshake, 113 session_type: 0, 114 session_id: session_id, 115 connect_command_object: None, 116 } 117 } 118 119 pub fn set_connect_command_object(&mut self, object: HashMap<String, Amf0ValueType>) { 120 self.connect_command_object = Some(object); 121 } 122 123 pub async fn run(&mut self) -> Result<(), SessionError> { 124 loop { 125 match self.state { 126 ClientSessionState::Handshake => { 127 println!("handshake"); 128 self.handshake().await?; 129 continue; 130 } 131 ClientSessionState::Connect => { 132 println!("connect"); 133 self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64)) 134 .await?; 135 self.state = ClientSessionState::WaitStateChange; 136 } 137 ClientSessionState::CreateStream => { 138 println!("CreateStream"); 139 self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64)) 140 .await?; 141 self.state = ClientSessionState::WaitStateChange; 142 } 143 ClientSessionState::Play => { 144 self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false) 145 .await?; 146 self.state = ClientSessionState::WaitStateChange; 147 } 148 ClientSessionState::PublishingContent => { 149 println!("PublishingContent"); 150 self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string()) 151 .await?; 152 self.state = ClientSessionState::WaitStateChange; 153 } 154 ClientSessionState::StartPublish => { 155 println!("StartPublish"); 156 self.common.send_channel_data().await?; 157 } 158 ClientSessionState::WaitStateChange => {} 159 } 160 161 let data = self.io.lock().await.read().await?; 162 self.unpacketizer.extend_data(&data[..]); 163 let result = self.unpacketizer.read_chunk()?; 164 165 match result { 166 UnpackResult::ChunkInfo(chunk_info) => { 167 let mut message_parser = 168 MessageParser::new(chunk_info.clone(), self.session_type); 169 let mut msg = message_parser.parse()?; 170 let timestamp = chunk_info.message_header.timestamp; 171 172 self.process_messages(&mut msg, ×tamp).await?; 173 } 174 _ => {} 175 } 176 } 177 178 // Ok(()) 179 } 180 181 async fn handshake(&mut self) -> Result<(), SessionError> { 182 loop { 183 self.handshaker.handshake().await?; 184 if self.handshaker.state == ClientHandshakeState::Finish { 185 println!("handshake finish"); 186 break; 187 } 188 189 let data = self.io.lock().await.read().await?; 190 print(data.clone()); 191 self.handshaker.extend_data(&data[..]); 192 } 193 194 self.state = ClientSessionState::Connect; 195 196 Ok(()) 197 } 198 199 pub async fn process_messages( 200 &mut self, 201 msg: &mut RtmpMessageData, 202 timestamp: &u32, 203 ) -> Result<(), SessionError> { 204 match msg { 205 RtmpMessageData::Amf0Command { 206 command_name, 207 transaction_id, 208 command_object, 209 others, 210 } => { 211 self.on_amf0_command_message(command_name, transaction_id, command_object, others) 212 .await? 213 } 214 RtmpMessageData::SetPeerBandwidth { properties } => { 215 print!("{}", properties.window_size); 216 self.on_set_peer_bandwidth().await? 217 } 218 RtmpMessageData::SetChunkSize { chunk_size } => self.on_set_chunk_size(chunk_size)?, 219 220 RtmpMessageData::StreamBegin { stream_id } => self.on_stream_begin(stream_id)?, 221 222 RtmpMessageData::StreamIsRecorded { stream_id } => { 223 self.on_stream_is_recorded(stream_id)? 224 } 225 226 RtmpMessageData::AudioData { data } => self.common.on_audio_data(data, timestamp)?, 227 228 RtmpMessageData::VideoData { data } => self.common.on_video_data(data, timestamp)?, 229 230 _ => {} 231 } 232 Ok(()) 233 } 234 235 pub async fn on_amf0_command_message( 236 &mut self, 237 command_name: &Amf0ValueType, 238 transaction_id: &Amf0ValueType, 239 command_object: &Amf0ValueType, 240 others: &mut Vec<Amf0ValueType>, 241 ) -> Result<(), SessionError> { 242 let empty_cmd_name = &String::new(); 243 let cmd_name = match command_name { 244 Amf0ValueType::UTF8String(str) => str, 245 _ => empty_cmd_name, 246 }; 247 248 let transaction_id = match transaction_id { 249 Amf0ValueType::Number(number) => number.clone() as u8, 250 _ => 0, 251 }; 252 253 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 254 let _ = match command_object { 255 Amf0ValueType::Object(obj) => obj, 256 // Amf0ValueType::Null => 257 _ => &empty_cmd_obj, 258 }; 259 260 match cmd_name.as_str() { 261 "_result" => match transaction_id { 262 define::TRANSACTION_ID_CONNECT => { 263 self.on_result_connect().await?; 264 } 265 define::TRANSACTION_ID_CREATE_STREAM => { 266 self.on_result_create_stream()?; 267 } 268 _ => {} 269 }, 270 "_error" => { 271 self.on_error()?; 272 } 273 "onStatus" => { 274 match others.remove(0) { 275 Amf0ValueType::Object(obj) => self.on_status(&obj).await?, 276 _ => { 277 return Err(SessionError { 278 value: SessionErrorValue::Amf0ValueCountNotCorrect, 279 }) 280 } 281 }; 282 } 283 284 _ => {} 285 } 286 287 Ok(()) 288 } 289 290 pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 291 self.send_set_chunk_size().await?; 292 293 // let properties = ConnectProperties::new(self.app_name.clone()); 294 let mut netconnection = NetConnection::new(BytesWriter::new()); 295 let data = netconnection 296 .connect_with_value(transaction_id, self.connect_command_object.clone().unwrap())?; 297 298 let mut chunk_info = ChunkInfo::new( 299 csid_type::COMMAND_AMF0_AMF3, 300 chunk_type::TYPE_0, 301 0, 302 data.len() as u32, 303 msg_type_id::COMMAND_AMF0, 304 0, 305 data, 306 ); 307 308 self.packetizer.write_chunk(&mut chunk_info).await?; 309 Ok(()) 310 } 311 312 pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 313 let mut netconnection = NetConnection::new(BytesWriter::new()); 314 let data = netconnection.create_stream(transaction_id)?; 315 316 let mut chunk_info = ChunkInfo::new( 317 csid_type::COMMAND_AMF0_AMF3, 318 chunk_type::TYPE_0, 319 0, 320 data.len() as u32, 321 msg_type_id::COMMAND_AMF0, 322 0, 323 data, 324 ); 325 326 self.packetizer.write_chunk(&mut chunk_info).await?; 327 328 Ok(()) 329 } 330 331 pub async fn send_delete_stream( 332 &mut self, 333 transaction_id: &f64, 334 stream_id: &f64, 335 ) -> Result<(), SessionError> { 336 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 337 netstream.delete_stream(transaction_id, stream_id).await?; 338 339 Ok(()) 340 } 341 342 pub async fn send_publish( 343 &mut self, 344 transaction_id: &f64, 345 stream_name: &String, 346 stream_type: &String, 347 ) -> Result<(), SessionError> { 348 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 349 netstream 350 .publish(transaction_id, stream_name, stream_type) 351 .await?; 352 353 Ok(()) 354 } 355 356 pub async fn send_play( 357 &mut self, 358 transaction_id: &f64, 359 stream_name: &String, 360 start: &f64, 361 duration: &f64, 362 reset: &bool, 363 ) -> Result<(), SessionError> { 364 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 365 netstream 366 .play(transaction_id, stream_name, start, duration, reset) 367 .await?; 368 369 Ok(()) 370 } 371 372 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 373 let mut controlmessage = 374 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 375 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 376 Ok(()) 377 } 378 379 pub async fn send_window_acknowledgement_size( 380 &mut self, 381 window_size: u32, 382 ) -> Result<(), SessionError> { 383 let mut controlmessage = 384 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 385 controlmessage 386 .write_window_acknowledgement_size(window_size) 387 .await?; 388 Ok(()) 389 } 390 391 pub async fn send_set_buffer_length( 392 &mut self, 393 stream_id: u32, 394 ms: u32, 395 ) -> Result<(), SessionError> { 396 let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 397 eventmessages.write_set_buffer_length(stream_id, ms).await?; 398 399 Ok(()) 400 } 401 402 pub async fn on_result_connect(&mut self) -> Result<(), SessionError> { 403 let mut controlmessage = 404 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 405 controlmessage.write_acknowledgement(3107).await?; 406 407 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 408 netstream 409 .release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 410 .await?; 411 netstream 412 .fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 413 .await?; 414 415 self.state = ClientSessionState::CreateStream; 416 417 Ok(()) 418 } 419 420 pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> { 421 match self.client_type { 422 ClientType::Play => { 423 self.state = ClientSessionState::Play; 424 } 425 ClientType::Publish => { 426 self.state = ClientSessionState::PublishingContent; 427 } 428 } 429 Ok(()) 430 } 431 432 pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> { 433 self.unpacketizer 434 .update_max_chunk_size(chunk_size.clone() as usize); 435 Ok(()) 436 } 437 438 pub fn on_stream_is_recorded(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 439 Ok(()) 440 } 441 442 pub fn on_stream_begin(&mut self, stream_id: &mut u32) -> Result<(), SessionError> { 443 Ok(()) 444 } 445 446 pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> { 447 self.send_window_acknowledgement_size(250000).await?; 448 Ok(()) 449 } 450 451 pub fn on_error(&mut self) -> Result<(), SessionError> { 452 Ok(()) 453 } 454 455 pub async fn on_status( 456 &mut self, 457 obj: &HashMap<String, Amf0ValueType>, 458 ) -> Result<(), SessionError> { 459 println!("on_status==="); 460 if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") { 461 match &code_info[..] { 462 "NetStream.Publish.Start" => { 463 self.state = ClientSessionState::StartPublish; 464 self.common 465 .subscribe_from_channels( 466 self.app_name.clone(), 467 self.stream_name.clone(), 468 self.session_id, 469 ) 470 .await?; 471 } 472 "NetStream.Publish.Reset" => {} 473 474 // "NetStream.Play.Start" => self.common.publish_to_channels( 475 // self.app_name.clone(), 476 // self.stream_name.clone(), 477 // connect_command_object, 478 // ), 479 480 _ => {} 481 } 482 } 483 println!("{}", obj.len()); 484 Ok(()) 485 } 486 } 487