1 use { 2 super::{ 3 common::Common, 4 define, 5 define::SessionType, 6 errors::{SessionError, SessionErrorValue}, 7 }, 8 crate::utils::print::print, 9 crate::{ 10 amf0::Amf0ValueType, 11 channels::define::ChannelEventProducer, 12 chunk::{ 13 define::{chunk_type, csid_type, CHUNK_SIZE}, 14 packetizer::ChunkPacketizer, 15 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 16 ChunkInfo, 17 }, 18 handshake::handshake::{ClientHandshakeState, SimpleHandshakeClient}, 19 messages::{ 20 define::{msg_type_id, RtmpMessageData}, 21 parser::MessageParser, 22 }, 23 netconnection::commands::{ConnectProperties, NetConnection}, 24 netstream::writer::NetStreamWriter, 25 protocol_control_messages::writer::ProtocolControlMessagesWriter, 26 user_control_messages::writer::EventMessagesWriter, 27 }, 28 bytes::BytesMut, 29 networkio::{ 30 bytes_writer::{AsyncBytesWriter, BytesWriter}, 31 networkio::NetworkIO, 32 }, 33 std::{collections::HashMap, sync::Arc}, 34 tokio::{net::TcpStream, sync::Mutex}, 35 }; 36 37 #[allow(dead_code)] 38 enum ClientSessionState { 39 Handshake, 40 Connect, 41 CreateStream, 42 Play, 43 PublishingContent, 44 StartPublish, 45 WaitStateChange, 46 } 47 48 #[allow(dead_code)] 49 enum ClientSessionPlayState { 50 Handshake, 51 Connect, 52 CreateStream, 53 Play, 54 } 55 56 #[allow(dead_code)] 57 enum ClientSessionPublishState { 58 Handshake, 59 Connect, 60 CreateStream, 61 PublishingContent, 62 } 63 #[allow(dead_code)] 64 pub enum ClientType { 65 Play, 66 Publish, 67 } 68 pub struct ClientSession { 69 io: Arc<Mutex<NetworkIO>>, 70 common: Common, 71 72 handshaker: SimpleHandshakeClient, 73 74 packetizer: ChunkPacketizer, 75 unpacketizer: ChunkUnpacketizer, 76 77 app_name: String, 78 stream_name: String, 79 session_type: u8, 80 session_id: u64, 81 82 state: ClientSessionState, 83 client_type: ClientType, 84 connect_command_object: Option<HashMap<String, Amf0ValueType>>, 85 } 86 87 impl ClientSession { 88 #[allow(dead_code)] 89 pub fn new( 90 stream: TcpStream, 91 client_type: ClientType, 92 app_name: String, 93 stream_name: String, 94 event_producer: ChannelEventProducer, 95 session_id: u64, 96 ) -> Self { 97 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream))); 98 99 Self { 100 io: Arc::clone(&net_io), 101 common: Common::new(Arc::clone(&net_io), event_producer, SessionType::Client), 102 103 handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)), 104 105 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 106 unpacketizer: ChunkUnpacketizer::new(), 107 108 app_name: app_name, 109 stream_name: stream_name, 110 client_type: client_type, 111 112 state: ClientSessionState::Handshake, 113 session_type: 0, 114 session_id: session_id, 115 connect_command_object: None, 116 } 117 } 118 119 pub fn set_connect_command_object(&mut self, object: HashMap<String, Amf0ValueType>) { 120 self.connect_command_object = Some(object); 121 } 122 123 pub async fn run(&mut self) -> Result<(), SessionError> { 124 loop { 125 match self.state { 126 ClientSessionState::Handshake => { 127 println!("handshake"); 128 self.handshake().await?; 129 continue; 130 } 131 ClientSessionState::Connect => { 132 println!("connect"); 133 self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64)) 134 .await?; 135 self.state = ClientSessionState::WaitStateChange; 136 } 137 ClientSessionState::CreateStream => { 138 println!("CreateStream"); 139 self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64)) 140 .await?; 141 self.state = ClientSessionState::WaitStateChange; 142 } 143 ClientSessionState::Play => { 144 self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false) 145 .await?; 146 self.state = ClientSessionState::WaitStateChange; 147 } 148 ClientSessionState::PublishingContent => { 149 println!("PublishingContent"); 150 self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string()) 151 .await?; 152 self.state = ClientSessionState::WaitStateChange; 153 } 154 ClientSessionState::StartPublish => { 155 println!("StartPublish"); 156 self.common.send_channel_data().await?; 157 } 158 ClientSessionState::WaitStateChange => {} 159 } 160 161 let data = self.io.lock().await.read().await?; 162 self.unpacketizer.extend_data(&data[..]); 163 let result = self.unpacketizer.read_chunk()?; 164 165 match result { 166 UnpackResult::ChunkInfo(chunk_info) => { 167 let mut message_parser = MessageParser::new(chunk_info, self.session_type); 168 let mut msg = message_parser.parse()?; 169 170 self.process_messages(&mut msg).await?; 171 } 172 _ => {} 173 } 174 } 175 176 // Ok(()) 177 } 178 179 async fn handshake(&mut self) -> Result<(), SessionError> { 180 loop { 181 self.handshaker.handshake().await?; 182 if self.handshaker.state == ClientHandshakeState::Finish { 183 println!("handshake finish"); 184 break; 185 } 186 187 let data = self.io.lock().await.read().await?; 188 print(data.clone()); 189 self.handshaker.extend_data(&data[..]); 190 } 191 192 self.state = ClientSessionState::Connect; 193 194 Ok(()) 195 } 196 197 pub async fn process_messages( 198 &mut self, 199 msg: &mut RtmpMessageData, 200 ) -> Result<(), SessionError> { 201 match msg { 202 RtmpMessageData::Amf0Command { 203 command_name, 204 transaction_id, 205 command_object, 206 others, 207 } => { 208 self.process_amf0_command_message( 209 command_name, 210 transaction_id, 211 command_object, 212 others, 213 ) 214 .await? 215 } 216 RtmpMessageData::SetPeerBandwidth { properties } => { 217 print!("{}", properties.window_size); 218 self.on_set_peer_bandwidth().await? 219 } 220 RtmpMessageData::SetChunkSize { chunk_size } => self.on_set_chunk_size(chunk_size)?, 221 RtmpMessageData::AudioData { data } => { 222 let _ = data.len(); 223 } 224 RtmpMessageData::VideoData { data } => { 225 let _ = data.len(); 226 } 227 228 _ => {} 229 } 230 Ok(()) 231 } 232 233 pub async fn process_amf0_command_message( 234 &mut self, 235 command_name: &Amf0ValueType, 236 transaction_id: &Amf0ValueType, 237 command_object: &Amf0ValueType, 238 others: &mut Vec<Amf0ValueType>, 239 ) -> Result<(), SessionError> { 240 let empty_cmd_name = &String::new(); 241 let cmd_name = match command_name { 242 Amf0ValueType::UTF8String(str) => str, 243 _ => empty_cmd_name, 244 }; 245 246 let transaction_id = match transaction_id { 247 Amf0ValueType::Number(number) => number.clone() as u8, 248 _ => 0, 249 }; 250 251 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 252 let _ = match command_object { 253 Amf0ValueType::Object(obj) => obj, 254 // Amf0ValueType::Null => 255 _ => &empty_cmd_obj, 256 }; 257 258 match cmd_name.as_str() { 259 "_result" => match transaction_id { 260 define::TRANSACTION_ID_CONNECT => { 261 self.on_result_connect().await?; 262 } 263 define::TRANSACTION_ID_CREATE_STREAM => { 264 self.on_result_create_stream()?; 265 } 266 _ => {} 267 }, 268 "_error" => { 269 self.on_error()?; 270 } 271 "onStatus" => { 272 match others.remove(0) { 273 Amf0ValueType::Object(obj) => self.on_status(&obj).await?, 274 _ => { 275 return Err(SessionError { 276 value: SessionErrorValue::Amf0ValueCountNotCorrect, 277 }) 278 } 279 }; 280 } 281 282 _ => {} 283 } 284 285 Ok(()) 286 } 287 288 pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 289 self.send_set_chunk_size().await?; 290 291 // let properties = ConnectProperties::new(self.app_name.clone()); 292 let mut netconnection = NetConnection::new(BytesWriter::new()); 293 let data = netconnection 294 .connect_with_value(transaction_id, self.connect_command_object.clone().unwrap())?; 295 296 let mut chunk_info = ChunkInfo::new( 297 csid_type::COMMAND_AMF0_AMF3, 298 chunk_type::TYPE_0, 299 0, 300 data.len() as u32, 301 msg_type_id::COMMAND_AMF0, 302 0, 303 data, 304 ); 305 306 self.packetizer.write_chunk(&mut chunk_info).await?; 307 Ok(()) 308 } 309 310 pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 311 let mut netconnection = NetConnection::new(BytesWriter::new()); 312 let data = netconnection.create_stream(transaction_id)?; 313 314 let mut chunk_info = ChunkInfo::new( 315 csid_type::COMMAND_AMF0_AMF3, 316 chunk_type::TYPE_0, 317 0, 318 data.len() as u32, 319 msg_type_id::COMMAND_AMF0, 320 0, 321 data, 322 ); 323 324 self.packetizer.write_chunk(&mut chunk_info).await?; 325 326 Ok(()) 327 } 328 329 pub async fn send_delete_stream( 330 &mut self, 331 transaction_id: &f64, 332 stream_id: &f64, 333 ) -> Result<(), SessionError> { 334 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 335 netstream.delete_stream(transaction_id, stream_id).await?; 336 337 Ok(()) 338 } 339 340 pub async fn send_publish( 341 &mut self, 342 transaction_id: &f64, 343 stream_name: &String, 344 stream_type: &String, 345 ) -> Result<(), SessionError> { 346 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 347 netstream 348 .publish(transaction_id, stream_name, stream_type) 349 .await?; 350 351 Ok(()) 352 } 353 354 pub async fn send_play( 355 &mut self, 356 transaction_id: &f64, 357 stream_name: &String, 358 start: &f64, 359 duration: &f64, 360 reset: &bool, 361 ) -> Result<(), SessionError> { 362 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 363 netstream 364 .play(transaction_id, stream_name, start, duration, reset) 365 .await?; 366 367 Ok(()) 368 } 369 370 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 371 let mut controlmessage = 372 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 373 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 374 Ok(()) 375 } 376 377 pub async fn send_window_acknowledgement_size( 378 &mut self, 379 window_size: u32, 380 ) -> Result<(), SessionError> { 381 let mut controlmessage = 382 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 383 controlmessage 384 .write_window_acknowledgement_size(window_size) 385 .await?; 386 Ok(()) 387 } 388 389 pub async fn send_set_buffer_length( 390 &mut self, 391 stream_id: u32, 392 ms: u32, 393 ) -> Result<(), SessionError> { 394 let mut eventmessages = EventMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 395 eventmessages.write_set_buffer_length(stream_id, ms).await?; 396 397 Ok(()) 398 } 399 400 pub async fn on_result_connect(&mut self) -> Result<(), SessionError> { 401 let mut controlmessage = 402 ProtocolControlMessagesWriter::new(AsyncBytesWriter::new(self.io.clone())); 403 controlmessage.write_acknowledgement(3107).await?; 404 405 let mut netstream = NetStreamWriter::new(BytesWriter::new(), Arc::clone(&self.io)); 406 netstream 407 .release_stream(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 408 .await?; 409 netstream 410 .fcpublish(&(define::TRANSACTION_ID_CONNECT as f64), &self.stream_name) 411 .await?; 412 413 self.state = ClientSessionState::CreateStream; 414 415 Ok(()) 416 } 417 418 pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> { 419 match self.client_type { 420 ClientType::Play => { 421 self.state = ClientSessionState::Play; 422 } 423 ClientType::Publish => { 424 self.state = ClientSessionState::PublishingContent; 425 } 426 } 427 Ok(()) 428 } 429 430 pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> { 431 self.unpacketizer 432 .update_max_chunk_size(chunk_size.clone() as usize); 433 Ok(()) 434 } 435 436 pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> { 437 self.send_window_acknowledgement_size(250000).await?; 438 Ok(()) 439 } 440 441 pub fn on_error(&mut self) -> Result<(), SessionError> { 442 Ok(()) 443 } 444 445 pub async fn on_status( 446 &mut self, 447 obj: &HashMap<String, Amf0ValueType>, 448 ) -> Result<(), SessionError> { 449 println!("on_status==="); 450 if let Some(Amf0ValueType::UTF8String(code_info)) = obj.get("code") { 451 match &code_info[..] == "NetStream.Publish.Start" { 452 true => { 453 self.state = ClientSessionState::StartPublish; 454 self.common 455 .subscribe_from_channels( 456 self.app_name.clone(), 457 self.stream_name.clone(), 458 self.session_id, 459 ) 460 .await?; 461 } 462 false => {} 463 } 464 } 465 println!("{}", obj.len()); 466 Ok(()) 467 } 468 } 469