xref: /xiu/protocol/webrtc/src/whip.rs (revision 80f20d70)
1 use super::errors::WebRTCError;
2 use super::errors::WebRTCErrorValue;
3 use bytes::BytesMut;
4 use std::sync::Arc;
5 use streamhub::define::{PacketData, PacketDataSender};
6 
7 use tokio::time::Duration;
8 use webrtc::api::interceptor_registry::register_default_interceptors;
9 use webrtc::api::media_engine::MediaEngine;
10 use webrtc::api::APIBuilder;
11 use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState;
12 use webrtc::ice_transport::ice_server::RTCIceServer;
13 use webrtc::interceptor::registry::Registry;
14 use webrtc::peer_connection::configuration::RTCConfiguration;
15 use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
16 use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
17 use webrtc::peer_connection::RTCPeerConnection;
18 use webrtc::rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication;
19 use webrtc::rtp_transceiver::rtp_codec::RTPCodecType;
20 use webrtc::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
21 use webrtc::rtp_transceiver::RTCRtpTransceiverInit;
22 use webrtc::util::Marshal;
23 
24 pub type Result<T> = std::result::Result<T, WebRTCError>;
25 
handle_whip( offer: RTCSessionDescription, sender: PacketDataSender, ) -> Result<(RTCSessionDescription, Arc<RTCPeerConnection>)>26 pub async fn handle_whip(
27     offer: RTCSessionDescription,
28     sender: PacketDataSender,
29 ) -> Result<(RTCSessionDescription, Arc<RTCPeerConnection>)> {
30     // Create a MediaEngine object to configure the supported codec
31     let mut m = MediaEngine::default();
32 
33     m.register_default_codecs()?;
34 
35     // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
36     // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
37     // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
38     // for each PeerConnection.
39     let mut registry = Registry::new();
40 
41     // Use the default set of Interceptors
42     registry = register_default_interceptors(registry, &mut m)?;
43 
44     // Create the API object with the MediaEngine
45     let api = APIBuilder::new()
46         .with_media_engine(m)
47         .with_interceptor_registry(registry)
48         .build();
49 
50     // Prepare the configuration
51     let config = RTCConfiguration {
52         ice_servers: vec![RTCIceServer {
53             urls: vec!["stun:stun.l.google.com:19302".to_owned()],
54             ..Default::default()
55         }],
56         ..Default::default()
57     };
58 
59     // Create a new RTCPeerConnection
60     let peer_connection = Arc::new(api.new_peer_connection(config).await?);
61 
62     // Allow us to receive 1 audio track, and 1 video track
63     peer_connection
64         .add_transceiver_from_kind(
65             RTPCodecType::Audio,
66             Some(RTCRtpTransceiverInit {
67                 direction: RTCRtpTransceiverDirection::Recvonly,
68                 send_encodings: Vec::new(),
69             }),
70         )
71         .await?;
72     peer_connection
73         .add_transceiver_from_kind(
74             RTPCodecType::Video,
75             Some(RTCRtpTransceiverInit {
76                 direction: RTCRtpTransceiverDirection::Recvonly,
77                 send_encodings: Vec::new(),
78             }),
79         )
80         .await?;
81 
82     // Set a handler for when a new remote track starts, this handler will forward data to
83     // our UDP listeners.
84     // In your application this is where you would handle/process audio/video
85     let pc = Arc::downgrade(&peer_connection);
86     peer_connection.on_track(Box::new(move |track, _, _| {
87         // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
88         let media_ssrc = track.ssrc();
89         let pc2 = pc.clone();
90         tokio::spawn(async move {
91             let mut result = Result::<usize>::Ok(0);
92             while result.is_ok() {
93                 let timeout = tokio::time::sleep(Duration::from_secs(3));
94                 tokio::pin!(timeout);
95 
96                 tokio::select! {
97                     _ = timeout.as_mut() =>{
98                         if let Some(pc) = pc2.upgrade(){
99                             result = pc.write_rtcp(&[Box::new(PictureLossIndication{
100                                 sender_ssrc: 0,
101                                 media_ssrc,
102                             })]).await.map_err(Into::into);
103                         }else{
104                             break;
105                         }
106                     }
107                 };
108             }
109         });
110         let sender_clone = sender.clone();
111 
112         tokio::spawn(async move {
113             let mut b = vec![0u8; 3000];
114 
115             while let Ok((rtp_packet, _)) = track.read(&mut b).await {
116                 // Update the PayloadType
117                 //rtp_packet.header.payload_type = c.payload_type;
118 
119                 // Marshal into original buffer with updated PayloadType
120 
121                 let n = rtp_packet.marshal_to(&mut b)?;
122 
123                 match rtp_packet.header.payload_type {
124                     //video h264
125                     96 => {
126                         let video_packet = PacketData::Video {
127                             timestamp: rtp_packet.header.timestamp,
128                             data: BytesMut::from(&b[..n]),
129                         };
130                         if let Err(err) = sender_clone.send(video_packet) {
131                             log::error!("send video packet error: {}", err);
132                         }
133                     }
134                     //aac
135                     97 | 111 => {
136                         let audio_packet = PacketData::Audio {
137                             timestamp: rtp_packet.header.timestamp,
138                             data: BytesMut::from(&b[..n]),
139                         };
140                         if let Err(err) = sender_clone.send(audio_packet) {
141                             log::error!("send audio packet error: {}", err);
142                         }
143                     }
144                     _ => {}
145                 }
146             }
147 
148             Result::<()>::Ok(())
149         });
150 
151         Box::pin(async {})
152     }));
153 
154     // Set the handler for ICE connection state
155     // This will notify you when the peer has connected/disconnected
156     peer_connection.on_ice_connection_state_change(Box::new(
157         move |connection_state: RTCIceConnectionState| {
158             log::info!("Connection State has changed {connection_state}");
159             if connection_state == RTCIceConnectionState::Connected {
160                 log::info!("Ctrl+C the remote client to stop the demo");
161             }
162             Box::pin(async {})
163         },
164     ));
165 
166     // Set the handler for Peer connection state
167     // This will notify you when the peer has connected/disconnected
168 
169     peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
170         log::info!("Peer Connection State has changed: {s}");
171 
172         if s == RTCPeerConnectionState::Failed {
173             // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
174             // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
175             // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
176             println!("Peer Connection has gone to failed exiting: Done forwarding");
177         }
178 
179         Box::pin(async {})
180     }));
181 
182     // Set the remote SessionDescription
183     peer_connection.set_remote_description(offer).await?;
184 
185     // Create an answer
186     let answer = peer_connection.create_answer(None).await?;
187 
188     // Create channel that is blocked until ICE Gathering is complete
189     let mut gather_complete = peer_connection.gathering_complete_promise().await;
190 
191     // Sets the LocalDescription, and starts our UDP listeners
192     peer_connection.set_local_description(answer).await?;
193 
194     // Block until ICE Gathering is complete, disabling trickle ICE
195     // we do this because we only can exchange one signaling message
196     // in a production application you should exchange ICE Candidates via OnICECandidate
197     let _ = gather_complete.recv().await;
198 
199     // Output the answer in base64 so we can paste it in browser
200     if let Some(local_desc) = peer_connection.local_description().await {
201         Ok((local_desc, peer_connection))
202     } else {
203         Err(WebRTCError {
204             value: WebRTCErrorValue::CanNotGetLocalDescription,
205         })
206     }
207 }
208