1 use { 2 super::{ 3 define::{SessionSubType, SessionType}, 4 errors::{SessionError, SessionErrorValue}, 5 }, 6 crate::{ 7 channels::define::{ 8 ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent, 9 ChannelEventProducer, 10 }, 11 chunk::{ 12 define::{chunk_type, csid_type}, 13 packetizer::ChunkPacketizer, 14 ChunkInfo, 15 }, 16 messages::define::msg_type_id, 17 }, 18 bytes::BytesMut, 19 networkio::networkio::NetworkIO, 20 std::{sync::Arc, time::Duration}, 21 tokio::{ 22 sync::{mpsc, oneshot, Mutex}, 23 time::sleep, 24 }, 25 }; 26 27 pub struct SessionInfo { 28 pub session_id: u64, 29 pub session_sub_type: SessionSubType, 30 } 31 pub struct Common { 32 packetizer: ChunkPacketizer, 33 34 data_consumer: ChannelDataConsumer, 35 data_producer: ChannelDataProducer, 36 37 event_producer: ChannelEventProducer, 38 session_type: SessionType, 39 } 40 41 impl Common { 42 pub fn new( 43 net_io: Arc<Mutex<NetworkIO>>, 44 event_producer: ChannelEventProducer, 45 session_type: SessionType, 46 ) -> Self { 47 //only used for init,since I don't found a better way to deal with this. 48 let (init_producer, init_consumer) = mpsc::unbounded_channel(); 49 50 Self { 51 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 52 53 data_producer: init_producer, 54 data_consumer: init_consumer, 55 56 event_producer, 57 session_type, 58 } 59 } 60 pub async fn send_channel_data(&mut self) -> Result<(), SessionError> { 61 loop { 62 if let Some(data) = self.data_consumer.recv().await { 63 match data { 64 ChannelData::Audio { timestamp, data } => { 65 self.send_audio(data, timestamp).await?; 66 } 67 ChannelData::Video { timestamp, data } => { 68 self.send_video(data, timestamp).await?; 69 } 70 ChannelData::MetaData { timestamp, data } => { 71 self.send_metadata(data, timestamp).await?; 72 } 73 } 74 } 75 } 76 } 77 78 pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 79 let mut chunk_info = ChunkInfo::new( 80 csid_type::AUDIO, 81 chunk_type::TYPE_0, 82 timestamp, 83 data.len() as u32, 84 msg_type_id::AUDIO, 85 0, 86 data, 87 ); 88 89 self.packetizer.write_chunk(&mut chunk_info).await?; 90 91 Ok(()) 92 } 93 94 pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 95 let mut chunk_info = ChunkInfo::new( 96 csid_type::VIDEO, 97 chunk_type::TYPE_0, 98 timestamp, 99 data.len() as u32, 100 msg_type_id::VIDEO, 101 0, 102 data, 103 ); 104 105 self.packetizer.write_chunk(&mut chunk_info).await?; 106 107 Ok(()) 108 } 109 110 pub async fn send_metadata( 111 &mut self, 112 data: BytesMut, 113 timestamp: u32, 114 ) -> Result<(), SessionError> { 115 let mut chunk_info = ChunkInfo::new( 116 csid_type::DATA_AMF0_AMF3, 117 chunk_type::TYPE_0, 118 timestamp, 119 data.len() as u32, 120 msg_type_id::DATA_AMF0, 121 0, 122 data, 123 ); 124 125 self.packetizer.write_chunk(&mut chunk_info).await?; 126 Ok(()) 127 } 128 129 pub fn on_video_data( 130 &mut self, 131 data: &mut BytesMut, 132 timestamp: &u32, 133 ) -> Result<(), SessionError> { 134 let data = ChannelData::Video { 135 timestamp: timestamp.clone(), 136 data: data.clone(), 137 }; 138 139 //print!("receive video data\n"); 140 match self.data_producer.send(data) { 141 Ok(_) => {} 142 Err(err) => { 143 print!("send video err {}\n", err); 144 return Err(SessionError { 145 value: SessionErrorValue::SendChannelDataErr, 146 }); 147 } 148 } 149 150 Ok(()) 151 } 152 153 pub fn on_audio_data( 154 &mut self, 155 data: &mut BytesMut, 156 timestamp: &u32, 157 ) -> Result<(), SessionError> { 158 let data = ChannelData::Audio { 159 timestamp: timestamp.clone(), 160 data: data.clone(), 161 }; 162 163 match self.data_producer.send(data) { 164 Ok(_) => {} 165 Err(err) => { 166 print!("receive audio err {}\n", err); 167 return Err(SessionError { 168 value: SessionErrorValue::SendChannelDataErr, 169 }); 170 } 171 } 172 173 Ok(()) 174 } 175 176 pub fn on_meta_data( 177 &mut self, 178 body: &mut BytesMut, 179 timestamp: &u32, 180 ) -> Result<(), SessionError> { 181 let data = ChannelData::MetaData { 182 timestamp: timestamp.clone(), 183 data: body.clone(), 184 }; 185 186 match self.data_producer.send(data) { 187 Ok(_) => {} 188 Err(_) => { 189 return Err(SessionError { 190 value: SessionErrorValue::SendChannelDataErr, 191 }) 192 } 193 } 194 195 Ok(()) 196 } 197 198 fn get_session_info(&mut self, session_id: u64) -> SessionInfo { 199 match self.session_type { 200 SessionType::Client => SessionInfo { 201 session_id: session_id, 202 session_sub_type: SessionSubType::Publisher, 203 }, 204 SessionType::Server => SessionInfo { 205 session_id: session_id, 206 session_sub_type: SessionSubType::Player, 207 }, 208 } 209 } 210 211 pub async fn subscribe_from_channels( 212 &mut self, 213 app_name: String, 214 stream_name: String, 215 session_id: u64, 216 ) -> Result<(), SessionError> { 217 print!( 218 "subscribe info............{} {} {}\n", 219 app_name, 220 stream_name.clone(), 221 session_id 222 ); 223 224 let mut retry_count: u8 = 0; 225 226 loop { 227 let (sender, receiver) = oneshot::channel(); 228 229 let subscribe_event = ChannelEvent::Subscribe { 230 app_name: app_name.clone(), 231 stream_name: stream_name.clone(), 232 session_info: self.get_session_info(session_id), 233 responder: sender, 234 }; 235 let rv = self.event_producer.send(subscribe_event); 236 match rv { 237 Err(_) => { 238 return Err(SessionError { 239 value: SessionErrorValue::ChannelEventSendErr, 240 }) 241 } 242 _ => {} 243 } 244 245 match receiver.await { 246 Ok(consumer) => { 247 self.data_consumer = consumer; 248 break; 249 } 250 Err(_) => { 251 if retry_count > 10 { 252 return Err(SessionError { 253 value: SessionErrorValue::SubscribeCountLimitReach, 254 }); 255 } 256 } 257 } 258 259 sleep(Duration::from_millis(800)).await; 260 retry_count = retry_count + 1; 261 } 262 263 Ok(()) 264 } 265 266 pub async fn unsubscribe_from_channels( 267 &mut self, 268 app_name: String, 269 stream_name: String, 270 session_id: u64, 271 ) -> Result<(), SessionError> { 272 let subscribe_event = ChannelEvent::UnSubscribe { 273 app_name, 274 stream_name, 275 session_info: self.get_session_info(session_id), 276 }; 277 if let Err(err) = self.event_producer.send(subscribe_event) { 278 print!("unsubscribe_from_channels err {}\n", err) 279 } 280 281 Ok(()) 282 } 283 284 pub async fn publish_to_channels( 285 &mut self, 286 app_name: String, 287 stream_name: String, 288 ) -> Result<(), SessionError> { 289 let (sender, receiver) = oneshot::channel(); 290 let publish_event = ChannelEvent::Publish { 291 app_name, 292 stream_name, 293 responder: sender, 294 }; 295 296 let rv = self.event_producer.send(publish_event); 297 match rv { 298 Err(_) => { 299 return Err(SessionError { 300 value: SessionErrorValue::ChannelEventSendErr, 301 }) 302 } 303 _ => {} 304 } 305 306 match receiver.await { 307 Ok(producer) => { 308 //print!("set producer before\n"); 309 self.data_producer = producer; 310 //print!("set producer after\n"); 311 } 312 Err(err) => { 313 print!("publish_to_channels err{}\n", err) 314 } 315 } 316 Ok(()) 317 } 318 319 pub async fn unpublish_to_channels( 320 &mut self, 321 app_name: String, 322 stream_name: String, 323 ) -> Result<(), SessionError> { 324 let unpublish_event = ChannelEvent::UnPublish { 325 app_name, 326 stream_name, 327 }; 328 329 let rv = self.event_producer.send(unpublish_event); 330 match rv { 331 Err(_) => { 332 println!("unpublish_to_channels error."); 333 return Err(SessionError { 334 value: SessionErrorValue::ChannelEventSendErr, 335 }); 336 } 337 _ => { 338 println!("unpublish_to_channels successfully.") 339 } 340 } 341 Ok(()) 342 } 343 } 344