xref: /xiu/protocol/rtsp/src/session/mod.rs (revision a4ef5d6c)
1 pub mod define;
2 pub mod errors;
3 use super::rtsp_codec;
4 use crate::global_trait::Marshal;
5 use crate::global_trait::Unmarshal;
6 use crate::http::RtspResponse;
7 use crate::rtp::define::ANNEXB_NALU_START_CODE;
8 use crate::rtp::utils::Marshal as RtpMarshal;
9 
10 use crate::rtp::RtpPacket;
11 use crate::rtsp_range::RtspRange;
12 
13 use crate::sdp::fmtp::Fmtp;
14 
15 use crate::rtsp_codec::RtspCodecInfo;
16 use crate::rtsp_track::RtspTrack;
17 use crate::rtsp_track::TrackType;
18 use crate::rtsp_transport::ProtocolType;
19 use crate::rtsp_transport::RtspTransport;
20 
21 use byteorder::BigEndian;
22 use bytes::BytesMut;
23 use bytesio::bytes_reader::BytesReader;
24 use bytesio::bytes_writer::AsyncBytesWriter;
25 
26 use bytesio::bytes_writer::BytesWriter;
27 use bytesio::bytesio::UdpIO;
28 use errors::SessionError;
29 use errors::SessionErrorValue;
30 use http::StatusCode;
31 use streamhub::define::DataSender;
32 use streamhub::define::MediaInfo;
33 use streamhub::define::VideoCodecType;
34 use tokio::sync::oneshot;
35 
36 use super::http::RtspRequest;
37 use super::rtp::errors::UnPackerError;
38 use super::sdp::Sdp;
39 
40 use async_trait::async_trait;
41 use bytesio::bytesio::TNetIO;
42 use bytesio::bytesio::TcpIO;
43 use define::rtsp_method_name;
44 
45 use std::collections::HashMap;
46 use std::sync::Arc;
47 use tokio::sync::mpsc;
48 
49 use streamhub::{
50     define::{
51         FrameData, Information, InformationSender, NotifyInfo, PublishType, PublisherInfo,
52         StreamHubEvent, StreamHubEventSender, SubscribeType, SubscriberInfo, TStreamHandler,
53     },
54     errors::{ChannelError, ChannelErrorValue},
55     statistics::StreamStatistics,
56     stream::StreamIdentifier,
57     utils::{RandomDigitCount, Uuid},
58 };
59 use tokio::net::TcpStream;
60 use tokio::sync::Mutex;
61 
/// Server-side state for a single RTSP connection accepted over TCP.
pub struct RtspServerSession {
    /// Shared handle to the connection's network IO; also handed to the
    /// response writer and to tracks that send/receive over the RTSP connection.
    io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
    /// Bytes read from `io` that have not been parsed yet.
    reader: BytesReader,
    /// Writer used to send RTSP responses over `io`.
    writer: AsyncBytesWriter,

    /// Tracks created from the SDP media sections, keyed by track type.
    tracks: HashMap<TrackType, RtspTrack>,
    /// Session description received via ANNOUNCE or obtained from the stream hub on DESCRIBE.
    sdp: Sdp,
    /// Session identifier; assigned during SETUP when it is not already set.
    pub session_id: Option<Uuid>,

    /// Handler passed to the stream hub when this session publishes a stream.
    stream_handler: Arc<RtspStreamHandler>,
    /// Channel used to send events to the stream hub.
    event_producer: StreamHubEventSender,
}
74 
/// Header of an embedded (interleaved) binary data frame carried on the RTSP connection.
pub struct InterleavedBinaryData {
    /// One-byte channel identifier that follows the `$` marker.
    channel_identifier: u8,
    /// Length of the encapsulated binary data, read as a big-endian two-byte integer.
    length: u16,
}
79 
80 impl InterleavedBinaryData {
81     // 10.12 Embedded (Interleaved) Binary Data
82     // Stream data such as RTP packets is encapsulated by an ASCII dollar
83     // sign (24 hexadecimal), followed by a one-byte channel identifier,
84     // followed by the length of the encapsulated binary data as a binary,
85     // two-byte integer in network byte order
new(reader: &mut BytesReader) -> Result<Option<Self>, SessionError>86     fn new(reader: &mut BytesReader) -> Result<Option<Self>, SessionError> {
87         let is_dollar_sign = reader.advance_u8()? == 0x24;
88         log::debug!("dollar sign: {}", is_dollar_sign);
89         if is_dollar_sign {
90             reader.read_u8()?;
91             let channel_identifier = reader.read_u8()?;
92             log::debug!("channel_identifier: {}", channel_identifier);
93             let length = reader.read_u16::<BigEndian>()?;
94             log::debug!("length: {}", length);
95             return Ok(Some(InterleavedBinaryData {
96                 channel_identifier,
97                 length,
98             }));
99         }
100         Ok(None)
101     }
102 }
103 
104 impl RtspServerSession {
new(stream: TcpStream, event_producer: StreamHubEventSender) -> Self105     pub fn new(stream: TcpStream, event_producer: StreamHubEventSender) -> Self {
106         // let remote_addr = if let Ok(addr) = stream.peer_addr() {
107         //     log::info!("server session: {}", addr.to_string());
108         //     Some(addr)
109         // } else {
110         //     None
111         // };
112 
113         let net_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream));
114         let io = Arc::new(Mutex::new(net_io));
115 
116         Self {
117             io: io.clone(),
118             reader: BytesReader::new(BytesMut::default()),
119             writer: AsyncBytesWriter::new(io),
120             tracks: HashMap::new(),
121             sdp: Sdp::default(),
122             session_id: None,
123             event_producer,
124             stream_handler: Arc::new(RtspStreamHandler::new()),
125         }
126     }
127 
run(&mut self) -> Result<(), SessionError>128     pub async fn run(&mut self) -> Result<(), SessionError> {
129         loop {
130             while self.reader.len() < 4 {
131                 let data = self.io.lock().await.read().await?;
132                 self.reader.extend_from_slice(&data[..]);
133             }
134 
135             if let Ok(data) = InterleavedBinaryData::new(&mut self.reader) {
136                 match data {
137                     Some(a) => {
138                         if self.reader.len() < a.length as usize {
139                             let data = self.io.lock().await.read().await?;
140                             self.reader.extend_from_slice(&data[..]);
141                         }
142                         self.on_rtp_over_rtsp_message(a.channel_identifier, a.length as usize)
143                             .await?;
144                     }
145                     None => {
146                         self.on_rtsp_message().await?;
147                     }
148                 }
149             }
150         }
151     }
152 
on_rtp_over_rtsp_message( &mut self, channel_identifier: u8, length: usize, ) -> Result<(), SessionError>153     async fn on_rtp_over_rtsp_message(
154         &mut self,
155         channel_identifier: u8,
156         length: usize,
157     ) -> Result<(), SessionError> {
158         let mut cur_reader = BytesReader::new(self.reader.read_bytes(length)?);
159 
160         for track in self.tracks.values_mut() {
161             if let Some(interleaveds) = track.transport.interleaved {
162                 let rtp_identifier = interleaveds[0];
163                 let rtcp_identifier = interleaveds[1];
164 
165                 if channel_identifier == rtp_identifier {
166                     track.on_rtp(&mut cur_reader).await?;
167                 } else if channel_identifier == rtcp_identifier {
168                     track.on_rtcp(&mut cur_reader, self.io.clone()).await;
169                 }
170             }
171         }
172         Ok(())
173     }
174 
175     //publish stream: OPTIONS->ANNOUNCE->SETUP->RECORD->TEARDOWN
176     //subscribe stream: OPTIONS->DESCRIBE->SETUP->PLAY->TEARDOWN
on_rtsp_message(&mut self) -> Result<(), SessionError>177     async fn on_rtsp_message(&mut self) -> Result<(), SessionError> {
178         let data = self.reader.extract_remaining_bytes();
179 
180         if let Some(rtsp_request) = RtspRequest::unmarshal(std::str::from_utf8(&data)?) {
181             match rtsp_request.method.as_str() {
182                 rtsp_method_name::OPTIONS => {
183                     self.handle_options(&rtsp_request).await?;
184                 }
185                 rtsp_method_name::DESCRIBE => {
186                     self.handle_describe(&rtsp_request).await?;
187                 }
188                 rtsp_method_name::ANNOUNCE => {
189                     self.handle_announce(&rtsp_request).await?;
190                 }
191                 rtsp_method_name::SETUP => {
192                     self.handle_setup(&rtsp_request).await?;
193                 }
194                 rtsp_method_name::PLAY => {
195                     if self.handle_play(&rtsp_request).await.is_err() {
196                         self.unsubscribe_from_stream_hub(rtsp_request.path)?;
197                     }
198                 }
199                 rtsp_method_name::RECORD => {
200                     self.handle_record(&rtsp_request).await?;
201                 }
202                 rtsp_method_name::TEARDOWN => {
203                     self.handle_teardown(&rtsp_request)?;
204                 }
205                 rtsp_method_name::PAUSE => {}
206                 rtsp_method_name::GET_PARAMETER => {}
207                 rtsp_method_name::SET_PARAMETER => {}
208                 rtsp_method_name::REDIRECT => {}
209 
210                 _ => {}
211             }
212         }
213 
214         Ok(())
215     }
216 
handle_options(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError>217     async fn handle_options(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> {
218         let status_code = http::StatusCode::OK;
219         let mut response = Self::gen_response(status_code, rtsp_request);
220         let public_str = rtsp_method_name::ARRAY.join(",");
221         response.headers.insert("Public".to_string(), public_str);
222         self.send_response(&response).await?;
223 
224         Ok(())
225     }
226 
handle_describe(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError>227     async fn handle_describe(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> {
228         let status_code = http::StatusCode::OK;
229 
230         // The sender is used for sending sdp information from the server session to client session
231         // receiver is used to receive the sdp information
232         let (sender, mut receiver) = mpsc::unbounded_channel();
233 
234         let request_event = StreamHubEvent::Request {
235             identifier: StreamIdentifier::Rtsp {
236                 stream_path: rtsp_request.path.clone(),
237             },
238             sender,
239         };
240 
241         if self.event_producer.send(request_event).is_err() {
242             return Err(SessionError {
243                 value: SessionErrorValue::StreamHubEventSendErr,
244             });
245         }
246 
247         if let Some(Information::Sdp { data }) = receiver.recv().await {
248             if let Some(sdp) = Sdp::unmarshal(&data) {
249                 self.sdp = sdp;
250                 //it can new tracks when get the sdp information;
251                 self.new_tracks()?;
252             }
253         }
254 
255         let mut response = Self::gen_response(status_code, rtsp_request);
256         let sdp = self.sdp.marshal();
257         log::debug!("sdp: {}", sdp);
258         response.body = Some(sdp);
259         response
260             .headers
261             .insert("Content-Type".to_string(), "application/sdp".to_string());
262         self.send_response(&response).await?;
263 
264         Ok(())
265     }
266 
handle_announce(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError>267     async fn handle_announce(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> {
268         if let Some(request_body) = &rtsp_request.body {
269             if let Some(sdp) = Sdp::unmarshal(request_body) {
270                 self.sdp = sdp.clone();
271                 self.stream_handler.set_sdp(sdp).await;
272             }
273         }
274 
275         //new tracks for publish session
276         self.new_tracks()?;
277 
278         let (event_result_sender, event_result_receiver) = oneshot::channel();
279 
280         let publish_event = StreamHubEvent::Publish {
281             identifier: StreamIdentifier::Rtsp {
282                 stream_path: rtsp_request.path.clone(),
283             },
284             result_sender: event_result_sender,
285             info: self.get_publisher_info(),
286             stream_handler: self.stream_handler.clone(),
287         };
288 
289         if self.event_producer.send(publish_event).is_err() {
290             return Err(SessionError {
291                 value: SessionErrorValue::StreamHubEventSendErr,
292             });
293         }
294 
295         let sender = event_result_receiver.await??.0.unwrap();
296 
297         for track in self.tracks.values_mut() {
298             let sender_out = sender.clone();
299             let mut rtp_channel_guard = track.rtp_channel.lock().await;
300 
301             rtp_channel_guard.on_frame_handler(Box::new(
302                 move |msg: FrameData| -> Result<(), UnPackerError> {
303                     if let Err(err) = sender_out.send(msg) {
304                         log::error!("send frame error: {}", err);
305                     }
306                     Ok(())
307                 },
308             ));
309 
310             let rtcp_channel = Arc::clone(&track.rtcp_channel);
311             rtp_channel_guard.on_packet_for_rtcp_handler(Box::new(move |packet: RtpPacket| {
312                 let rtcp_channel_in = Arc::clone(&rtcp_channel);
313                 Box::pin(async move {
314                     rtcp_channel_in.lock().await.on_packet(packet);
315                 })
316             }));
317         }
318 
319         let status_code = http::StatusCode::OK;
320         let response = Self::gen_response(status_code, rtsp_request);
321         self.send_response(&response).await?;
322 
323         Ok(())
324     }
325 
handle_setup(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError>326     async fn handle_setup(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> {
327         let status_code = http::StatusCode::OK;
328         let mut response = Self::gen_response(status_code, rtsp_request);
329 
330         for track in self.tracks.values_mut() {
331             if !rtsp_request.url.contains(&track.media_control) {
332                 continue;
333             }
334 
335             if let Some(transport_data) = rtsp_request.get_header(&"Transport".to_string()) {
336                 if self.session_id.is_none() {
337                     self.session_id = Some(Uuid::new(RandomDigitCount::Zero));
338                 }
339 
340                 let transport = RtspTransport::unmarshal(transport_data);
341 
342                 if let Some(mut trans) = transport {
343                     let mut rtp_server_port: Option<u16> = None;
344                     let mut rtcp_server_port: Option<u16> = None;
345 
346                     match trans.protocol_type {
347                         ProtocolType::TCP => {
348                             track.create_packer(self.io.clone()).await;
349                         }
350                         ProtocolType::UDP => {
351                             let (rtp_port, rtcp_port) =
352                                 if let Some(client_ports) = trans.client_port {
353                                     (client_ports[0], client_ports[1])
354                                 } else {
355                                     log::error!("should not be here!!");
356                                     (0, 0)
357                                 };
358 
359                             let address = rtsp_request.address.clone();
360                             if let Some(rtp_io) = UdpIO::new(address.clone(), rtp_port, 0).await {
361                                 rtp_server_port = rtp_io.get_local_port();
362 
363                                 let box_udp_io: Box<dyn TNetIO + Send + Sync> = Box::new(rtp_io);
364                                 //if mode is empty then it is a player session.
365                                 if trans.transport_mod.is_none() {
366                                     track.create_packer(Arc::new(Mutex::new(box_udp_io))).await;
367                                 } else {
368                                     track.rtp_receive_loop(box_udp_io).await;
369                                 }
370                             }
371 
372                             if let Some(rtcp_io) =
373                                 UdpIO::new(address.clone(), rtcp_port, rtp_server_port.unwrap() + 1)
374                                     .await
375                             {
376                                 rtcp_server_port = rtcp_io.get_local_port();
377                                 let box_rtcp_io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>> =
378                                     Arc::new(Mutex::new(Box::new(rtcp_io)));
379                                 track.rtcp_receive_loop(box_rtcp_io).await;
380                             }
381                         }
382                     }
383 
384                     //tell client the udp ports of server side
385                     let mut server_ports: [u16; 2] = [0, 0];
386                     if let Some(rtp_port) = rtp_server_port {
387                         server_ports[0] = rtp_port;
388                     }
389                     if let Some(rtcp_server_port) = rtcp_server_port {
390                         server_ports[1] = rtcp_server_port;
391                         trans.server_port = Some(server_ports);
392                     }
393 
394                     let new_transport_data = trans.marshal();
395                     response
396                         .headers
397                         .insert("Transport".to_string(), new_transport_data);
398                     response
399                         .headers
400                         .insert("Session".to_string(), self.session_id.unwrap().to_string());
401 
402                     track.set_transport(trans).await;
403                 }
404             }
405             break;
406         }
407 
408         self.send_response(&response).await?;
409 
410         Ok(())
411     }
412 
handle_play(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError>413     async fn handle_play(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> {
414         for track in self.tracks.values_mut() {
415             let protocol_type = track.transport.protocol_type.clone();
416 
417             match protocol_type {
418                 ProtocolType::TCP => {
419                     let channel_identifer = if let Some(interleaveds) = track.transport.interleaved
420                     {
421                         interleaveds[0]
422                     } else {
423                         log::error!("handle_play:should not be here!!!");
424                         0
425                     };
426 
427                     track.rtp_channel.lock().await.on_packet_handler(Box::new(
428                         move |io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, packet: RtpPacket| {
429                             Box::pin(async move {
430                                 let msg = packet.marshal()?;
431                                 let mut bytes_writer = AsyncBytesWriter::new(io);
432                                 bytes_writer.write_u8(0x24)?;
433                                 bytes_writer.write_u8(channel_identifer)?;
434                                 bytes_writer.write_u16::<BigEndian>(msg.len() as u16)?;
435                                 bytes_writer.write(&msg)?;
436                                 bytes_writer.flush().await?;
437                                 Ok(())
438                             })
439                         },
440                     ));
441                 }
442                 ProtocolType::UDP => {
443                     track.rtp_channel.lock().await.on_packet_handler(Box::new(
444                         move |io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, packet: RtpPacket| {
445                             Box::pin(async move {
446                                 let mut bytes_writer = AsyncBytesWriter::new(io);
447 
448                                 let msg = packet.marshal()?;
449                                 bytes_writer.write(&msg)?;
450                                 bytes_writer.flush().await?;
451                                 Ok(())
452                             })
453                         },
454                     ));
455                 }
456             }
457         }
458 
459         let status_code = http::StatusCode::OK;
460         let response = Self::gen_response(status_code, rtsp_request);
461 
462         self.send_response(&response).await?;
463 
464         let (event_result_sender, event_result_receiver) = oneshot::channel();
465 
466         let publish_event = StreamHubEvent::Subscribe {
467             identifier: StreamIdentifier::Rtsp {
468                 stream_path: rtsp_request.path.clone(),
469             },
470             info: self.get_subscriber_info(),
471             result_sender: event_result_sender,
472         };
473 
474         if self.event_producer.send(publish_event).is_err() {
475             return Err(SessionError {
476                 value: SessionErrorValue::StreamHubEventSendErr,
477             });
478         }
479 
480         let mut receiver = event_result_receiver.await??.frame_receiver.unwrap();
481 
482         let mut retry_times = 0;
483         loop {
484             if let Some(frame_data) = receiver.recv().await {
485                 match frame_data {
486                     FrameData::Audio {
487                         timestamp,
488                         mut data,
489                     } => {
490                         if let Some(audio_track) = self.tracks.get_mut(&TrackType::Audio) {
491                             audio_track
492                                 .rtp_channel
493                                 .lock()
494                                 .await
495                                 .on_frame(&mut data, timestamp)
496                                 .await?;
497                         }
498                     }
499                     FrameData::Video {
500                         timestamp,
501                         mut data,
502                     } => {
503                         if let Some(video_track) = self.tracks.get_mut(&TrackType::Video) {
504                             video_track
505                                 .rtp_channel
506                                 .lock()
507                                 .await
508                                 .on_frame(&mut data, timestamp)
509                                 .await?;
510                         }
511                     }
512                     _ => {}
513                 }
514             } else {
515                 retry_times += 1;
516                 log::info!(
517                     "send_channel_data: no data receives ,retry {} times!",
518                     retry_times
519                 );
520 
521                 if retry_times > 10 {
522                     return Err(SessionError {
523                         value: SessionErrorValue::CannotReceiveFrameData,
524                     });
525                 }
526             }
527         }
528     }
529 
unsubscribe_from_stream_hub(&mut self, stream_path: String) -> Result<(), SessionError>530     pub fn unsubscribe_from_stream_hub(&mut self, stream_path: String) -> Result<(), SessionError> {
531         let identifier = StreamIdentifier::Rtsp { stream_path };
532 
533         let subscribe_event = StreamHubEvent::UnSubscribe {
534             identifier,
535             info: self.get_subscriber_info(),
536         };
537         if let Err(err) = self.event_producer.send(subscribe_event) {
538             log::error!("unsubscribe_from_stream_hub err {}", err);
539         }
540 
541         Ok(())
542     }
543 
handle_record(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError>544     async fn handle_record(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> {
545         if let Some(range_str) = rtsp_request.headers.get(&String::from("Range")) {
546             if let Some(range) = RtspRange::unmarshal(range_str) {
547                 let status_code = http::StatusCode::OK;
548                 let mut response = Self::gen_response(status_code, rtsp_request);
549                 response
550                     .headers
551                     .insert(String::from("Range"), range.marshal());
552                 response
553                     .headers
554                     .insert("Session".to_string(), self.session_id.unwrap().to_string());
555 
556                 self.send_response(&response).await?;
557             }
558         }
559 
560         Ok(())
561     }
562 
handle_teardown(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError>563     fn handle_teardown(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> {
564         let stream_path = &rtsp_request.path;
565         let unpublish_event = StreamHubEvent::UnPublish {
566             identifier: StreamIdentifier::Rtsp {
567                 stream_path: stream_path.clone(),
568             },
569             info: self.get_publisher_info(),
570         };
571 
572         let rv = self.event_producer.send(unpublish_event);
573         match rv {
574             Err(_) => {
575                 log::error!("unpublish_to_channels error.stream_name: {}", stream_path);
576                 Err(SessionError {
577                     value: SessionErrorValue::StreamHubEventSendErr,
578                 })
579             }
580             Ok(()) => {
581                 log::info!(
582                     "unpublish_to_channels successfully.stream name: {}",
583                     stream_path
584                 );
585                 Ok(())
586             }
587         }
588     }
589 
new_tracks(&mut self) -> Result<(), SessionError>590     fn new_tracks(&mut self) -> Result<(), SessionError> {
591         for media in &self.sdp.medias {
592             let media_control = if let Some(media_control_val) = media.attributes.get("control") {
593                 media_control_val.clone()
594             } else {
595                 String::from("")
596             };
597 
598             let media_name = &media.media_type;
599             log::info!("media_name: {}", media_name);
600             match media_name.as_str() {
601                 "audio" => {
602                     let codec_id = rtsp_codec::RTSP_CODEC_NAME_2_ID
603                         .get(&media.rtpmap.encoding_name.to_lowercase().as_str())
604                         .unwrap()
605                         .clone();
606                     let codec_info = RtspCodecInfo {
607                         codec_id,
608                         payload_type: media.rtpmap.payload_type as u8,
609                         sample_rate: media.rtpmap.clock_rate,
610                         channel_count: media.rtpmap.encoding_param.parse().unwrap(),
611                     };
612 
613                     log::info!("audio codec info: {:?}", codec_info);
614 
615                     let track = RtspTrack::new(TrackType::Audio, codec_info, media_control);
616                     self.tracks.insert(TrackType::Audio, track);
617                 }
618                 "video" => {
619                     let codec_id = rtsp_codec::RTSP_CODEC_NAME_2_ID
620                         .get(&media.rtpmap.encoding_name.to_lowercase().as_str())
621                         .unwrap()
622                         .clone();
623                     let codec_info = RtspCodecInfo {
624                         codec_id,
625                         payload_type: media.rtpmap.payload_type as u8,
626                         sample_rate: media.rtpmap.clock_rate,
627                         ..Default::default()
628                     };
629                     let track = RtspTrack::new(TrackType::Video, codec_info, media_control);
630                     self.tracks.insert(TrackType::Video, track);
631                 }
632                 _ => {}
633             }
634         }
635         Ok(())
636     }
637 
gen_response(status_code: StatusCode, rtsp_request: &RtspRequest) -> RtspResponse638     fn gen_response(status_code: StatusCode, rtsp_request: &RtspRequest) -> RtspResponse {
639         let reason_phrase = if let Some(reason) = status_code.canonical_reason() {
640             reason.to_string()
641         } else {
642             "".to_string()
643         };
644 
645         let mut response = RtspResponse {
646             version: "RTSP/1.0".to_string(),
647             status_code: status_code.as_u16(),
648             reason_phrase,
649             ..Default::default()
650         };
651 
652         if let Some(cseq) = rtsp_request.headers.get("CSeq") {
653             response
654                 .headers
655                 .insert("CSeq".to_string(), cseq.to_string());
656         }
657 
658         response
659     }
660 
get_subscriber_info(&mut self) -> SubscriberInfo661     fn get_subscriber_info(&mut self) -> SubscriberInfo {
662         let id = if let Some(session_id) = &self.session_id {
663             *session_id
664         } else {
665             Uuid::new(RandomDigitCount::Zero)
666         };
667 
668         SubscriberInfo {
669             id,
670             sub_type: SubscribeType::PlayerRtsp,
671             sub_data_type: streamhub::define::SubDataType::Frame,
672             notify_info: NotifyInfo {
673                 request_url: String::from(""),
674                 remote_addr: String::from(""),
675             },
676         }
677     }
678 
get_publisher_info(&mut self) -> PublisherInfo679     fn get_publisher_info(&mut self) -> PublisherInfo {
680         let id = if let Some(session_id) = &self.session_id {
681             *session_id
682         } else {
683             Uuid::new(RandomDigitCount::Zero)
684         };
685 
686         PublisherInfo {
687             id,
688             pub_type: PublishType::PushRtsp,
689             pub_data_type: streamhub::define::PubDataType::Frame,
690             notify_info: NotifyInfo {
691                 request_url: String::from(""),
692                 remote_addr: String::from(""),
693             },
694         }
695     }
696 
send_response(&mut self, response: &RtspResponse) -> Result<(), SessionError>697     async fn send_response(&mut self, response: &RtspResponse) -> Result<(), SessionError> {
698         self.writer.write(response.marshal().as_bytes())?;
699         self.writer.flush().await?;
700 
701         Ok(())
702     }
703 }
704 
/// Stream handler of a publishing RTSP session; it keeps the stream's SDP and
/// provides it (or data derived from it) to the stream hub on request.
#[derive(Default)]
pub struct RtspStreamHandler {
    /// SDP of the published stream, replaced via `set_sdp`.
    sdp: Mutex<Sdp>,
}
709 
710 impl RtspStreamHandler {
new() -> Self711     pub fn new() -> Self {
712         Self {
713             sdp: Mutex::new(Sdp::default()),
714         }
715     }
set_sdp(&self, sdp: Sdp)716     pub async fn set_sdp(&self, sdp: Sdp) {
717         *self.sdp.lock().await = sdp;
718     }
719 }
720 
721 #[async_trait]
722 impl TStreamHandler for RtspStreamHandler {
send_prior_data( &self, data_sender: DataSender, sub_type: SubscribeType, ) -> Result<(), ChannelError>723     async fn send_prior_data(
724         &self,
725         data_sender: DataSender,
726         sub_type: SubscribeType,
727     ) -> Result<(), ChannelError> {
728         let sender = match data_sender {
729             DataSender::Frame { sender } => sender,
730             DataSender::Packet { sender: _ } => {
731                 return Err(ChannelError {
732                     value: ChannelErrorValue::NotCorrectDataSenderType,
733                 });
734             }
735         };
736         match sub_type {
737             SubscribeType::PlayerRtmp => {
738                 let sdp_info = self.sdp.lock().await;
739                 let mut video_clock_rate: u32 = 0;
740                 let mut audio_clock_rate: u32 = 0;
741 
742                 let mut vcodec: VideoCodecType = VideoCodecType::H264;
743 
744                 for media in &sdp_info.medias {
745                     let mut bytes_writer = BytesWriter::new();
746                     if let Some(fmtp) = &media.fmtp {
747                         match fmtp {
748                             Fmtp::H264(data) => {
749                                 bytes_writer.write(&ANNEXB_NALU_START_CODE)?;
750                                 bytes_writer.write(&data.sps)?;
751                                 bytes_writer.write(&ANNEXB_NALU_START_CODE)?;
752                                 bytes_writer.write(&data.pps)?;
753 
754                                 let frame_data = FrameData::Video {
755                                     timestamp: 0,
756                                     data: bytes_writer.extract_current_bytes(),
757                                 };
758                                 if let Err(err) = sender.send(frame_data) {
759                                     log::error!("send sps/pps error: {}", err);
760                                 }
761                                 video_clock_rate = media.rtpmap.clock_rate;
762                             }
763                             Fmtp::H265(data) => {
764                                 bytes_writer.write(&ANNEXB_NALU_START_CODE)?;
765                                 bytes_writer.write(&data.sps)?;
766                                 bytes_writer.write(&ANNEXB_NALU_START_CODE)?;
767                                 bytes_writer.write(&data.pps)?;
768                                 bytes_writer.write(&ANNEXB_NALU_START_CODE)?;
769                                 bytes_writer.write(&data.vps)?;
770 
771                                 let frame_data = FrameData::Video {
772                                     timestamp: 0,
773                                     data: bytes_writer.extract_current_bytes(),
774                                 };
775                                 if let Err(err) = sender.send(frame_data) {
776                                     log::error!("send sps/pps/vps error: {}", err);
777                                 }
778 
779                                 vcodec = VideoCodecType::H265;
780                             }
781                             Fmtp::Mpeg4(data) => {
782                                 let frame_data = FrameData::Audio {
783                                     timestamp: 0,
784                                     data: data.asc.clone(),
785                                 };
786 
787                                 if let Err(err) = sender.send(frame_data) {
788                                     log::error!("send asc error: {}", err);
789                                 }
790 
791                                 audio_clock_rate = media.rtpmap.clock_rate;
792                             }
793                         }
794                     }
795                 }
796 
797                 if let Err(err) = sender.send(FrameData::MediaInfo {
798                     media_info: MediaInfo {
799                         audio_clock_rate,
800                         video_clock_rate,
801 
802                         vcodec,
803                     },
804                 }) {
805                     log::error!("send media info error: {}", err);
806                 }
807             }
808             SubscribeType::PlayerHls => {}
809             _ => {}
810         }
811 
812         Ok(())
813     }
get_statistic_data(&self) -> Option<StreamStatistics>814     async fn get_statistic_data(&self) -> Option<StreamStatistics> {
815         None
816     }
817 
send_information(&self, sender: InformationSender)818     async fn send_information(&self, sender: InformationSender) {
819         if let Err(err) = sender.send(Information::Sdp {
820             data: self.sdp.lock().await.marshal(),
821         }) {
822             log::error!("send_information of rtsp error: {}", err);
823         }
824     }
825 }
826