1 use crate::utils; 2 3 use { 4 super::errors::ChannelError, 5 crate::statistics::StreamStatistics, 6 crate::stream::StreamIdentifier, 7 async_trait::async_trait, 8 bytes::BytesMut, 9 serde::ser::SerializeStruct, 10 serde::Serialize, 11 serde::Serializer, 12 std::fmt, 13 std::sync::Arc, 14 tokio::sync::{broadcast, mpsc, oneshot}, 15 utils::Uuid, 16 }; 17 18 #[derive(Debug, Serialize, Clone, Eq, PartialEq)] 19 pub enum SubscribeType { 20 /* Remote client request playing rtmp stream.*/ 21 PlayerRtmp, 22 /* Remote client request playing http-flv stream.*/ 23 PlayerHttpFlv, 24 /* Remote client request playing hls stream.*/ 25 PlayerHls, 26 /* Remote/local client request playing rtsp stream.*/ 27 PlayerRtsp, 28 /* Local client request playing webrtc stream, it's used for protocol remux.*/ 29 PlayerWebrtc, 30 /* Remote client request playing rtsp or webrtc(whep) raw rtp stream.*/ 31 PlayerRtp, 32 GenerateHls, 33 /* Local client *subscribe* from local rtmp session 34 and *publish* (relay push) the stream to remote server.*/ 35 PublisherRtmp, 36 } 37 38 //session publish type 39 #[derive(Debug, Serialize, Clone, Eq, PartialEq)] 40 pub enum PublishType { 41 /* Receive rtmp stream from remote push client */ 42 PushRtmp, 43 /* Local client *publish* the rtmp stream to local session, 44 the rtmp stream is *subscribed* (pull) from remote server.*/ 45 RelayRtmp, 46 /* Receive rtsp stream from remote push client */ 47 PushRtsp, 48 RelayRtsp, 49 /* Receive webrtc stream from remote push client(whip), */ 50 PushWebRTC, 51 /* It used for publishing raw rtp data of rtsp/whbrtc(whip) */ 52 PushRtp, 53 } 54 55 #[derive(Debug, Serialize, Clone)] 56 pub struct NotifyInfo { 57 pub request_url: String, 58 pub remote_addr: String, 59 } 60 61 #[derive(Debug, Clone)] 62 pub struct SubscriberInfo { 63 pub id: Uuid, 64 pub sub_type: SubscribeType, 65 pub notify_info: NotifyInfo, 66 pub sub_data_type: SubDataType, 67 } 68 69 impl Serialize for SubscriberInfo { serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: Serializer,70 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> 71 where 72 S: Serializer, 73 { 74 // 3 is the number of fields in the struct. 75 let mut state = serializer.serialize_struct("SubscriberInfo", 3)?; 76 77 state.serialize_field("id", &self.id.to_string())?; 78 state.serialize_field("sub_type", &self.sub_type)?; 79 state.serialize_field("notify_info", &self.notify_info)?; 80 state.end() 81 } 82 } 83 84 #[derive(Debug, Clone)] 85 pub struct PublisherInfo { 86 pub id: Uuid, 87 pub pub_type: PublishType, 88 pub pub_data_type: PubDataType, 89 pub notify_info: NotifyInfo, 90 } 91 92 impl Serialize for PublisherInfo { serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: Serializer,93 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> 94 where 95 S: Serializer, 96 { 97 // 3 is the number of fields in the struct. 98 let mut state = serializer.serialize_struct("PublisherInfo", 3)?; 99 100 state.serialize_field("id", &self.id.to_string())?; 101 state.serialize_field("sub_type", &self.pub_type)?; 102 state.serialize_field("notify_info", &self.notify_info)?; 103 state.end() 104 } 105 } 106 107 #[derive(Clone, PartialEq)] 108 pub enum VideoCodecType { 109 H264, 110 H265, 111 } 112 113 #[derive(Clone)] 114 pub struct MediaInfo { 115 pub audio_clock_rate: u32, 116 pub video_clock_rate: u32, 117 pub vcodec: VideoCodecType, 118 } 119 120 #[derive(Clone)] 121 pub enum FrameData { 122 Video { timestamp: u32, data: BytesMut }, 123 Audio { timestamp: u32, data: BytesMut }, 124 MetaData { timestamp: u32, data: BytesMut }, 125 MediaInfo { media_info: MediaInfo }, 126 } 127 128 //Used to pass rtp raw data. 129 #[derive(Clone)] 130 pub enum PacketData { 131 Video { timestamp: u32, data: BytesMut }, 132 Audio { timestamp: u32, data: BytesMut }, 133 } 134 135 //used to save data which needs to be transferred between client/server sessions 136 #[derive(Clone)] 137 pub enum Information { 138 Sdp { data: String }, 139 } 140 141 //used to transfer a/v frame between different protocols(rtmp/rtsp/webrtc/http-flv/hls) 142 //or send a/v frame data from publisher to subscribers. 143 pub type FrameDataSender = mpsc::UnboundedSender<FrameData>; 144 pub type FrameDataReceiver = mpsc::UnboundedReceiver<FrameData>; 145 146 //used to transfer rtp packet data,it includles the following directions: 147 // rtsp(publisher)->stream hub->rtsp(subscriber) 148 // webrtc(publisher whip)->stream hub->webrtc(subscriber whep) 149 pub type PacketDataSender = mpsc::UnboundedSender<PacketData>; 150 pub type PacketDataReceiver = mpsc::UnboundedReceiver<PacketData>; 151 152 pub type InformationSender = mpsc::UnboundedSender<Information>; 153 pub type InformationReceiver = mpsc::UnboundedReceiver<Information>; 154 155 pub type StreamHubEventSender = mpsc::UnboundedSender<StreamHubEvent>; 156 pub type StreamHubEventReceiver = mpsc::UnboundedReceiver<StreamHubEvent>; 157 158 pub type BroadcastEventSender = broadcast::Sender<BroadcastEvent>; 159 pub type BroadcastEventReceiver = broadcast::Receiver<BroadcastEvent>; 160 161 pub type TransmitterEventSender = mpsc::UnboundedSender<TransmitterEvent>; 162 pub type TransmitterEventReceiver = mpsc::UnboundedReceiver<TransmitterEvent>; 163 164 pub type AvStatisticSender = mpsc::UnboundedSender<StreamStatistics>; 165 pub type AvStatisticReceiver = mpsc::UnboundedReceiver<StreamStatistics>; 166 167 pub type StreamStatisticSizeSender = oneshot::Sender<usize>; 168 pub type StreamStatisticSizeReceiver = oneshot::Receiver<usize>; 169 170 pub type SubEventExecuteResultSender = oneshot::Sender<Result<DataReceiver, ChannelError>>; 171 pub type PubEventExecuteResultSender = 172 oneshot::Sender<Result<(Option<FrameDataSender>, Option<PacketDataSender>), ChannelError>>; 173 174 #[async_trait] 175 pub trait TStreamHandler: Send + Sync { send_prior_data( &self, sender: DataSender, sub_type: SubscribeType, ) -> Result<(), ChannelError>176 async fn send_prior_data( 177 &self, 178 sender: DataSender, 179 sub_type: SubscribeType, 180 ) -> Result<(), ChannelError>; get_statistic_data(&self) -> Option<StreamStatistics>181 async fn get_statistic_data(&self) -> Option<StreamStatistics>; send_information(&self, sender: InformationSender)182 async fn send_information(&self, sender: InformationSender); 183 } 184 185 //A publisher can publish one or two kinds of av stream at a time. 186 pub struct DataReceiver { 187 pub frame_receiver: Option<FrameDataReceiver>, 188 pub packet_receiver: Option<PacketDataReceiver>, 189 } 190 191 //A subscriber only needs to subscribe to one type of stream at a time 192 #[derive(Debug, Clone)] 193 pub enum DataSender { 194 Frame { sender: FrameDataSender }, 195 Packet { sender: PacketDataSender }, 196 } 197 //we can only sub one kind of stream. 198 #[derive(Debug, Clone, Serialize)] 199 pub enum SubDataType { 200 Frame, 201 Packet, 202 } 203 //we can pub frame or packet or both. 204 #[derive(Debug, Clone, Serialize)] 205 pub enum PubDataType { 206 Frame, 207 Packet, 208 Both, 209 } 210 211 #[derive(Serialize)] 212 pub enum StreamHubEvent { 213 Subscribe { 214 identifier: StreamIdentifier, 215 info: SubscriberInfo, 216 #[serde(skip_serializing)] 217 result_sender: SubEventExecuteResultSender, 218 }, 219 UnSubscribe { 220 identifier: StreamIdentifier, 221 info: SubscriberInfo, 222 }, 223 Publish { 224 identifier: StreamIdentifier, 225 info: PublisherInfo, 226 #[serde(skip_serializing)] 227 result_sender: PubEventExecuteResultSender, 228 #[serde(skip_serializing)] 229 stream_handler: Arc<dyn TStreamHandler>, 230 }, 231 UnPublish { 232 identifier: StreamIdentifier, 233 info: PublisherInfo, 234 }, 235 #[serde(skip_serializing)] 236 ApiStatistic { 237 data_sender: AvStatisticSender, 238 size_sender: StreamStatisticSizeSender, 239 }, 240 #[serde(skip_serializing)] 241 ApiKickClient { id: Uuid }, 242 243 #[serde(skip_serializing)] 244 Request { 245 identifier: StreamIdentifier, 246 sender: InformationSender, 247 }, 248 } 249 250 #[derive(Debug)] 251 pub enum TransmitterEvent { 252 Subscribe { 253 sender: DataSender, 254 info: SubscriberInfo, 255 }, 256 UnSubscribe { 257 info: SubscriberInfo, 258 }, 259 UnPublish {}, 260 261 Api { 262 sender: AvStatisticSender, 263 }, 264 Request { 265 sender: InformationSender, 266 }, 267 } 268 269 impl fmt::Display for TransmitterEvent { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result270 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 271 write!(f, "{:?}", *self) 272 } 273 } 274 275 #[derive(Debug, Clone)] 276 pub enum BroadcastEvent { 277 /*Need publish(push) a stream to other rtmp server*/ 278 Publish { identifier: StreamIdentifier }, 279 UnPublish { identifier: StreamIdentifier }, 280 /*Need subscribe(pull) a stream from other rtmp server*/ 281 Subscribe { identifier: StreamIdentifier }, 282 UnSubscribe { identifier: StreamIdentifier }, 283 } 284 285 //Used for kickoff 286 #[derive(Debug, Clone)] 287 pub enum PubSubInfo { 288 Subscribe { 289 identifier: StreamIdentifier, 290 sub_info: SubscriberInfo, 291 }, 292 293 Publish { 294 identifier: StreamIdentifier, 295 }, 296 } 297