1 use super::errors::SessionError; 2 3 use crate::chunk::define::{chunk_type, csid_type, CHUNK_SIZE}; 4 use crate::chunk::unpacketizer::ChunkUnpacketizer; 5 use crate::chunk::unpacketizer::UnpackResult; 6 use crate::chunk::{packetizer::ChunkPacketizer, ChunkInfo}; 7 8 use super::errors::SessionErrorValue; 9 use crate::handshake::handshake::SimpleHandshakeClient; 10 11 use crate::messages::define::msg_type_id; 12 use crate::messages::define::RtmpMessageData; 13 use crate::messages::parser::MessageParser; 14 15 use crate::amf0::Amf0ValueType; 16 17 use netio::bytes_writer::AsyncBytesWriter; 18 19 use netio::bytes_reader::BytesReader; 20 use netio::bytes_writer::BytesWriter; 21 use netio::netio::NetworkIO; 22 23 use std::time::Duration; 24 25 use crate::handshake::handshake::ClientHandshakeState; 26 use crate::netconnection::commands::ConnectProperties; 27 use crate::netconnection::commands::NetConnection; 28 use crate::netstream::commands::NetStream; 29 use crate::protocol_control_messages::control_messages::ControlMessages; 30 31 use crate::user_control_messages::event_messages::EventMessages; 32 33 use std::collections::HashMap; 34 35 use super::define; 36 use tokio::net::TcpStream; 37 38 use bytes::BytesMut; 39 use std::sync::Arc; 40 use tokio::sync::Mutex; 41 42 use std::cell::{RefCell, RefMut}; 43 use std::rc::Rc; 44 45 enum ClientSessionState { 46 Handshake, 47 Connect, 48 CreateStream, 49 Play, 50 PublishingContent, 51 } 52 53 enum ClientSessionPlayState { 54 Handshake, 55 Connect, 56 CreateStream, 57 Play, 58 } 59 60 enum ClientSessionPublishState { 61 Handshake, 62 Connect, 63 CreateStream, 64 PublishingContent, 65 } 66 67 enum ClientType { 68 Play, 69 Publish, 70 } 71 pub struct ClientSession { 72 packetizer: ChunkPacketizer, 73 unpacketizer: ChunkUnpacketizer, 74 handshaker: SimpleHandshakeClient, 75 io: Arc<Mutex<NetworkIO>>, 76 77 play_state: ClientSessionPlayState, 78 publish_state: ClientSessionPublishState, 79 state: ClientSessionState, 80 client_type: ClientType, 81 stream_name: String, 82 } 83 84 impl ClientSession { 85 fn new( 86 stream: TcpStream, 87 timeout: Duration, 88 client_type: ClientType, 89 stream_name: String, 90 ) -> Self { 91 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout))); 92 93 // let reader = BytesReader::new(BytesMut::new()); 94 95 Self { 96 io: Arc::clone(&net_io), 97 98 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 99 unpacketizer: ChunkUnpacketizer::new(), 100 handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)), 101 102 play_state: ClientSessionPlayState::Handshake, 103 publish_state: ClientSessionPublishState::Handshake, 104 state: ClientSessionState::Handshake, 105 client_type: client_type, 106 stream_name: stream_name, 107 } 108 } 109 110 pub async fn run(&mut self) -> Result<(), SessionError> { 111 loop { 112 match self.state { 113 ClientSessionState::Handshake => { 114 self.handshake().await?; 115 } 116 ClientSessionState::Connect => { 117 self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64)) 118 .await?; 119 } 120 ClientSessionState::CreateStream => { 121 self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64)) 122 .await?; 123 } 124 ClientSessionState::Play => { 125 self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false) 126 .await?; 127 } 128 ClientSessionState::PublishingContent => { 129 self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string()) 130 .await?; 131 } 132 } 133 134 let data = self.io.lock().await.read().await?; 135 self.unpacketizer.extend_data(&data[..]); 136 let result = self.unpacketizer.read_chunk()?; 137 138 match result { 139 UnpackResult::ChunkInfo(chunk_info) => { 140 let mut message_parser = MessageParser::new(chunk_info); 141 let mut msg = message_parser.parse()?; 142 143 self.process_messages(&mut msg).await?; 144 } 145 _ => {} 146 } 147 } 148 149 // Ok(()) 150 } 151 152 async fn handshake(&mut self) -> Result<(), SessionError> { 153 loop { 154 self.handshaker.handshake().await?; 155 if self.handshaker.state == ClientHandshakeState::Finish { 156 break; 157 } 158 159 let data = self.io.lock().await.read().await?; 160 self.handshaker.extend_data(&data[..]); 161 } 162 self.state = ClientSessionState::Connect; 163 164 Ok(()) 165 } 166 167 pub async fn process_messages( 168 &mut self, 169 msg: &mut RtmpMessageData, 170 ) -> Result<(), SessionError> { 171 match msg { 172 RtmpMessageData::Amf0Command { 173 command_name, 174 transaction_id, 175 command_object, 176 others, 177 } => self.process_amf0_command_message( 178 command_name, 179 transaction_id, 180 command_object, 181 others, 182 )?, 183 RtmpMessageData::SetPeerBandwidth { properties } => { 184 self.on_set_peer_bandwidth().await? 185 } 186 RtmpMessageData::SetChunkSize { chunk_size } => self.on_set_chunk_size(chunk_size)?, 187 RtmpMessageData::AudioData { data } => {} 188 RtmpMessageData::VideoData { data } => {} 189 190 _ => {} 191 } 192 Ok(()) 193 } 194 195 pub fn process_amf0_command_message( 196 &mut self, 197 command_name: &Amf0ValueType, 198 transaction_id: &Amf0ValueType, 199 command_object: &Amf0ValueType, 200 others: &mut Vec<Amf0ValueType>, 201 ) -> Result<(), SessionError> { 202 let empty_cmd_name = &String::new(); 203 let cmd_name = match command_name { 204 Amf0ValueType::UTF8String(str) => str, 205 _ => empty_cmd_name, 206 }; 207 208 let transaction_id = match transaction_id { 209 Amf0ValueType::Number(number) => number.clone() as u8, 210 _ => 0, 211 }; 212 213 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 214 let obj = match command_object { 215 Amf0ValueType::Object(obj) => obj, 216 // Amf0ValueType::Null => 217 _ => &empty_cmd_obj, 218 }; 219 220 match cmd_name.as_str() { 221 "_reslut" => match transaction_id { 222 define::TRANSACTION_ID_CONNECT => { 223 self.on_result_connect()?; 224 } 225 define::TRANSACTION_ID_CREATE_STREAM => { 226 self.on_result_create_stream()?; 227 } 228 _ => {} 229 }, 230 "_error" => { 231 self.on_error()?; 232 } 233 "onStatus" => { 234 match others.remove(0) { 235 Amf0ValueType::Object(obj) => self.on_status(&obj), 236 _ => Err(SessionError { 237 value: SessionErrorValue::Amf0ValueCountNotCorrect, 238 }), 239 }; 240 } 241 242 _ => {} 243 } 244 245 Ok(()) 246 } 247 248 pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 249 let app_name = String::from("app"); 250 let properties = ConnectProperties::new(app_name); 251 252 let mut netconnection = NetConnection::new(BytesWriter::new()); 253 let data = netconnection.connect(transaction_id, &properties)?; 254 255 let mut chunk_info = ChunkInfo::new( 256 csid_type::COMMAND_AMF0_AMF3, 257 chunk_type::TYPE_0, 258 0, 259 data.len() as u32, 260 msg_type_id::COMMAND_AMF0, 261 0, 262 data, 263 ); 264 265 self.packetizer.write_chunk(&mut chunk_info).await?; 266 Ok(()) 267 } 268 269 pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 270 let mut netconnection = NetConnection::new(BytesWriter::new()); 271 let data = netconnection.create_stream(transaction_id)?; 272 273 let mut chunk_info = ChunkInfo::new( 274 csid_type::COMMAND_AMF0_AMF3, 275 chunk_type::TYPE_0, 276 0, 277 data.len() as u32, 278 msg_type_id::COMMAND_AMF0, 279 0, 280 data, 281 ); 282 283 self.packetizer.write_chunk(&mut chunk_info).await?; 284 285 Ok(()) 286 } 287 288 pub async fn send_delete_stream( 289 &mut self, 290 transaction_id: &f64, 291 stream_id: &f64, 292 ) -> Result<(), SessionError> { 293 let mut netstream = NetStream::new(BytesWriter::new()); 294 let data = netstream.delete_stream(transaction_id, stream_id)?; 295 296 let mut chunk_info = ChunkInfo::new( 297 csid_type::COMMAND_AMF0_AMF3, 298 chunk_type::TYPE_0, 299 0, 300 data.len() as u32, 301 msg_type_id::COMMAND_AMF0, 302 0, 303 data, 304 ); 305 306 self.packetizer.write_chunk(&mut chunk_info).await?; 307 Ok(()) 308 } 309 310 pub async fn send_publish( 311 &mut self, 312 transaction_id: &f64, 313 stream_name: &String, 314 stream_type: &String, 315 ) -> Result<(), SessionError> { 316 let mut netstream = NetStream::new(BytesWriter::new()); 317 let data = netstream.publish(transaction_id, stream_name, stream_type)?; 318 319 let mut chunk_info = ChunkInfo::new( 320 csid_type::COMMAND_AMF0_AMF3, 321 chunk_type::TYPE_0, 322 0, 323 data.len() as u32, 324 msg_type_id::COMMAND_AMF0, 325 0, 326 data, 327 ); 328 329 self.packetizer.write_chunk(&mut chunk_info).await?; 330 331 Ok(()) 332 } 333 334 pub async fn send_play( 335 &mut self, 336 transaction_id: &f64, 337 stream_name: &String, 338 start: &f64, 339 duration: &f64, 340 reset: &bool, 341 ) -> Result<(), SessionError> { 342 let mut netstream = NetStream::new(BytesWriter::new()); 343 let data = netstream.play(transaction_id, stream_name, start, duration, reset)?; 344 345 let mut chunk_info = ChunkInfo::new( 346 csid_type::COMMAND_AMF0_AMF3, 347 chunk_type::TYPE_0, 348 0, 349 data.len() as u32, 350 msg_type_id::COMMAND_AMF0, 351 0, 352 data, 353 ); 354 355 self.packetizer.write_chunk(&mut chunk_info).await?; 356 357 Ok(()) 358 } 359 360 pub async fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 361 let mut controlmessage = ControlMessages::new(AsyncBytesWriter::new(self.io.clone())); 362 controlmessage.write_set_chunk_size(CHUNK_SIZE).await?; 363 Ok(()) 364 } 365 366 pub async fn send_window_acknowledgement_size( 367 &mut self, 368 window_size: u32, 369 ) -> Result<(), SessionError> { 370 let mut controlmessage = ControlMessages::new(AsyncBytesWriter::new(self.io.clone())); 371 controlmessage 372 .write_window_acknowledgement_size(window_size) 373 .await?; 374 Ok(()) 375 } 376 377 pub async fn send_set_buffer_length( 378 &mut self, 379 stream_id: u32, 380 ms: u32, 381 ) -> Result<(), SessionError> { 382 let mut eventmessages = EventMessages::new(AsyncBytesWriter::new(self.io.clone())); 383 eventmessages.set_buffer_length(stream_id, ms).await?; 384 385 Ok(()) 386 } 387 388 pub async fn send_audio(&mut self, data: BytesMut) -> Result<(), SessionError> { 389 let mut chunk_info = ChunkInfo::new( 390 csid_type::AUDIO, 391 chunk_type::TYPE_0, 392 0, 393 data.len() as u32, 394 msg_type_id::AUDIO, 395 0, 396 data, 397 ); 398 399 self.packetizer.write_chunk(&mut chunk_info).await?; 400 401 Ok(()) 402 } 403 404 pub async fn send_video(&mut self, data: BytesMut) -> Result<(), SessionError> { 405 let mut chunk_info = ChunkInfo::new( 406 csid_type::VIDEO, 407 chunk_type::TYPE_0, 408 0, 409 data.len() as u32, 410 msg_type_id::VIDEO, 411 0, 412 data, 413 ); 414 415 self.packetizer.write_chunk(&mut chunk_info).await?; 416 417 Ok(()) 418 } 419 420 pub fn on_result_connect(&mut self) -> Result<(), SessionError> { 421 self.state = ClientSessionState::CreateStream; 422 Ok(()) 423 } 424 425 pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> { 426 match self.client_type { 427 ClientType::Play => { 428 self.state = ClientSessionState::Play; 429 } 430 ClientType::Publish => { 431 self.state = ClientSessionState::PublishingContent; 432 } 433 } 434 Ok(()) 435 } 436 437 pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> { 438 self.unpacketizer 439 .update_max_chunk_size(chunk_size.clone() as usize); 440 Ok(()) 441 } 442 443 pub async fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> { 444 self.send_window_acknowledgement_size(250000).await?; 445 Ok(()) 446 } 447 pub fn on_error(&mut self) -> Result<(), SessionError> { 448 Ok(()) 449 } 450 451 pub fn on_status(&mut self, obj: &HashMap<String, Amf0ValueType>) -> Result<(), SessionError> { 452 Ok(()) 453 } 454 } 455