1 use super::errors::SessionError; 2 3 use crate::chunk::define::{chunk_type, csid_type, CHUNK_SIZE}; 4 use crate::chunk::unpacketizer::ChunkUnpacketizer; 5 use crate::chunk::unpacketizer::UnpackResult; 6 use crate::chunk::{packetizer::ChunkPacketizer, ChunkInfo}; 7 8 use super::errors::SessionErrorValue; 9 use crate::handshake::handshake::SimpleHandshakeClient; 10 11 use crate::messages::define::msg_type_id; 12 use crate::messages::define::RtmpMessageData; 13 use crate::messages::parser::MessageParser; 14 15 use crate::amf0::Amf0ValueType; 16 17 use netio::bytes_writer::AsyncBytesWriter; 18 19 use netio::bytes_writer::BytesWriter; 20 use netio::netio::NetworkIO; 21 22 use std::time::Duration; 23 24 use crate::handshake::handshake::ClientHandshakeState; 25 use crate::netconnection::commands::ConnectProperties; 26 use crate::netconnection::commands::NetConnection; 27 use crate::netstream::commands::NetStream; 28 use crate::protocol_control_messages::control_messages::ControlMessages; 29 30 use crate::user_control_messages::event_messages::EventMessages; 31 32 use std::collections::HashMap; 33 34 use super::define; 35 use tokio::prelude::*; 36 37 use bytes::BytesMut; 38 use std::sync::Arc; 39 use tokio::sync::Mutex; 40 41 enum ClientSessionState { 42 Handshake, 43 Connect, 44 CreateStream, 45 Play, 46 PublishingContent, 47 } 48 49 enum ClientSessionPlayState { 50 Handshake, 51 Connect, 52 CreateStream, 53 Play, 54 } 55 56 enum ClientSessionPublishState { 57 Handshake, 58 Connect, 59 CreateStream, 60 PublishingContent, 61 } 62 63 enum ClientType { 64 Play, 65 Publish, 66 } 67 pub struct ClientSession<S> 68 where 69 S: AsyncRead + AsyncWrite + Unpin + Send + Sync, 70 { 71 packetizer: ChunkPacketizer<S>, 72 unpacketizer: ChunkUnpacketizer, 73 handshaker: SimpleHandshakeClient<S>, 74 io: Arc<Mutex<NetworkIO<S>>>, 75 76 play_state: ClientSessionPlayState, 77 publish_state: ClientSessionPublishState, 78 state: ClientSessionState, 79 client_type: ClientType, 80 stream_name: String, 81 } 82 83 impl<S> ClientSession<S> 84 where 85 S: AsyncRead + AsyncWrite + Unpin + Send + Sync, 86 { 87 fn new(stream: S, timeout: Duration, client_type: ClientType, stream_name: String) -> Self { 88 let net_io = Arc::new(Mutex::new(NetworkIO::new(stream, timeout))); 89 let bytes_writer = AsyncBytesWriter::new(Arc::clone(&net_io)); 90 91 Self { 92 io: Arc::clone(&net_io), 93 94 packetizer: ChunkPacketizer::new(bytes_writer), 95 unpacketizer: ChunkUnpacketizer::new(), 96 handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)), 97 98 play_state: ClientSessionPlayState::Handshake, 99 publish_state: ClientSessionPublishState::Handshake, 100 state: ClientSessionState::Handshake, 101 client_type: client_type, 102 stream_name: stream_name, 103 } 104 } 105 106 pub async fn run(&mut self) -> Result<(), SessionError> { 107 loop { 108 match self.state { 109 ClientSessionState::Handshake => { 110 self.handshake().await?; 111 } 112 ClientSessionState::Connect => { 113 self.send_connect(&(define::TRANSACTION_ID_CONNECT as f64)) 114 .await?; 115 } 116 ClientSessionState::CreateStream => { 117 self.send_create_stream(&(define::TRANSACTION_ID_CREATE_STREAM as f64)) 118 .await?; 119 } 120 ClientSessionState::Play => { 121 self.send_play(&0.0, &self.stream_name.clone(), &0.0, &0.0, &false) 122 .await?; 123 } 124 ClientSessionState::PublishingContent => { 125 self.send_publish(&0.0, &self.stream_name.clone(), &"live".to_string()) 126 .await?; 127 } 128 } 129 130 let data = self.io.lock().await.read().await?; 131 self.unpacketizer.extend_data(&data[..]); 132 let result = self.unpacketizer.read_chunk()?; 133 134 match result { 135 UnpackResult::ChunkInfo(chunk_info) => { 136 let mut message_parser = MessageParser::new(chunk_info); 137 let mut msg = message_parser.parse()?; 138 139 self.process_messages(&mut msg)?; 140 } 141 _ => {} 142 } 143 } 144 145 // Ok(()) 146 } 147 148 async fn handshake(&mut self) -> Result<(), SessionError> { 149 loop { 150 self.handshaker.handshake().await?; 151 if self.handshaker.state == ClientHandshakeState::Finish { 152 break; 153 } 154 155 let data = self.io.lock().await.read().await?; 156 self.handshaker.extend_data(&data[..]); 157 } 158 self.state = ClientSessionState::Connect; 159 160 Ok(()) 161 } 162 163 pub fn process_messages(&mut self, msg: &mut RtmpMessageData) -> Result<(), SessionError> { 164 match msg { 165 RtmpMessageData::Amf0Command { 166 command_name, 167 transaction_id, 168 command_object, 169 others, 170 } => self.process_amf0_command_message( 171 command_name, 172 transaction_id, 173 command_object, 174 others, 175 )?, 176 RtmpMessageData::SetPeerBandwidth { properties } => self.on_set_peer_bandwidth()?, 177 RtmpMessageData::SetChunkSize { chunk_size } => self.on_set_chunk_size(chunk_size)?, 178 RtmpMessageData::AudioData { data } => {} 179 RtmpMessageData::VideoData { data } => {} 180 181 _ => {} 182 } 183 Ok(()) 184 } 185 186 pub fn process_amf0_command_message( 187 &mut self, 188 command_name: &Amf0ValueType, 189 transaction_id: &Amf0ValueType, 190 command_object: &Amf0ValueType, 191 others: &mut Vec<Amf0ValueType>, 192 ) -> Result<(), SessionError> { 193 let empty_cmd_name = &String::new(); 194 let cmd_name = match command_name { 195 Amf0ValueType::UTF8String(str) => str, 196 _ => empty_cmd_name, 197 }; 198 199 let transaction_id = match transaction_id { 200 Amf0ValueType::Number(number) => number.clone() as u8, 201 _ => 0, 202 }; 203 204 let empty_cmd_obj: HashMap<String, Amf0ValueType> = HashMap::new(); 205 let obj = match command_object { 206 Amf0ValueType::Object(obj) => obj, 207 // Amf0ValueType::Null => 208 _ => &empty_cmd_obj, 209 }; 210 211 match cmd_name.as_str() { 212 "_reslut" => match transaction_id { 213 define::TRANSACTION_ID_CONNECT => { 214 self.on_result_connect()?; 215 } 216 define::TRANSACTION_ID_CREATE_STREAM => { 217 self.on_result_create_stream()?; 218 } 219 _ => {} 220 }, 221 "_error" => { 222 self.on_error()?; 223 } 224 "onStatus" => { 225 match others.remove(0) { 226 Amf0ValueType::Object(obj) => self.on_status(&obj), 227 _ => Err(SessionError { 228 value: SessionErrorValue::Amf0ValueCountNotCorrect, 229 }), 230 }; 231 } 232 233 _ => {} 234 } 235 236 Ok(()) 237 } 238 239 pub async fn send_connect(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 240 let app_name = String::from("app"); 241 let properties = ConnectProperties::new(app_name); 242 243 let mut netconnection = NetConnection::new(BytesWriter::new()); 244 let data = netconnection.connect(transaction_id, &properties)?; 245 246 let mut chunk_info = ChunkInfo::new( 247 csid_type::COMMAND_AMF0_AMF3, 248 chunk_type::TYPE_0, 249 0, 250 data.len() as u32, 251 msg_type_id::COMMAND_AMF0, 252 0, 253 data, 254 ); 255 256 self.packetizer.write_chunk(&mut chunk_info).await?; 257 Ok(()) 258 } 259 260 pub async fn send_create_stream(&mut self, transaction_id: &f64) -> Result<(), SessionError> { 261 let mut netconnection = NetConnection::new(BytesWriter::new()); 262 let data = netconnection.create_stream(transaction_id)?; 263 264 let mut chunk_info = ChunkInfo::new( 265 csid_type::COMMAND_AMF0_AMF3, 266 chunk_type::TYPE_0, 267 0, 268 data.len() as u32, 269 msg_type_id::COMMAND_AMF0, 270 0, 271 data, 272 ); 273 274 self.packetizer.write_chunk(&mut chunk_info).await?; 275 276 Ok(()) 277 } 278 279 pub async fn send_delete_stream( 280 &mut self, 281 transaction_id: &f64, 282 stream_id: &f64, 283 ) -> Result<(), SessionError> { 284 let mut netstream = NetStream::new(BytesWriter::new()); 285 let data = netstream.delete_stream(transaction_id, stream_id)?; 286 287 let mut chunk_info = ChunkInfo::new( 288 csid_type::COMMAND_AMF0_AMF3, 289 chunk_type::TYPE_0, 290 0, 291 data.len() as u32, 292 msg_type_id::COMMAND_AMF0, 293 0, 294 data, 295 ); 296 297 self.packetizer.write_chunk(&mut chunk_info).await?; 298 Ok(()) 299 } 300 301 pub async fn send_publish( 302 &mut self, 303 transaction_id: &f64, 304 stream_name: &String, 305 stream_type: &String, 306 ) -> Result<(), SessionError> { 307 let mut netstream = NetStream::new(BytesWriter::new()); 308 let data = netstream.publish(transaction_id, stream_name, stream_type)?; 309 310 let mut chunk_info = ChunkInfo::new( 311 csid_type::COMMAND_AMF0_AMF3, 312 chunk_type::TYPE_0, 313 0, 314 data.len() as u32, 315 msg_type_id::COMMAND_AMF0, 316 0, 317 data, 318 ); 319 320 self.packetizer.write_chunk(&mut chunk_info).await?; 321 322 Ok(()) 323 } 324 325 pub async fn send_play( 326 &mut self, 327 transaction_id: &f64, 328 stream_name: &String, 329 start: &f64, 330 duration: &f64, 331 reset: &bool, 332 ) -> Result<(), SessionError> { 333 let mut netstream = NetStream::new(BytesWriter::new()); 334 let data = netstream.play(transaction_id, stream_name, start, duration, reset)?; 335 336 let mut chunk_info = ChunkInfo::new( 337 csid_type::COMMAND_AMF0_AMF3, 338 chunk_type::TYPE_0, 339 0, 340 data.len() as u32, 341 msg_type_id::COMMAND_AMF0, 342 0, 343 data, 344 ); 345 346 self.packetizer.write_chunk(&mut chunk_info).await?; 347 348 Ok(()) 349 } 350 351 pub fn send_set_chunk_size(&mut self) -> Result<(), SessionError> { 352 let mut controlmessage = ControlMessages::new(AsyncBytesWriter::new(self.io.clone())); 353 controlmessage.write_set_chunk_size(CHUNK_SIZE)?; 354 Ok(()) 355 } 356 357 pub fn send_window_acknowledgement_size( 358 &mut self, 359 window_size: u32, 360 ) -> Result<(), SessionError> { 361 let mut controlmessage = ControlMessages::new(AsyncBytesWriter::new(self.io.clone())); 362 controlmessage.write_window_acknowledgement_size(window_size)?; 363 Ok(()) 364 } 365 366 pub async fn send_set_buffer_length(&mut self, stream_id: u32, ms: u32) -> Result<(), SessionError> { 367 let mut eventmessages = EventMessages::new(AsyncBytesWriter::new(self.io.clone())); 368 eventmessages.set_buffer_length(stream_id, ms).await?; 369 370 Ok(()) 371 } 372 373 pub async fn send_audio(&mut self, data: BytesMut) -> Result<(), SessionError> { 374 let mut chunk_info = ChunkInfo::new( 375 csid_type::AUDIO, 376 chunk_type::TYPE_0, 377 0, 378 data.len() as u32, 379 msg_type_id::AUDIO, 380 0, 381 data, 382 ); 383 384 self.packetizer.write_chunk(&mut chunk_info).await?; 385 386 Ok(()) 387 } 388 389 pub async fn send_video(&mut self, data: BytesMut) -> Result<(), SessionError> { 390 let mut chunk_info = ChunkInfo::new( 391 csid_type::VIDEO, 392 chunk_type::TYPE_0, 393 0, 394 data.len() as u32, 395 msg_type_id::VIDEO, 396 0, 397 data, 398 ); 399 400 self.packetizer.write_chunk(&mut chunk_info).await?; 401 402 Ok(()) 403 } 404 405 pub fn on_result_connect(&mut self) -> Result<(), SessionError> { 406 self.state = ClientSessionState::CreateStream; 407 Ok(()) 408 } 409 410 pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> { 411 match self.client_type { 412 ClientType::Play => { 413 self.state = ClientSessionState::Play; 414 } 415 ClientType::Publish => { 416 self.state = ClientSessionState::PublishingContent; 417 } 418 } 419 Ok(()) 420 } 421 422 pub fn on_set_chunk_size(&mut self, chunk_size: &mut u32) -> Result<(), SessionError> { 423 self.unpacketizer 424 .update_max_chunk_size(chunk_size.clone() as usize); 425 Ok(()) 426 } 427 428 pub fn on_set_peer_bandwidth(&mut self) -> Result<(), SessionError> { 429 self.send_window_acknowledgement_size(250000)?; 430 Ok(()) 431 } 432 pub fn on_error(&mut self) -> Result<(), SessionError> { 433 Ok(()) 434 } 435 436 pub fn on_status(&mut self, obj: &HashMap<String, Amf0ValueType>) -> Result<(), SessionError> { 437 Ok(()) 438 } 439 } 440