1 use super::errors::WebRTCError;
2 use super::errors::WebRTCErrorValue;
3
4 use std::sync::Arc;
5 use streamhub::define::PacketData;
6 use streamhub::define::PacketDataReceiver;
7
8 use webrtc::api::interceptor_registry::register_default_interceptors;
9 use webrtc::api::media_engine::{MediaEngine, MIME_TYPE_H264, MIME_TYPE_OPUS};
10 use webrtc::api::APIBuilder;
11 use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState;
12 use webrtc::ice_transport::ice_server::RTCIceServer;
13 use webrtc::interceptor::registry::Registry;
14 use webrtc::peer_connection::configuration::RTCConfiguration;
15
16 use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
17 use webrtc::peer_connection::RTCPeerConnection;
18
19 use tokio::sync::broadcast;
20 use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
21 use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
22 use webrtc::track::track_local::TrackLocal;
23 use webrtc::track::track_local::TrackLocalWriter;
24
/// Result alias used throughout this module; the error type is always [`WebRTCError`].
pub type Result<T> = std::result::Result<T, WebRTCError>;
26 use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
27
handle_whep( offer: RTCSessionDescription, mut receiver: PacketDataReceiver, state_sender: broadcast::Sender<RTCPeerConnectionState>, ) -> Result<(RTCSessionDescription, Arc<RTCPeerConnection>)>28 pub async fn handle_whep(
29 offer: RTCSessionDescription,
30 mut receiver: PacketDataReceiver,
31 state_sender: broadcast::Sender<RTCPeerConnectionState>,
32 ) -> Result<(RTCSessionDescription, Arc<RTCPeerConnection>)> {
33 // Everything below is the WebRTC-rs API! Thanks for using it ❤️.
34
35 // Create a MediaEngine object to configure the supported codec
36 let mut m = MediaEngine::default();
37
38 m.register_default_codecs()?;
39
40 // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
41 // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
42 // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
43 // for each PeerConnection.
44 let mut registry = Registry::new();
45
46 // Use the default set of Interceptors
47 registry = register_default_interceptors(registry, &mut m)?;
48
49 // Create the API object with the MediaEngine
50 let api = APIBuilder::new()
51 .with_media_engine(m)
52 .with_interceptor_registry(registry)
53 .build();
54
55 // Prepare the configuration
56 let config = RTCConfiguration {
57 ice_servers: vec![RTCIceServer {
58 urls: vec!["stun:stun.l.google.com:19302".to_owned()],
59 ..Default::default()
60 }],
61 ..Default::default()
62 };
63
64 // Create a new RTCPeerConnection
65 let peer_connection = Arc::new(api.new_peer_connection(config).await?);
66
67 // Create Track that we send video back to browser on
68 let video_track = Arc::new(TrackLocalStaticRTP::new(
69 RTCRtpCodecCapability {
70 mime_type: MIME_TYPE_H264.to_owned(),
71 ..Default::default()
72 },
73 "video".to_owned(),
74 "webrtc-rs".to_owned(),
75 ));
76
77 // Create Track that we send video back to browser on
78 let audio_track = Arc::new(TrackLocalStaticRTP::new(
79 RTCRtpCodecCapability {
80 mime_type: MIME_TYPE_OPUS.to_owned(),
81 ..Default::default()
82 },
83 "audio".to_owned(),
84 "webrtc-rs".to_owned(),
85 ));
86
87 // Add this newly created track to the PeerConnection
88 let rtp_sender = peer_connection
89 .add_track(Arc::clone(&video_track) as Arc<dyn TrackLocal + Send + Sync>)
90 .await?;
91
92 let _ = peer_connection
93 .add_track(Arc::clone(&audio_track) as Arc<dyn TrackLocal + Send + Sync>)
94 .await?;
95
96 // Read incoming RTCP packets
97 // Before these packets are returned they are processed by interceptors. For things
98 // like NACK this needs to be called.
99 tokio::spawn(async move {
100 let mut rtcp_buf = vec![0u8; 1500];
101 while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
102 Result::<()>::Ok(())
103 });
104
105 // Set the handler for ICE connection state
106 // This will notify you when the peer has connected/disconnected
107 peer_connection.on_ice_connection_state_change(Box::new(
108 move |connection_state: RTCIceConnectionState| {
109 log::info!("Connection State has changed {connection_state}");
110 if connection_state == RTCIceConnectionState::Failed {
111 // let _ = done_tx1.try_send(());
112 }
113 Box::pin(async {})
114 },
115 ));
116
117 // Set the handler for Peer connection state
118 // This will notify you when the peer has connected/disconnected
119 let mut state_receiver = state_sender.subscribe();
120 peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
121 log::info!("Peer Connection State has changed: {s}");
122
123 if s == RTCPeerConnectionState::Failed {
124 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
125 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
126 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
127 log::info!("Peer Connection has gone to failed exiting: Done forwarding");
128 // let _ = done_tx2.try_send(());
129 }
130 if let Err(err) = state_sender.send(s) {
131 log::error!("on_peer_connection_state_change send state err: {}", err);
132 }
133
134 Box::pin(async {})
135 }));
136
137 // Set the remote SessionDescription
138 peer_connection.set_remote_description(offer).await?;
139
140 // Create an answer
141 let answer = peer_connection.create_answer(None).await?;
142
143 // Create channel that is blocked until ICE Gathering is complete
144 let mut gather_complete = peer_connection.gathering_complete_promise().await;
145
146 // Sets the LocalDescription, and starts our UDP listeners
147 peer_connection.set_local_description(answer).await?;
148
149 // Block until ICE Gathering is complete, disabling trickle ICE
150 // we do this because we only can exchange one signaling message
151 // in a production application you should exchange ICE Candidates via OnICECandidate
152 let _ = gather_complete.recv().await;
153
154 // Read RTP packets forever and send them to the WebRTC Client
155 tokio::spawn(async move {
156 loop {
157 tokio::select! {
158 av_data = receiver.recv() =>{
159 if let Some(data) = av_data {
160 match data {
161 PacketData::Video { timestamp: _, data } => {
162 if let Err(err) = video_track.write(&data[..]).await {
163 log::error!("send video data error: {}", err);
164 }
165 }
166 PacketData::Audio { timestamp: _, data } => {
167 if let Err(err) = audio_track.write(&data[..]).await {
168 log::error!("send audio data error: {}", err);
169 }
170 }
171 }
172 }
173 }
174 pc_state = state_receiver.recv() =>{
175 if let Ok(state) = pc_state{
176 if state == RTCPeerConnectionState::Closed {
177 break;
178 }
179 }
180 }
181 }
182 }
183 });
184
185 // Output the answer in base64 so we can paste it in browser
186 if let Some(local_desc) = peer_connection.local_description().await {
187 Ok((local_desc, peer_connection))
188 } else {
189 Err(WebRTCError {
190 value: WebRTCErrorValue::CanNotGetLocalDescription,
191 })
192 }
193 }
194