xref: /xiu/library/streamhub/src/define.rs (revision a4ef5d6c)
1 use crate::utils;
2 
3 use {
4     super::errors::ChannelError,
5     crate::statistics::StreamStatistics,
6     crate::stream::StreamIdentifier,
7     async_trait::async_trait,
8     bytes::BytesMut,
9     serde::ser::SerializeStruct,
10     serde::Serialize,
11     serde::Serializer,
12     std::fmt,
13     std::sync::Arc,
14     tokio::sync::{broadcast, mpsc, oneshot},
15     utils::Uuid,
16 };
17 
18 #[derive(Debug, Serialize, Clone, Eq, PartialEq)]
19 pub enum SubscribeType {
20     /* Remote client request playing rtmp stream.*/
21     PlayerRtmp,
22     /* Remote client request playing http-flv stream.*/
23     PlayerHttpFlv,
24     /* Remote client request playing hls stream.*/
25     PlayerHls,
26     /* Remote/local client request playing rtsp stream.*/
27     PlayerRtsp,
28     /* Local client request playing webrtc stream, it's used for protocol remux.*/
29     PlayerWebrtc,
30     /* Remote client request playing rtsp or webrtc(whep) raw rtp stream.*/
31     PlayerRtp,
32     GenerateHls,
33     /* Local client *subscribe* from local rtmp session
34     and *publish* (relay push) the stream to remote server.*/
35     PublisherRtmp,
36 }
37 
38 //session publish type
39 #[derive(Debug, Serialize, Clone, Eq, PartialEq)]
40 pub enum PublishType {
41     /* Receive rtmp stream from remote push client */
42     PushRtmp,
43     /* Local client *publish* the rtmp stream to local session,
44     the rtmp stream is *subscribed* (pull) from remote server.*/
45     RelayRtmp,
46     /* Receive rtsp stream from remote push client */
47     PushRtsp,
48     RelayRtsp,
49     /* Receive webrtc stream from remote push client(whip),  */
50     PushWebRTC,
51     /* It used for publishing raw rtp data of rtsp/whbrtc(whip) */
52     PushRtp,
53 }
54 
55 #[derive(Debug, Serialize, Clone)]
56 pub struct NotifyInfo {
57     pub request_url: String,
58     pub remote_addr: String,
59 }
60 
61 #[derive(Debug, Clone)]
62 pub struct SubscriberInfo {
63     pub id: Uuid,
64     pub sub_type: SubscribeType,
65     pub notify_info: NotifyInfo,
66     pub sub_data_type: SubDataType,
67 }
68 
69 impl Serialize for SubscriberInfo {
serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: Serializer,70     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
71     where
72         S: Serializer,
73     {
74         // 3 is the number of fields in the struct.
75         let mut state = serializer.serialize_struct("SubscriberInfo", 3)?;
76 
77         state.serialize_field("id", &self.id.to_string())?;
78         state.serialize_field("sub_type", &self.sub_type)?;
79         state.serialize_field("notify_info", &self.notify_info)?;
80         state.end()
81     }
82 }
83 
84 #[derive(Debug, Clone)]
85 pub struct PublisherInfo {
86     pub id: Uuid,
87     pub pub_type: PublishType,
88     pub pub_data_type: PubDataType,
89     pub notify_info: NotifyInfo,
90 }
91 
92 impl Serialize for PublisherInfo {
serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: Serializer,93     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
94     where
95         S: Serializer,
96     {
97         // 3 is the number of fields in the struct.
98         let mut state = serializer.serialize_struct("PublisherInfo", 3)?;
99 
100         state.serialize_field("id", &self.id.to_string())?;
101         state.serialize_field("sub_type", &self.pub_type)?;
102         state.serialize_field("notify_info", &self.notify_info)?;
103         state.end()
104     }
105 }
106 
107 #[derive(Clone, PartialEq)]
108 pub enum VideoCodecType {
109     H264,
110     H265,
111 }
112 
113 #[derive(Clone)]
114 pub struct MediaInfo {
115     pub audio_clock_rate: u32,
116     pub video_clock_rate: u32,
117     pub vcodec: VideoCodecType,
118 }
119 
120 #[derive(Clone)]
121 pub enum FrameData {
122     Video { timestamp: u32, data: BytesMut },
123     Audio { timestamp: u32, data: BytesMut },
124     MetaData { timestamp: u32, data: BytesMut },
125     MediaInfo { media_info: MediaInfo },
126 }
127 
128 //Used to pass rtp raw data.
129 #[derive(Clone)]
130 pub enum PacketData {
131     Video { timestamp: u32, data: BytesMut },
132     Audio { timestamp: u32, data: BytesMut },
133 }
134 
135 //used to save data which needs to be transferred between client/server sessions
136 #[derive(Clone)]
137 pub enum Information {
138     Sdp { data: String },
139 }
140 
141 //used to transfer a/v frame between different protocols(rtmp/rtsp/webrtc/http-flv/hls)
142 //or send a/v frame data from publisher to subscribers.
143 pub type FrameDataSender = mpsc::UnboundedSender<FrameData>;
144 pub type FrameDataReceiver = mpsc::UnboundedReceiver<FrameData>;
145 
146 //used to transfer rtp packet data,it includles the following directions:
147 // rtsp(publisher)->stream hub->rtsp(subscriber)
148 // webrtc(publisher whip)->stream hub->webrtc(subscriber whep)
149 pub type PacketDataSender = mpsc::UnboundedSender<PacketData>;
150 pub type PacketDataReceiver = mpsc::UnboundedReceiver<PacketData>;
151 
152 pub type InformationSender = mpsc::UnboundedSender<Information>;
153 pub type InformationReceiver = mpsc::UnboundedReceiver<Information>;
154 
155 pub type StreamHubEventSender = mpsc::UnboundedSender<StreamHubEvent>;
156 pub type StreamHubEventReceiver = mpsc::UnboundedReceiver<StreamHubEvent>;
157 
158 pub type BroadcastEventSender = broadcast::Sender<BroadcastEvent>;
159 pub type BroadcastEventReceiver = broadcast::Receiver<BroadcastEvent>;
160 
161 pub type TransmitterEventSender = mpsc::UnboundedSender<TransmitterEvent>;
162 pub type TransmitterEventReceiver = mpsc::UnboundedReceiver<TransmitterEvent>;
163 
164 pub type AvStatisticSender = mpsc::UnboundedSender<StreamStatistics>;
165 pub type AvStatisticReceiver = mpsc::UnboundedReceiver<StreamStatistics>;
166 
167 pub type StreamStatisticSizeSender = oneshot::Sender<usize>;
168 pub type StreamStatisticSizeReceiver = oneshot::Receiver<usize>;
169 
170 pub type SubEventExecuteResultSender = oneshot::Sender<Result<DataReceiver, ChannelError>>;
171 pub type PubEventExecuteResultSender =
172     oneshot::Sender<Result<(Option<FrameDataSender>, Option<PacketDataSender>), ChannelError>>;
173 
174 #[async_trait]
175 pub trait TStreamHandler: Send + Sync {
send_prior_data( &self, sender: DataSender, sub_type: SubscribeType, ) -> Result<(), ChannelError>176     async fn send_prior_data(
177         &self,
178         sender: DataSender,
179         sub_type: SubscribeType,
180     ) -> Result<(), ChannelError>;
get_statistic_data(&self) -> Option<StreamStatistics>181     async fn get_statistic_data(&self) -> Option<StreamStatistics>;
send_information(&self, sender: InformationSender)182     async fn send_information(&self, sender: InformationSender);
183 }
184 
185 //A publisher can publish one or two kinds of av stream at a time.
186 pub struct DataReceiver {
187     pub frame_receiver: Option<FrameDataReceiver>,
188     pub packet_receiver: Option<PacketDataReceiver>,
189 }
190 
191 //A subscriber only needs to subscribe to one type of stream at a time
192 #[derive(Debug, Clone)]
193 pub enum DataSender {
194     Frame { sender: FrameDataSender },
195     Packet { sender: PacketDataSender },
196 }
197 //we can only sub one kind of stream.
198 #[derive(Debug, Clone, Serialize)]
199 pub enum SubDataType {
200     Frame,
201     Packet,
202 }
203 //we can pub frame or packet or both.
204 #[derive(Debug, Clone, Serialize)]
205 pub enum PubDataType {
206     Frame,
207     Packet,
208     Both,
209 }
210 
211 #[derive(Serialize)]
212 pub enum StreamHubEvent {
213     Subscribe {
214         identifier: StreamIdentifier,
215         info: SubscriberInfo,
216         #[serde(skip_serializing)]
217         result_sender: SubEventExecuteResultSender,
218     },
219     UnSubscribe {
220         identifier: StreamIdentifier,
221         info: SubscriberInfo,
222     },
223     Publish {
224         identifier: StreamIdentifier,
225         info: PublisherInfo,
226         #[serde(skip_serializing)]
227         result_sender: PubEventExecuteResultSender,
228         #[serde(skip_serializing)]
229         stream_handler: Arc<dyn TStreamHandler>,
230     },
231     UnPublish {
232         identifier: StreamIdentifier,
233         info: PublisherInfo,
234     },
235     #[serde(skip_serializing)]
236     ApiStatistic {
237         data_sender: AvStatisticSender,
238         size_sender: StreamStatisticSizeSender,
239     },
240     #[serde(skip_serializing)]
241     ApiKickClient { id: Uuid },
242 
243     #[serde(skip_serializing)]
244     Request {
245         identifier: StreamIdentifier,
246         sender: InformationSender,
247     },
248 }
249 
250 #[derive(Debug)]
251 pub enum TransmitterEvent {
252     Subscribe {
253         sender: DataSender,
254         info: SubscriberInfo,
255     },
256     UnSubscribe {
257         info: SubscriberInfo,
258     },
259     UnPublish {},
260 
261     Api {
262         sender: AvStatisticSender,
263     },
264     Request {
265         sender: InformationSender,
266     },
267 }
268 
269 impl fmt::Display for TransmitterEvent {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result270     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
271         write!(f, "{:?}", *self)
272     }
273 }
274 
275 #[derive(Debug, Clone)]
276 pub enum BroadcastEvent {
277     /*Need publish(push) a stream to other rtmp server*/
278     Publish { identifier: StreamIdentifier },
279     UnPublish { identifier: StreamIdentifier },
280     /*Need subscribe(pull) a stream from other rtmp server*/
281     Subscribe { identifier: StreamIdentifier },
282     UnSubscribe { identifier: StreamIdentifier },
283 }
284 
285 //Used for kickoff
286 #[derive(Debug, Clone)]
287 pub enum PubSubInfo {
288     Subscribe {
289         identifier: StreamIdentifier,
290         sub_info: SubscriberInfo,
291     },
292 
293     Publish {
294         identifier: StreamIdentifier,
295     },
296 }
297