1 use { 2 super::{ 3 define, 4 errors::{SessionError, SessionErrorValue}, 5 }, 6 crate::{ 7 amf0::Amf0ValueType, 8 channels::define::{ 9 ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent, 10 ChannelEventProducer, 11 }, 12 chunk::{ 13 define::{chunk_type, csid_type, CHUNK_SIZE}, 14 packetizer::ChunkPacketizer, 15 unpacketizer::{ChunkUnpacketizer, UnpackResult}, 16 ChunkInfo, 17 }, 18 config, 19 handshake::handshake::{ServerHandshakeState, SimpleHandshakeServer}, 20 messages::{ 21 define::{msg_type_id, RtmpMessageData}, 22 parser::MessageParser, 23 }, 24 netconnection::commands::NetConnection, 25 netstream::writer::NetStreamWriter, 26 protocol_control_messages::writer::ProtocolControlMessagesWriter, 27 user_control_messages::writer::EventMessagesWriter, 28 }, 29 bytes::BytesMut, 30 networkio::{ 31 bytes_writer::{AsyncBytesWriter, BytesWriter}, 32 networkio::NetworkIO, 33 }, 34 std::{collections::HashMap, sync::Arc}, 35 tokio::{ 36 net::TcpStream, 37 sync::{mpsc, oneshot, Mutex}, 38 }, 39 }; 40 41 pub struct Common { 42 packetizer: ChunkPacketizer, 43 44 data_consumer: ChannelDataConsumer, 45 data_producer: ChannelDataProducer, 46 47 event_producer: ChannelEventProducer, 48 } 49 50 impl Common { 51 pub fn new(net_io: Arc<Mutex<NetworkIO>>, event_producer: ChannelEventProducer) -> Self { 52 //only used for init,since I don't found a better way to deal with this. 53 let (init_producer, init_consumer) = mpsc::unbounded_channel(); 54 55 Self { 56 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 57 58 data_producer: init_producer, 59 data_consumer: init_consumer, 60 61 event_producer, 62 } 63 } 64 pub async fn send_channel_data(&mut self) -> Result<(), SessionError> { 65 loop { 66 if let Some(data) = self.data_consumer.recv().await { 67 match data { 68 ChannelData::Audio { timestamp, data } => { 69 //print!("send audio data\n"); 70 self.send_audio(data, timestamp).await?; 71 } 72 ChannelData::Video { timestamp, data } => { 73 //print!("send video data\n"); 74 self.send_video(data, timestamp).await?; 75 } 76 ChannelData::MetaData { body } => { 77 print!("send meta data\n"); 78 self.send_metadata(body).await?; 79 } 80 } 81 } 82 } 83 } 84 85 pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 86 let mut chunk_info = ChunkInfo::new( 87 csid_type::AUDIO, 88 chunk_type::TYPE_0, 89 timestamp, 90 data.len() as u32, 91 msg_type_id::AUDIO, 92 0, 93 data, 94 ); 95 96 self.packetizer.write_chunk(&mut chunk_info).await?; 97 98 Ok(()) 99 } 100 101 pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 102 let mut chunk_info = ChunkInfo::new( 103 csid_type::VIDEO, 104 chunk_type::TYPE_0, 105 timestamp, 106 data.len() as u32, 107 msg_type_id::VIDEO, 108 0, 109 data, 110 ); 111 112 self.packetizer.write_chunk(&mut chunk_info).await?; 113 114 Ok(()) 115 } 116 117 pub async fn send_metadata(&mut self, data: BytesMut) -> Result<(), SessionError> { 118 let mut chunk_info = ChunkInfo::new( 119 csid_type::DATA_AMF0_AMF3, 120 chunk_type::TYPE_0, 121 0, 122 data.len() as u32, 123 msg_type_id::DATA_AMF0, 124 0, 125 data, 126 ); 127 128 self.packetizer.write_chunk(&mut chunk_info).await?; 129 Ok(()) 130 } 131 132 pub fn on_video_data( 133 &mut self, 134 data: &mut BytesMut, 135 timestamp: &u32, 136 ) -> Result<(), SessionError> { 137 let data = ChannelData::Video { 138 timestamp: timestamp.clone(), 139 data: data.clone(), 140 }; 141 142 //print!("receive video data\n"); 143 match self.data_producer.send(data) { 144 Ok(_) => {} 145 Err(err) => { 146 print!("send video err {}\n", err); 147 return Err(SessionError { 148 value: SessionErrorValue::SendChannelDataErr, 149 }); 150 } 151 } 152 153 Ok(()) 154 } 155 156 pub fn on_audio_data( 157 &mut self, 158 data: &mut BytesMut, 159 timestamp: &u32, 160 ) -> Result<(), SessionError> { 161 let data = ChannelData::Audio { 162 timestamp: timestamp.clone(), 163 data: data.clone(), 164 }; 165 166 match self.data_producer.send(data) { 167 Ok(_) => {} 168 Err(err) => { 169 print!("receive audio err {}\n", err); 170 return Err(SessionError { 171 value: SessionErrorValue::SendChannelDataErr, 172 }); 173 } 174 } 175 176 Ok(()) 177 } 178 179 pub fn on_amf_data(&mut self, body: &mut BytesMut) -> Result<(), SessionError> { 180 let data = ChannelData::MetaData { body: body.clone() }; 181 182 match self.data_producer.send(data) { 183 Ok(_) => {} 184 Err(_) => { 185 return Err(SessionError { 186 value: SessionErrorValue::SendChannelDataErr, 187 }) 188 } 189 } 190 191 Ok(()) 192 } 193 194 pub async fn subscribe_from_channels( 195 &mut self, 196 app_name: String, 197 stream_name: String, 198 session_id: u64, 199 ) -> Result<(), SessionError> { 200 print!( 201 "subscribe info............{} {} {}\n", 202 app_name, 203 stream_name.clone(), 204 session_id 205 ); 206 207 let (sender, receiver) = oneshot::channel(); 208 let subscribe_event = ChannelEvent::Subscribe { 209 app_name: app_name, 210 stream_name, 211 session_id: session_id, 212 responder: sender, 213 }; 214 215 let rv = self.event_producer.send(subscribe_event); 216 match rv { 217 Err(_) => { 218 return Err(SessionError { 219 value: SessionErrorValue::ChannelEventSendErr, 220 }) 221 } 222 _ => {} 223 } 224 225 match receiver.await { 226 Ok(consumer) => { 227 self.data_consumer = consumer; 228 } 229 Err(_) => {} 230 } 231 Ok(()) 232 } 233 234 pub async fn unsubscribe_from_channels( 235 &mut self, 236 app_name: String, 237 stream_name: String, 238 session_id: u64, 239 ) -> Result<(), SessionError> { 240 let subscribe_event = ChannelEvent::UnSubscribe { 241 app_name, 242 stream_name, 243 session_id, 244 }; 245 246 println!("unsubscribe_from_channels"); 247 let _ = self.event_producer.send(subscribe_event); 248 249 Ok(()) 250 } 251 252 pub async fn publish_to_channels( 253 &mut self, 254 app_name: String, 255 stream_name: String, 256 ) -> Result<(), SessionError> { 257 let (sender, receiver) = oneshot::channel(); 258 let publish_event = ChannelEvent::Publish { 259 app_name, 260 stream_name, 261 responder: sender, 262 }; 263 264 let rv = self.event_producer.send(publish_event); 265 match rv { 266 Err(_) => { 267 return Err(SessionError { 268 value: SessionErrorValue::ChannelEventSendErr, 269 }) 270 } 271 _ => {} 272 } 273 274 match receiver.await { 275 Ok(producer) => { 276 print!("set producer before\n"); 277 self.data_producer = producer; 278 print!("set producer after\n"); 279 } 280 Err(err) => { 281 print!("publish_to_channels err{}\n", err) 282 } 283 } 284 Ok(()) 285 } 286 287 pub async fn unpublish_to_channels( 288 &mut self, 289 app_name: String, 290 stream_name: String, 291 ) -> Result<(), SessionError> { 292 let unpublish_event = ChannelEvent::UnPublish { 293 app_name, 294 stream_name, 295 }; 296 297 let rv = self.event_producer.send(unpublish_event); 298 match rv { 299 Err(_) => { 300 println!("unpublish_to_channels error."); 301 return Err(SessionError { 302 value: SessionErrorValue::ChannelEventSendErr, 303 }); 304 } 305 _ => { 306 println!("unpublish_to_channels successfully.") 307 } 308 } 309 Ok(()) 310 } 311 } 312