xref: /xiu/protocol/rtmp/src/session/common.rs (revision a4ef5d6c)
1 use streamhub::define::DataSender;
2 use tokio::sync::oneshot;
3 
4 use {
5     super::{
6         define::SessionType,
7         errors::{SessionError, SessionErrorValue},
8     },
9     crate::{
10         cache::errors::CacheError,
11         cache::Cache,
12         chunk::{
13             define::{chunk_type, csid_type},
14             packetizer::ChunkPacketizer,
15             ChunkInfo,
16         },
17         messages::define::msg_type_id,
18     },
19     async_trait::async_trait,
20     bytes::BytesMut,
21     std::fmt,
22     std::{net::SocketAddr, sync::Arc},
23     streamhub::{
24         define::{
25             FrameData, FrameDataReceiver, FrameDataSender, InformationSender, NotifyInfo,
26             PublishType, PublisherInfo, StreamHubEvent, StreamHubEventSender, SubscribeType,
27             SubscriberInfo, TStreamHandler,
28         },
29         errors::{ChannelError, ChannelErrorValue},
30         statistics::StreamStatistics,
31         stream::StreamIdentifier,
32         utils::Uuid,
33     },
34     tokio::sync::{mpsc, Mutex},
35 };
36 
37 pub struct Common {
38     //only Server Subscriber or Client Publisher needs to send out trunck data.
39     packetizer: Option<ChunkPacketizer>,
40 
41     data_receiver: FrameDataReceiver,
42     data_sender: FrameDataSender,
43 
44     event_producer: StreamHubEventSender,
45     pub session_type: SessionType,
46 
47     /*save the client side socket connected to the SeverSession */
48     remote_addr: Option<SocketAddr>,
49     /*request URL from client*/
50     pub request_url: String,
51     pub stream_handler: Arc<RtmpStreamHandler>,
52 }
53 
54 impl Common {
new( packetizer: Option<ChunkPacketizer>, event_producer: StreamHubEventSender, session_type: SessionType, remote_addr: Option<SocketAddr>, ) -> Self55     pub fn new(
56         packetizer: Option<ChunkPacketizer>,
57         event_producer: StreamHubEventSender,
58         session_type: SessionType,
59         remote_addr: Option<SocketAddr>,
60     ) -> Self {
61         //only used for init,since I don't found a better way to deal with this.
62         let (init_producer, init_consumer) = mpsc::unbounded_channel();
63 
64         Self {
65             packetizer,
66 
67             data_sender: init_producer,
68             data_receiver: init_consumer,
69 
70             event_producer,
71             session_type,
72             remote_addr,
73             request_url: String::default(),
74             stream_handler: Arc::new(RtmpStreamHandler::new()),
75             //cache: None,
76         }
77     }
send_channel_data(&mut self) -> Result<(), SessionError>78     pub async fn send_channel_data(&mut self) -> Result<(), SessionError> {
79         let mut retry_times = 0;
80         loop {
81             if let Some(data) = self.data_receiver.recv().await {
82                 match data {
83                     FrameData::Audio { timestamp, data } => {
84                         self.send_audio(data, timestamp).await?;
85                     }
86                     FrameData::Video { timestamp, data } => {
87                         self.send_video(data, timestamp).await?;
88                     }
89                     FrameData::MetaData { timestamp, data } => {
90                         self.send_metadata(data, timestamp).await?;
91                     }
92                     _ => {}
93                 }
94             } else {
95                 retry_times += 1;
96                 log::debug!(
97                     "send_channel_data: no data receives ,retry {} times!",
98                     retry_times
99                 );
100 
101                 if retry_times > 10 {
102                     return Err(SessionError {
103                         value: SessionErrorValue::NoMediaDataReceived,
104                     });
105                 }
106             }
107         }
108     }
109 
send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError>110     pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
111         let mut chunk_info = ChunkInfo::new(
112             csid_type::AUDIO,
113             chunk_type::TYPE_0,
114             timestamp,
115             data.len() as u32,
116             msg_type_id::AUDIO,
117             0,
118             data,
119         );
120 
121         if let Some(packetizer) = &mut self.packetizer {
122             packetizer.write_chunk(&mut chunk_info).await?;
123         }
124 
125         Ok(())
126     }
127 
send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError>128     pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
129         let mut chunk_info = ChunkInfo::new(
130             csid_type::VIDEO,
131             chunk_type::TYPE_0,
132             timestamp,
133             data.len() as u32,
134             msg_type_id::VIDEO,
135             0,
136             data,
137         );
138 
139         if let Some(packetizer) = &mut self.packetizer {
140             packetizer.write_chunk(&mut chunk_info).await?;
141         }
142 
143         Ok(())
144     }
145 
send_metadata( &mut self, data: BytesMut, timestamp: u32, ) -> Result<(), SessionError>146     pub async fn send_metadata(
147         &mut self,
148         data: BytesMut,
149         timestamp: u32,
150     ) -> Result<(), SessionError> {
151         let mut chunk_info = ChunkInfo::new(
152             csid_type::DATA_AMF0_AMF3,
153             chunk_type::TYPE_0,
154             timestamp,
155             data.len() as u32,
156             msg_type_id::DATA_AMF0,
157             0,
158             data,
159         );
160 
161         if let Some(packetizer) = &mut self.packetizer {
162             packetizer.write_chunk(&mut chunk_info).await?;
163         }
164 
165         Ok(())
166     }
167 
on_video_data( &mut self, data: &mut BytesMut, timestamp: &u32, ) -> Result<(), SessionError>168     pub async fn on_video_data(
169         &mut self,
170         data: &mut BytesMut,
171         timestamp: &u32,
172     ) -> Result<(), SessionError> {
173         let channel_data = FrameData::Video {
174             timestamp: *timestamp,
175             data: data.clone(),
176         };
177 
178         match self.data_sender.send(channel_data) {
179             Ok(_) => {}
180             Err(err) => {
181                 log::error!("send video err: {}", err);
182                 return Err(SessionError {
183                     value: SessionErrorValue::SendFrameDataErr,
184                 });
185             }
186         }
187 
188         self.stream_handler
189             .save_video_data(data, *timestamp)
190             .await?;
191 
192         Ok(())
193     }
194 
on_audio_data( &mut self, data: &mut BytesMut, timestamp: &u32, ) -> Result<(), SessionError>195     pub async fn on_audio_data(
196         &mut self,
197         data: &mut BytesMut,
198         timestamp: &u32,
199     ) -> Result<(), SessionError> {
200         let channel_data = FrameData::Audio {
201             timestamp: *timestamp,
202             data: data.clone(),
203         };
204 
205         match self.data_sender.send(channel_data) {
206             Ok(_) => {}
207             Err(err) => {
208                 log::error!("receive audio err {}", err);
209                 return Err(SessionError {
210                     value: SessionErrorValue::SendFrameDataErr,
211                 });
212             }
213         }
214 
215         self.stream_handler
216             .save_audio_data(data, *timestamp)
217             .await?;
218 
219         Ok(())
220     }
221 
on_meta_data( &mut self, data: &mut BytesMut, timestamp: &u32, ) -> Result<(), SessionError>222     pub async fn on_meta_data(
223         &mut self,
224         data: &mut BytesMut,
225         timestamp: &u32,
226     ) -> Result<(), SessionError> {
227         let channel_data = FrameData::MetaData {
228             timestamp: *timestamp,
229             data: data.clone(),
230         };
231 
232         match self.data_sender.send(channel_data) {
233             Ok(_) => {}
234             Err(_) => {
235                 return Err(SessionError {
236                     value: SessionErrorValue::SendFrameDataErr,
237                 })
238             }
239         }
240 
241         self.stream_handler.save_metadata(data, *timestamp).await;
242 
243         Ok(())
244     }
245 
get_subscriber_info(&mut self, sub_id: Uuid) -> SubscriberInfo246     fn get_subscriber_info(&mut self, sub_id: Uuid) -> SubscriberInfo {
247         let remote_addr = if let Some(addr) = self.remote_addr {
248             addr.to_string()
249         } else {
250             String::from("unknown")
251         };
252 
253         let sub_type = match self.session_type {
254             SessionType::Client => SubscribeType::PublisherRtmp,
255             SessionType::Server => SubscribeType::PlayerRtmp,
256         };
257 
258         SubscriberInfo {
259             id: sub_id,
260             /*rtmp local client subscribe from local rtmp session
261             and publish(relay) the rtmp steam to remote RTMP server*/
262             sub_type,
263             sub_data_type: streamhub::define::SubDataType::Frame,
264             notify_info: NotifyInfo {
265                 request_url: self.request_url.clone(),
266                 remote_addr,
267             },
268         }
269     }
270 
get_publisher_info(&mut self, sub_id: Uuid) -> PublisherInfo271     fn get_publisher_info(&mut self, sub_id: Uuid) -> PublisherInfo {
272         let remote_addr = if let Some(addr) = self.remote_addr {
273             addr.to_string()
274         } else {
275             String::from("unknown")
276         };
277 
278         let pub_type = match self.session_type {
279             SessionType::Client => PublishType::RelayRtmp,
280             SessionType::Server => PublishType::PushRtmp,
281         };
282 
283         PublisherInfo {
284             id: sub_id,
285             pub_type,
286             pub_data_type: streamhub::define::PubDataType::Frame,
287             notify_info: NotifyInfo {
288                 request_url: self.request_url.clone(),
289                 remote_addr,
290             },
291         }
292     }
293 
294     /*Subscribe from local channels and then send data to retmote common player or local RTMP relay push client*/
subscribe_from_channels( &mut self, app_name: String, stream_name: String, sub_id: Uuid, ) -> Result<(), SessionError>295     pub async fn subscribe_from_channels(
296         &mut self,
297         app_name: String,
298         stream_name: String,
299         sub_id: Uuid,
300     ) -> Result<(), SessionError> {
301         log::info!(
302             "subscribe_from_channels, app_name: {} stream_name: {} subscribe_id: {}",
303             app_name,
304             stream_name,
305             sub_id
306         );
307 
308         let identifier = StreamIdentifier::Rtmp {
309             app_name,
310             stream_name,
311         };
312 
313         let (event_result_sender, event_result_receiver) = oneshot::channel();
314 
315         let subscribe_event = StreamHubEvent::Subscribe {
316             identifier,
317             info: self.get_subscriber_info(sub_id),
318             result_sender: event_result_sender,
319         };
320         let rv = self.event_producer.send(subscribe_event);
321 
322         if rv.is_err() {
323             return Err(SessionError {
324                 value: SessionErrorValue::StreamHubEventSendErr,
325             });
326         }
327 
328         let recv = event_result_receiver.await??;
329         self.data_receiver = recv.frame_receiver.unwrap();
330 
331         Ok(())
332     }
333 
unsubscribe_from_channels( &mut self, app_name: String, stream_name: String, sub_id: Uuid, ) -> Result<(), SessionError>334     pub async fn unsubscribe_from_channels(
335         &mut self,
336         app_name: String,
337         stream_name: String,
338         sub_id: Uuid,
339     ) -> Result<(), SessionError> {
340         let identifier = StreamIdentifier::Rtmp {
341             app_name,
342             stream_name,
343         };
344 
345         let subscribe_event = StreamHubEvent::UnSubscribe {
346             identifier,
347             info: self.get_subscriber_info(sub_id),
348         };
349         if let Err(err) = self.event_producer.send(subscribe_event) {
350             log::error!("unsubscribe_from_channels err {}", err);
351         }
352 
353         Ok(())
354     }
355 
356     /*Begin to receive stream data from remote RTMP push client or local RTMP relay pull client*/
publish_to_channels( &mut self, app_name: String, stream_name: String, pub_id: Uuid, gop_num: usize, ) -> Result<(), SessionError>357     pub async fn publish_to_channels(
358         &mut self,
359         app_name: String,
360         stream_name: String,
361         pub_id: Uuid,
362         gop_num: usize,
363     ) -> Result<(), SessionError> {
364         self.stream_handler
365             .set_cache(Cache::new(app_name.clone(), stream_name.clone(), gop_num))
366             .await;
367 
368         let (event_result_sender, event_result_receiver) = oneshot::channel();
369         let publish_event = StreamHubEvent::Publish {
370             identifier: StreamIdentifier::Rtmp {
371                 app_name,
372                 stream_name,
373             },
374             info: self.get_publisher_info(pub_id),
375             stream_handler: self.stream_handler.clone(),
376             result_sender: event_result_sender,
377         };
378 
379         if self.event_producer.send(publish_event).is_err() {
380             return Err(SessionError {
381                 value: SessionErrorValue::StreamHubEventSendErr,
382             });
383         }
384 
385         let result = event_result_receiver.await??;
386         self.data_sender = result.0.unwrap();
387         Ok(())
388     }
389 
unpublish_to_channels( &mut self, app_name: String, stream_name: String, pub_id: Uuid, ) -> Result<(), SessionError>390     pub async fn unpublish_to_channels(
391         &mut self,
392         app_name: String,
393         stream_name: String,
394         pub_id: Uuid,
395     ) -> Result<(), SessionError> {
396         log::info!(
397             "unpublish_to_channels, app_name:{}, stream_name:{}",
398             app_name,
399             stream_name
400         );
401         let unpublish_event = StreamHubEvent::UnPublish {
402             identifier: StreamIdentifier::Rtmp {
403                 app_name: app_name.clone(),
404                 stream_name: stream_name.clone(),
405             },
406             info: self.get_publisher_info(pub_id),
407         };
408 
409         match self.event_producer.send(unpublish_event) {
410             Err(_) => {
411                 log::error!(
412                     "unpublish_to_channels error.app_name: {}, stream_name: {}",
413                     app_name,
414                     stream_name
415                 );
416                 return Err(SessionError {
417                     value: SessionErrorValue::StreamHubEventSendErr,
418                 });
419             }
420             _ => {
421                 log::info!(
422                     "unpublish_to_channels successfully.app_name: {}, stream_name: {}",
423                     app_name,
424                     stream_name
425                 );
426             }
427         }
428         Ok(())
429     }
430 }
431 
432 #[derive(Default)]
433 pub struct RtmpStreamHandler {
434     /*cache is used to save RTMP sequence/gops/meta data
435     which needs to be send to client(player) */
436     /*The cache will be used in different threads(save
437     cache in one thread and send cache data to different clients
438     in other threads) */
439     pub cache: Mutex<Option<Cache>>,
440 }
441 
442 impl RtmpStreamHandler {
new() -> Self443     pub fn new() -> Self {
444         Self {
445             cache: Mutex::new(None),
446         }
447     }
448 
set_cache(&self, cache: Cache)449     pub async fn set_cache(&self, cache: Cache) {
450         *self.cache.lock().await = Some(cache);
451     }
452 
save_video_data( &self, chunk_body: &BytesMut, timestamp: u32, ) -> Result<(), CacheError>453     pub async fn save_video_data(
454         &self,
455         chunk_body: &BytesMut,
456         timestamp: u32,
457     ) -> Result<(), CacheError> {
458         if let Some(cache) = &mut *self.cache.lock().await {
459             cache.save_video_data(chunk_body, timestamp).await?;
460         }
461         Ok(())
462     }
463 
save_audio_data( &self, chunk_body: &BytesMut, timestamp: u32, ) -> Result<(), CacheError>464     pub async fn save_audio_data(
465         &self,
466         chunk_body: &BytesMut,
467         timestamp: u32,
468     ) -> Result<(), CacheError> {
469         if let Some(cache) = &mut *self.cache.lock().await {
470             cache.save_audio_data(chunk_body, timestamp).await?;
471         }
472         Ok(())
473     }
474 
save_metadata(&self, chunk_body: &BytesMut, timestamp: u32)475     pub async fn save_metadata(&self, chunk_body: &BytesMut, timestamp: u32) {
476         if let Some(cache) = &mut *self.cache.lock().await {
477             cache.save_metadata(chunk_body, timestamp);
478         }
479     }
480 }
481 
482 #[async_trait]
483 impl TStreamHandler for RtmpStreamHandler {
send_prior_data( &self, data_sender: DataSender, sub_type: SubscribeType, ) -> Result<(), ChannelError>484     async fn send_prior_data(
485         &self,
486         data_sender: DataSender,
487         sub_type: SubscribeType,
488     ) -> Result<(), ChannelError> {
489         let sender = match data_sender {
490             DataSender::Frame { sender } => sender,
491             DataSender::Packet { sender: _ } => {
492                 return Err(ChannelError {
493                     value: ChannelErrorValue::NotCorrectDataSenderType,
494                 });
495             }
496         };
497         if let Some(cache) = &mut *self.cache.lock().await {
498             if let Some(meta_body_data) = cache.get_metadata() {
499                 sender.send(meta_body_data).map_err(|_| ChannelError {
500                     value: ChannelErrorValue::SendError,
501                 })?;
502             }
503             if let Some(audio_seq_data) = cache.get_audio_seq() {
504                 sender.send(audio_seq_data).map_err(|_| ChannelError {
505                     value: ChannelErrorValue::SendError,
506                 })?;
507             }
508             if let Some(video_seq_data) = cache.get_video_seq() {
509                 sender.send(video_seq_data).map_err(|_| ChannelError {
510                     value: ChannelErrorValue::SendError,
511                 })?;
512             }
513             match sub_type {
514                 SubscribeType::PlayerRtmp
515                 | SubscribeType::PlayerHttpFlv
516                 | SubscribeType::PlayerHls
517                 | SubscribeType::GenerateHls => {
518                     if let Some(gops_data) = cache.get_gops_data() {
519                         for gop in gops_data {
520                             for channel_data in gop.get_frame_data() {
521                                 sender.send(channel_data).map_err(|_| ChannelError {
522                                     value: ChannelErrorValue::SendError,
523                                 })?;
524                             }
525                         }
526                     }
527                 }
528                 _ => {}
529             }
530         }
531 
532         Ok(())
533     }
get_statistic_data(&self) -> Option<StreamStatistics>534     async fn get_statistic_data(&self) -> Option<StreamStatistics> {
535         if let Some(cache) = &mut *self.cache.lock().await {
536             return Some(cache.av_statistics.get_avstatistic_data().await);
537         }
538 
539         None
540     }
541 
send_information(&self, _: InformationSender)542     async fn send_information(&self, _: InformationSender) {}
543 }
544 
545 impl fmt::Debug for Common {
fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>546     fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
547         write!(fmt, "S2 {{ member: {:?} }}", self.request_url)
548     }
549 }
550