1 use super::errors::WebRTCError;
2 use super::errors::WebRTCErrorValue;
3 use bytes::BytesMut;
4 use std::sync::Arc;
5 use streamhub::define::{PacketData, PacketDataSender};
6
7 use tokio::time::Duration;
8 use webrtc::api::interceptor_registry::register_default_interceptors;
9 use webrtc::api::media_engine::MediaEngine;
10 use webrtc::api::APIBuilder;
11 use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState;
12 use webrtc::ice_transport::ice_server::RTCIceServer;
13 use webrtc::interceptor::registry::Registry;
14 use webrtc::peer_connection::configuration::RTCConfiguration;
15 use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
16 use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
17 use webrtc::peer_connection::RTCPeerConnection;
18 use webrtc::rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication;
19 use webrtc::rtp_transceiver::rtp_codec::RTPCodecType;
20 use webrtc::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
21 use webrtc::rtp_transceiver::RTCRtpTransceiverInit;
22 use webrtc::util::Marshal;
23
24 pub type Result<T> = std::result::Result<T, WebRTCError>;
25
handle_whip( offer: RTCSessionDescription, sender: PacketDataSender, ) -> Result<(RTCSessionDescription, Arc<RTCPeerConnection>)>26 pub async fn handle_whip(
27 offer: RTCSessionDescription,
28 sender: PacketDataSender,
29 ) -> Result<(RTCSessionDescription, Arc<RTCPeerConnection>)> {
30 // Create a MediaEngine object to configure the supported codec
31 let mut m = MediaEngine::default();
32
33 m.register_default_codecs()?;
34
35 // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
36 // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
37 // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
38 // for each PeerConnection.
39 let mut registry = Registry::new();
40
41 // Use the default set of Interceptors
42 registry = register_default_interceptors(registry, &mut m)?;
43
44 // Create the API object with the MediaEngine
45 let api = APIBuilder::new()
46 .with_media_engine(m)
47 .with_interceptor_registry(registry)
48 .build();
49
50 // Prepare the configuration
51 let config = RTCConfiguration {
52 ice_servers: vec![RTCIceServer {
53 urls: vec!["stun:stun.l.google.com:19302".to_owned()],
54 ..Default::default()
55 }],
56 ..Default::default()
57 };
58
59 // Create a new RTCPeerConnection
60 let peer_connection = Arc::new(api.new_peer_connection(config).await?);
61
62 // Allow us to receive 1 audio track, and 1 video track
63 peer_connection
64 .add_transceiver_from_kind(
65 RTPCodecType::Audio,
66 Some(RTCRtpTransceiverInit {
67 direction: RTCRtpTransceiverDirection::Recvonly,
68 send_encodings: Vec::new(),
69 }),
70 )
71 .await?;
72 peer_connection
73 .add_transceiver_from_kind(
74 RTPCodecType::Video,
75 Some(RTCRtpTransceiverInit {
76 direction: RTCRtpTransceiverDirection::Recvonly,
77 send_encodings: Vec::new(),
78 }),
79 )
80 .await?;
81
82 // Set a handler for when a new remote track starts, this handler will forward data to
83 // our UDP listeners.
84 // In your application this is where you would handle/process audio/video
85 let pc = Arc::downgrade(&peer_connection);
86 peer_connection.on_track(Box::new(move |track, _, _| {
87 // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
88 let media_ssrc = track.ssrc();
89 let pc2 = pc.clone();
90 tokio::spawn(async move {
91 let mut result = Result::<usize>::Ok(0);
92 while result.is_ok() {
93 let timeout = tokio::time::sleep(Duration::from_secs(3));
94 tokio::pin!(timeout);
95
96 tokio::select! {
97 _ = timeout.as_mut() =>{
98 if let Some(pc) = pc2.upgrade(){
99 result = pc.write_rtcp(&[Box::new(PictureLossIndication{
100 sender_ssrc: 0,
101 media_ssrc,
102 })]).await.map_err(Into::into);
103 }else{
104 break;
105 }
106 }
107 };
108 }
109 });
110 let sender_clone = sender.clone();
111
112 tokio::spawn(async move {
113 let mut b = vec![0u8; 3000];
114
115 while let Ok((rtp_packet, _)) = track.read(&mut b).await {
116 // Update the PayloadType
117 //rtp_packet.header.payload_type = c.payload_type;
118
119 // Marshal into original buffer with updated PayloadType
120
121 let n = rtp_packet.marshal_to(&mut b)?;
122
123 match rtp_packet.header.payload_type {
124 //video h264
125 96 => {
126 let video_packet = PacketData::Video {
127 timestamp: rtp_packet.header.timestamp,
128 data: BytesMut::from(&b[..n]),
129 };
130 if let Err(err) = sender_clone.send(video_packet) {
131 log::error!("send video packet error: {}", err);
132 }
133 }
134 //aac
135 97 | 111 => {
136 let audio_packet = PacketData::Audio {
137 timestamp: rtp_packet.header.timestamp,
138 data: BytesMut::from(&b[..n]),
139 };
140 if let Err(err) = sender_clone.send(audio_packet) {
141 log::error!("send audio packet error: {}", err);
142 }
143 }
144 _ => {}
145 }
146 }
147
148 Result::<()>::Ok(())
149 });
150
151 Box::pin(async {})
152 }));
153
154 // Set the handler for ICE connection state
155 // This will notify you when the peer has connected/disconnected
156 peer_connection.on_ice_connection_state_change(Box::new(
157 move |connection_state: RTCIceConnectionState| {
158 log::info!("Connection State has changed {connection_state}");
159 if connection_state == RTCIceConnectionState::Connected {
160 log::info!("Ctrl+C the remote client to stop the demo");
161 }
162 Box::pin(async {})
163 },
164 ));
165
166 // Set the handler for Peer connection state
167 // This will notify you when the peer has connected/disconnected
168
169 peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
170 log::info!("Peer Connection State has changed: {s}");
171
172 if s == RTCPeerConnectionState::Failed {
173 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
174 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
175 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
176 println!("Peer Connection has gone to failed exiting: Done forwarding");
177 }
178
179 Box::pin(async {})
180 }));
181
182 // Set the remote SessionDescription
183 peer_connection.set_remote_description(offer).await?;
184
185 // Create an answer
186 let answer = peer_connection.create_answer(None).await?;
187
188 // Create channel that is blocked until ICE Gathering is complete
189 let mut gather_complete = peer_connection.gathering_complete_promise().await;
190
191 // Sets the LocalDescription, and starts our UDP listeners
192 peer_connection.set_local_description(answer).await?;
193
194 // Block until ICE Gathering is complete, disabling trickle ICE
195 // we do this because we only can exchange one signaling message
196 // in a production application you should exchange ICE Candidates via OnICECandidate
197 let _ = gather_complete.recv().await;
198
199 // Output the answer in base64 so we can paste it in browser
200 if let Some(local_desc) = peer_connection.local_description().await {
201 Ok((local_desc, peer_connection))
202 } else {
203 Err(WebRTCError {
204 value: WebRTCErrorValue::CanNotGetLocalDescription,
205 })
206 }
207 }
208