1 use { 2 super::{ 3 define::{SessionSubType, SessionType}, 4 errors::{SessionError, SessionErrorValue}, 5 }, 6 crate::{ 7 amf0::Amf0ValueType, 8 channels::define::{ 9 ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent, 10 ChannelEventProducer, 11 }, 12 chunk::{ 13 define::{chunk_type, csid_type, CHUNK_SIZE}, 14 packetizer::ChunkPacketizer, 15 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 16 ChunkInfo, 17 }, 18 config, 19 handshake::handshake::{ServerHandshakeState, SimpleHandshakeServer}, 20 messages::{ 21 define::{msg_type_id, RtmpMessageData}, 22 parser::MessageParser, 23 }, 24 netconnection::commands::NetConnection, 25 netstream::writer::NetStreamWriter, 26 protocol_control_messages::writer::ProtocolControlMessagesWriter, 27 user_control_messages::writer::EventMessagesWriter, 28 utils::print::print, 29 }, 30 bytes::BytesMut, 31 networkio::{ 32 bytes_writer::{AsyncBytesWriter, BytesWriter}, 33 networkio::NetworkIO, 34 }, 35 std::{collections::HashMap, sync::Arc}, 36 tokio::{ 37 net::TcpStream, 38 sync::{mpsc, oneshot, Mutex}, 39 }, 40 }; 41 42 pub struct SessionInfo { 43 pub session_id: u64, 44 pub session_sub_type: SessionSubType, 45 } 46 pub struct Common { 47 packetizer: ChunkPacketizer, 48 49 data_consumer: ChannelDataConsumer, 50 data_producer: ChannelDataProducer, 51 52 event_producer: ChannelEventProducer, 53 session_type: SessionType, 54 } 55 56 impl Common { 57 pub fn new( 58 net_io: Arc<Mutex<NetworkIO>>, 59 event_producer: ChannelEventProducer, 60 session_type: SessionType, 61 ) -> Self { 62 //only used for init,since I don't found a better way to deal with this. 63 let (init_producer, init_consumer) = mpsc::unbounded_channel(); 64 65 Self { 66 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 67 68 data_producer: init_producer, 69 data_consumer: init_consumer, 70 71 event_producer, 72 session_type, 73 } 74 } 75 pub async fn send_channel_data(&mut self) -> Result<(), SessionError> { 76 loop { 77 if let Some(data) = self.data_consumer.recv().await { 78 match data { 79 ChannelData::Audio { timestamp, data } => { 80 self.send_audio(data, timestamp).await?; 81 } 82 ChannelData::Video { timestamp, data } => { 83 self.send_video(data, timestamp).await?; 84 } 85 ChannelData::MetaData { body } => { 86 self.send_metadata(body).await?; 87 } 88 } 89 } 90 } 91 } 92 93 pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 94 let mut chunk_info = ChunkInfo::new( 95 csid_type::AUDIO, 96 chunk_type::TYPE_0, 97 timestamp, 98 data.len() as u32, 99 msg_type_id::AUDIO, 100 0, 101 data, 102 ); 103 104 self.packetizer.write_chunk(&mut chunk_info).await?; 105 106 Ok(()) 107 } 108 109 pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 110 let mut chunk_info = ChunkInfo::new( 111 csid_type::VIDEO, 112 chunk_type::TYPE_0, 113 timestamp, 114 data.len() as u32, 115 msg_type_id::VIDEO, 116 0, 117 data, 118 ); 119 120 self.packetizer.write_chunk(&mut chunk_info).await?; 121 122 Ok(()) 123 } 124 125 pub async fn send_metadata(&mut self, data: BytesMut) -> Result<(), SessionError> { 126 let mut chunk_info = ChunkInfo::new( 127 csid_type::DATA_AMF0_AMF3, 128 chunk_type::TYPE_0, 129 0, 130 data.len() as u32, 131 msg_type_id::DATA_AMF0, 132 0, 133 data, 134 ); 135 136 self.packetizer.write_chunk(&mut chunk_info).await?; 137 Ok(()) 138 } 139 140 pub fn on_video_data( 141 &mut self, 142 data: &mut BytesMut, 143 timestamp: &u32, 144 ) -> Result<(), SessionError> { 145 let data = ChannelData::Video { 146 timestamp: timestamp.clone(), 147 data: data.clone(), 148 }; 149 150 //print!("receive video data\n"); 151 match self.data_producer.send(data) { 152 Ok(_) => {} 153 Err(err) => { 154 print!("send video err {}\n", err); 155 return Err(SessionError { 156 value: SessionErrorValue::SendChannelDataErr, 157 }); 158 } 159 } 160 161 Ok(()) 162 } 163 164 pub fn on_audio_data( 165 &mut self, 166 data: &mut BytesMut, 167 timestamp: &u32, 168 ) -> Result<(), SessionError> { 169 let data = ChannelData::Audio { 170 timestamp: timestamp.clone(), 171 data: data.clone(), 172 }; 173 174 match self.data_producer.send(data) { 175 Ok(_) => {} 176 Err(err) => { 177 print!("receive audio err {}\n", err); 178 return Err(SessionError { 179 value: SessionErrorValue::SendChannelDataErr, 180 }); 181 } 182 } 183 184 Ok(()) 185 } 186 187 pub fn on_amf_data(&mut self, body: &mut BytesMut) -> Result<(), SessionError> { 188 let data = ChannelData::MetaData { body: body.clone() }; 189 190 match self.data_producer.send(data) { 191 Ok(_) => {} 192 Err(_) => { 193 return Err(SessionError { 194 value: SessionErrorValue::SendChannelDataErr, 195 }) 196 } 197 } 198 199 Ok(()) 200 } 201 202 fn get_session_info(&mut self, session_id: u64) -> SessionInfo { 203 match self.session_type { 204 SessionType::Client => SessionInfo { 205 session_id: session_id, 206 session_sub_type: SessionSubType::Publisher, 207 }, 208 SessionType::Server => SessionInfo { 209 session_id: session_id, 210 session_sub_type: SessionSubType::Player, 211 }, 212 } 213 } 214 215 pub async fn subscribe_from_channels( 216 &mut self, 217 app_name: String, 218 stream_name: String, 219 session_id: u64, 220 ) -> Result<(), SessionError> { 221 print!( 222 "subscribe info............{} {} {}\n", 223 app_name, 224 stream_name.clone(), 225 session_id 226 ); 227 228 let (sender, receiver) = oneshot::channel(); 229 230 let subscribe_event = ChannelEvent::Subscribe { 231 app_name: app_name, 232 stream_name, 233 session_info: self.get_session_info(session_id), 234 responder: sender, 235 }; 236 237 let rv = self.event_producer.send(subscribe_event); 238 match rv { 239 Err(_) => { 240 return Err(SessionError { 241 value: SessionErrorValue::ChannelEventSendErr, 242 }) 243 } 244 _ => {} 245 } 246 247 match receiver.await { 248 Ok(consumer) => { 249 self.data_consumer = consumer; 250 } 251 Err(_) => {} 252 } 253 Ok(()) 254 } 255 256 pub async fn unsubscribe_from_channels( 257 &mut self, 258 app_name: String, 259 stream_name: String, 260 session_id: u64, 261 ) -> Result<(), SessionError> { 262 let subscribe_event = ChannelEvent::UnSubscribe { 263 app_name, 264 stream_name, 265 session_info: self.get_session_info(session_id), 266 }; 267 if let Err(err) = self.event_producer.send(subscribe_event) { 268 print!("unsubscribe_from_channels err {}\n", err) 269 } 270 271 Ok(()) 272 } 273 274 pub async fn publish_to_channels( 275 &mut self, 276 app_name: String, 277 stream_name: String, 278 connect_command_object: HashMap<String, Amf0ValueType>, 279 ) -> Result<(), SessionError> { 280 let (sender, receiver) = oneshot::channel(); 281 let publish_event = ChannelEvent::Publish { 282 app_name, 283 stream_name, 284 responder: sender, 285 connect_command_object, 286 }; 287 288 let rv = self.event_producer.send(publish_event); 289 match rv { 290 Err(_) => { 291 return Err(SessionError { 292 value: SessionErrorValue::ChannelEventSendErr, 293 }) 294 } 295 _ => {} 296 } 297 298 match receiver.await { 299 Ok(producer) => { 300 //print!("set producer before\n"); 301 self.data_producer = producer; 302 //print!("set producer after\n"); 303 } 304 Err(err) => { 305 print!("publish_to_channels err{}\n", err) 306 } 307 } 308 Ok(()) 309 } 310 311 pub async fn unpublish_to_channels( 312 &mut self, 313 app_name: String, 314 stream_name: String, 315 ) -> Result<(), SessionError> { 316 let unpublish_event = ChannelEvent::UnPublish { 317 app_name, 318 stream_name, 319 }; 320 321 let rv = self.event_producer.send(unpublish_event); 322 match rv { 323 Err(_) => { 324 println!("unpublish_to_channels error."); 325 return Err(SessionError { 326 value: SessionErrorValue::ChannelEventSendErr, 327 }); 328 } 329 _ => { 330 println!("unpublish_to_channels successfully.") 331 } 332 } 333 Ok(()) 334 } 335 } 336