xref: /xiu/protocol/webrtc/src/whep.rs (revision 80f20d70)
1 use super::errors::WebRTCError;
2 use super::errors::WebRTCErrorValue;
3 
4 use std::sync::Arc;
5 use streamhub::define::PacketData;
6 use streamhub::define::PacketDataReceiver;
7 
8 use webrtc::api::interceptor_registry::register_default_interceptors;
9 use webrtc::api::media_engine::{MediaEngine, MIME_TYPE_H264, MIME_TYPE_OPUS};
10 use webrtc::api::APIBuilder;
11 use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState;
12 use webrtc::ice_transport::ice_server::RTCIceServer;
13 use webrtc::interceptor::registry::Registry;
14 use webrtc::peer_connection::configuration::RTCConfiguration;
15 
16 use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
17 use webrtc::peer_connection::RTCPeerConnection;
18 
19 use tokio::sync::broadcast;
20 use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
21 use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
22 use webrtc::track::track_local::TrackLocal;
23 use webrtc::track::track_local::TrackLocalWriter;
24 
25 pub type Result<T> = std::result::Result<T, WebRTCError>;
26 use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
27 
handle_whep( offer: RTCSessionDescription, mut receiver: PacketDataReceiver, state_sender: broadcast::Sender<RTCPeerConnectionState>, ) -> Result<(RTCSessionDescription, Arc<RTCPeerConnection>)>28 pub async fn handle_whep(
29     offer: RTCSessionDescription,
30     mut receiver: PacketDataReceiver,
31     state_sender: broadcast::Sender<RTCPeerConnectionState>,
32 ) -> Result<(RTCSessionDescription, Arc<RTCPeerConnection>)> {
33     // Everything below is the WebRTC-rs API! Thanks for using it ❤️.
34 
35     // Create a MediaEngine object to configure the supported codec
36     let mut m = MediaEngine::default();
37 
38     m.register_default_codecs()?;
39 
40     // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
41     // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
42     // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
43     // for each PeerConnection.
44     let mut registry = Registry::new();
45 
46     // Use the default set of Interceptors
47     registry = register_default_interceptors(registry, &mut m)?;
48 
49     // Create the API object with the MediaEngine
50     let api = APIBuilder::new()
51         .with_media_engine(m)
52         .with_interceptor_registry(registry)
53         .build();
54 
55     // Prepare the configuration
56     let config = RTCConfiguration {
57         ice_servers: vec![RTCIceServer {
58             urls: vec!["stun:stun.l.google.com:19302".to_owned()],
59             ..Default::default()
60         }],
61         ..Default::default()
62     };
63 
64     // Create a new RTCPeerConnection
65     let peer_connection = Arc::new(api.new_peer_connection(config).await?);
66 
67     // Create Track that we send video back to browser on
68     let video_track = Arc::new(TrackLocalStaticRTP::new(
69         RTCRtpCodecCapability {
70             mime_type: MIME_TYPE_H264.to_owned(),
71             ..Default::default()
72         },
73         "video".to_owned(),
74         "webrtc-rs".to_owned(),
75     ));
76 
77     // Create Track that we send video back to browser on
78     let audio_track = Arc::new(TrackLocalStaticRTP::new(
79         RTCRtpCodecCapability {
80             mime_type: MIME_TYPE_OPUS.to_owned(),
81             ..Default::default()
82         },
83         "audio".to_owned(),
84         "webrtc-rs".to_owned(),
85     ));
86 
87     // Add this newly created track to the PeerConnection
88     let rtp_sender = peer_connection
89         .add_track(Arc::clone(&video_track) as Arc<dyn TrackLocal + Send + Sync>)
90         .await?;
91 
92     let _ = peer_connection
93         .add_track(Arc::clone(&audio_track) as Arc<dyn TrackLocal + Send + Sync>)
94         .await?;
95 
96     // Read incoming RTCP packets
97     // Before these packets are returned they are processed by interceptors. For things
98     // like NACK this needs to be called.
99     tokio::spawn(async move {
100         let mut rtcp_buf = vec![0u8; 1500];
101         while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
102         Result::<()>::Ok(())
103     });
104 
105     // Set the handler for ICE connection state
106     // This will notify you when the peer has connected/disconnected
107     peer_connection.on_ice_connection_state_change(Box::new(
108         move |connection_state: RTCIceConnectionState| {
109             log::info!("Connection State has changed {connection_state}");
110             if connection_state == RTCIceConnectionState::Failed {
111                 // let _ = done_tx1.try_send(());
112             }
113             Box::pin(async {})
114         },
115     ));
116 
117     // Set the handler for Peer connection state
118     // This will notify you when the peer has connected/disconnected
119     let mut state_receiver = state_sender.subscribe();
120     peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
121         log::info!("Peer Connection State has changed: {s}");
122 
123         if s == RTCPeerConnectionState::Failed {
124             // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
125             // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
126             // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
127             log::info!("Peer Connection has gone to failed exiting: Done forwarding");
128             // let _ = done_tx2.try_send(());
129         }
130         if let Err(err) = state_sender.send(s) {
131             log::error!("on_peer_connection_state_change send state err: {}", err);
132         }
133 
134         Box::pin(async {})
135     }));
136 
137     // Set the remote SessionDescription
138     peer_connection.set_remote_description(offer).await?;
139 
140     // Create an answer
141     let answer = peer_connection.create_answer(None).await?;
142 
143     // Create channel that is blocked until ICE Gathering is complete
144     let mut gather_complete = peer_connection.gathering_complete_promise().await;
145 
146     // Sets the LocalDescription, and starts our UDP listeners
147     peer_connection.set_local_description(answer).await?;
148 
149     // Block until ICE Gathering is complete, disabling trickle ICE
150     // we do this because we only can exchange one signaling message
151     // in a production application you should exchange ICE Candidates via OnICECandidate
152     let _ = gather_complete.recv().await;
153 
154     // Read RTP packets forever and send them to the WebRTC Client
155     tokio::spawn(async move {
156         loop {
157             tokio::select! {
158                 av_data = receiver.recv() =>{
159                     if let Some(data) = av_data {
160                         match data {
161                             PacketData::Video { timestamp: _, data } => {
162                                 if let Err(err) = video_track.write(&data[..]).await {
163                                     log::error!("send video data error: {}", err);
164                                 }
165                             }
166                             PacketData::Audio { timestamp: _, data } => {
167                                 if let Err(err) = audio_track.write(&data[..]).await {
168                                     log::error!("send audio data error: {}", err);
169                                 }
170                             }
171                         }
172                     }
173                 }
174                 pc_state = state_receiver.recv() =>{
175                     if let Ok(state) = pc_state{
176                         if state == RTCPeerConnectionState::Closed {
177                             break;
178                         }
179                     }
180                 }
181             }
182         }
183     });
184 
185     // Output the answer in base64 so we can paste it in browser
186     if let Some(local_desc) = peer_connection.local_description().await {
187         Ok((local_desc, peer_connection))
188     } else {
189         Err(WebRTCError {
190             value: WebRTCErrorValue::CanNotGetLocalDescription,
191         })
192     }
193 }
194