1 pub mod define; 2 pub mod errors; 3 use super::rtsp_codec; 4 use crate::global_trait::Marshal; 5 use crate::global_trait::Unmarshal; 6 use crate::http::RtspResponse; 7 use crate::rtp::define::ANNEXB_NALU_START_CODE; 8 use crate::rtp::utils::Marshal as RtpMarshal; 9 10 use crate::rtp::RtpPacket; 11 use crate::rtsp_range::RtspRange; 12 13 use crate::sdp::fmtp::Fmtp; 14 15 use crate::rtsp_codec::RtspCodecInfo; 16 use crate::rtsp_track::RtspTrack; 17 use crate::rtsp_track::TrackType; 18 use crate::rtsp_transport::ProtocolType; 19 use crate::rtsp_transport::RtspTransport; 20 21 use byteorder::BigEndian; 22 use bytes::BytesMut; 23 use bytesio::bytes_reader::BytesReader; 24 use bytesio::bytes_writer::AsyncBytesWriter; 25 26 use bytesio::bytes_writer::BytesWriter; 27 use bytesio::bytesio::UdpIO; 28 use errors::SessionError; 29 use errors::SessionErrorValue; 30 use http::StatusCode; 31 use streamhub::define::DataSender; 32 use streamhub::define::MediaInfo; 33 use streamhub::define::VideoCodecType; 34 use tokio::sync::oneshot; 35 36 use super::http::RtspRequest; 37 use super::rtp::errors::UnPackerError; 38 use super::sdp::Sdp; 39 40 use async_trait::async_trait; 41 use bytesio::bytesio::TNetIO; 42 use bytesio::bytesio::TcpIO; 43 use define::rtsp_method_name; 44 45 use std::collections::HashMap; 46 use std::sync::Arc; 47 use tokio::sync::mpsc; 48 49 use streamhub::{ 50 define::{ 51 FrameData, Information, InformationSender, NotifyInfo, PublishType, PublisherInfo, 52 StreamHubEvent, StreamHubEventSender, SubscribeType, SubscriberInfo, TStreamHandler, 53 }, 54 errors::{ChannelError, ChannelErrorValue}, 55 statistics::StreamStatistics, 56 stream::StreamIdentifier, 57 utils::{RandomDigitCount, Uuid}, 58 }; 59 use tokio::net::TcpStream; 60 use tokio::sync::Mutex; 61 62 pub struct RtspServerSession { 63 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 64 reader: BytesReader, 65 writer: AsyncBytesWriter, 66 67 tracks: HashMap<TrackType, RtspTrack>, 68 sdp: Sdp, 69 pub session_id: Option<Uuid>, 70 71 stream_handler: Arc<RtspStreamHandler>, 72 event_producer: StreamHubEventSender, 73 } 74 75 pub struct InterleavedBinaryData { 76 channel_identifier: u8, 77 length: u16, 78 } 79 80 impl InterleavedBinaryData { 81 // 10.12 Embedded (Interleaved) Binary Data 82 // Stream data such as RTP packets is encapsulated by an ASCII dollar 83 // sign (24 hexadecimal), followed by a one-byte channel identifier, 84 // followed by the length of the encapsulated binary data as a binary, 85 // two-byte integer in network byte order new(reader: &mut BytesReader) -> Result<Option<Self>, SessionError>86 fn new(reader: &mut BytesReader) -> Result<Option<Self>, SessionError> { 87 let is_dollar_sign = reader.advance_u8()? == 0x24; 88 log::debug!("dollar sign: {}", is_dollar_sign); 89 if is_dollar_sign { 90 reader.read_u8()?; 91 let channel_identifier = reader.read_u8()?; 92 log::debug!("channel_identifier: {}", channel_identifier); 93 let length = reader.read_u16::<BigEndian>()?; 94 log::debug!("length: {}", length); 95 return Ok(Some(InterleavedBinaryData { 96 channel_identifier, 97 length, 98 })); 99 } 100 Ok(None) 101 } 102 } 103 104 impl RtspServerSession { new(stream: TcpStream, event_producer: StreamHubEventSender) -> Self105 pub fn new(stream: TcpStream, event_producer: StreamHubEventSender) -> Self { 106 // let remote_addr = if let Ok(addr) = stream.peer_addr() { 107 // log::info!("server session: {}", addr.to_string()); 108 // Some(addr) 109 // } else { 110 // None 111 // }; 112 113 let net_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream)); 114 let io = Arc::new(Mutex::new(net_io)); 115 116 Self { 117 io: io.clone(), 118 reader: BytesReader::new(BytesMut::default()), 119 writer: AsyncBytesWriter::new(io), 120 tracks: HashMap::new(), 121 sdp: Sdp::default(), 122 session_id: None, 123 event_producer, 124 stream_handler: Arc::new(RtspStreamHandler::new()), 125 } 126 } 127 run(&mut self) -> Result<(), SessionError>128 pub async fn run(&mut self) -> Result<(), SessionError> { 129 loop { 130 while self.reader.len() < 4 { 131 let data = self.io.lock().await.read().await?; 132 self.reader.extend_from_slice(&data[..]); 133 } 134 135 if let Ok(data) = InterleavedBinaryData::new(&mut self.reader) { 136 match data { 137 Some(a) => { 138 if self.reader.len() < a.length as usize { 139 let data = self.io.lock().await.read().await?; 140 self.reader.extend_from_slice(&data[..]); 141 } 142 self.on_rtp_over_rtsp_message(a.channel_identifier, a.length as usize) 143 .await?; 144 } 145 None => { 146 self.on_rtsp_message().await?; 147 } 148 } 149 } 150 } 151 } 152 on_rtp_over_rtsp_message( &mut self, channel_identifier: u8, length: usize, ) -> Result<(), SessionError>153 async fn on_rtp_over_rtsp_message( 154 &mut self, 155 channel_identifier: u8, 156 length: usize, 157 ) -> Result<(), SessionError> { 158 let mut cur_reader = BytesReader::new(self.reader.read_bytes(length)?); 159 160 for track in self.tracks.values_mut() { 161 if let Some(interleaveds) = track.transport.interleaved { 162 let rtp_identifier = interleaveds[0]; 163 let rtcp_identifier = interleaveds[1]; 164 165 if channel_identifier == rtp_identifier { 166 track.on_rtp(&mut cur_reader).await?; 167 } else if channel_identifier == rtcp_identifier { 168 track.on_rtcp(&mut cur_reader, self.io.clone()).await; 169 } 170 } 171 } 172 Ok(()) 173 } 174 175 //publish stream: OPTIONS->ANNOUNCE->SETUP->RECORD->TEARDOWN 176 //subscribe stream: OPTIONS->DESCRIBE->SETUP->PLAY->TEARDOWN on_rtsp_message(&mut self) -> Result<(), SessionError>177 async fn on_rtsp_message(&mut self) -> Result<(), SessionError> { 178 let data = self.reader.extract_remaining_bytes(); 179 180 if let Some(rtsp_request) = RtspRequest::unmarshal(std::str::from_utf8(&data)?) { 181 match rtsp_request.method.as_str() { 182 rtsp_method_name::OPTIONS => { 183 self.handle_options(&rtsp_request).await?; 184 } 185 rtsp_method_name::DESCRIBE => { 186 self.handle_describe(&rtsp_request).await?; 187 } 188 rtsp_method_name::ANNOUNCE => { 189 self.handle_announce(&rtsp_request).await?; 190 } 191 rtsp_method_name::SETUP => { 192 self.handle_setup(&rtsp_request).await?; 193 } 194 rtsp_method_name::PLAY => { 195 if self.handle_play(&rtsp_request).await.is_err() { 196 self.unsubscribe_from_stream_hub(rtsp_request.path)?; 197 } 198 } 199 rtsp_method_name::RECORD => { 200 self.handle_record(&rtsp_request).await?; 201 } 202 rtsp_method_name::TEARDOWN => { 203 self.handle_teardown(&rtsp_request)?; 204 } 205 rtsp_method_name::PAUSE => {} 206 rtsp_method_name::GET_PARAMETER => {} 207 rtsp_method_name::SET_PARAMETER => {} 208 rtsp_method_name::REDIRECT => {} 209 210 _ => {} 211 } 212 } 213 214 Ok(()) 215 } 216 handle_options(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError>217 async fn handle_options(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { 218 let status_code = http::StatusCode::OK; 219 let mut response = Self::gen_response(status_code, rtsp_request); 220 let public_str = rtsp_method_name::ARRAY.join(","); 221 response.headers.insert("Public".to_string(), public_str); 222 self.send_response(&response).await?; 223 224 Ok(()) 225 } 226 handle_describe(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError>227 async fn handle_describe(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { 228 let status_code = http::StatusCode::OK; 229 230 // The sender is used for sending sdp information from the server session to client session 231 // receiver is used to receive the sdp information 232 let (sender, mut receiver) = mpsc::unbounded_channel(); 233 234 let request_event = StreamHubEvent::Request { 235 identifier: StreamIdentifier::Rtsp { 236 stream_path: rtsp_request.path.clone(), 237 }, 238 sender, 239 }; 240 241 if self.event_producer.send(request_event).is_err() { 242 return Err(SessionError { 243 value: SessionErrorValue::StreamHubEventSendErr, 244 }); 245 } 246 247 if let Some(Information::Sdp { data }) = receiver.recv().await { 248 if let Some(sdp) = Sdp::unmarshal(&data) { 249 self.sdp = sdp; 250 //it can new tracks when get the sdp information; 251 self.new_tracks()?; 252 } 253 } 254 255 let mut response = Self::gen_response(status_code, rtsp_request); 256 let sdp = self.sdp.marshal(); 257 log::debug!("sdp: {}", sdp); 258 response.body = Some(sdp); 259 response 260 .headers 261 .insert("Content-Type".to_string(), "application/sdp".to_string()); 262 self.send_response(&response).await?; 263 264 Ok(()) 265 } 266 handle_announce(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError>267 async fn handle_announce(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { 268 if let Some(request_body) = &rtsp_request.body { 269 if let Some(sdp) = Sdp::unmarshal(request_body) { 270 self.sdp = sdp.clone(); 271 self.stream_handler.set_sdp(sdp).await; 272 } 273 } 274 275 //new tracks for publish session 276 self.new_tracks()?; 277 278 let (event_result_sender, event_result_receiver) = oneshot::channel(); 279 280 let publish_event = StreamHubEvent::Publish { 281 identifier: StreamIdentifier::Rtsp { 282 stream_path: rtsp_request.path.clone(), 283 }, 284 result_sender: event_result_sender, 285 info: self.get_publisher_info(), 286 stream_handler: self.stream_handler.clone(), 287 }; 288 289 if self.event_producer.send(publish_event).is_err() { 290 return Err(SessionError { 291 value: SessionErrorValue::StreamHubEventSendErr, 292 }); 293 } 294 295 let sender = event_result_receiver.await??.0.unwrap(); 296 297 for track in self.tracks.values_mut() { 298 let sender_out = sender.clone(); 299 let mut rtp_channel_guard = track.rtp_channel.lock().await; 300 301 rtp_channel_guard.on_frame_handler(Box::new( 302 move |msg: FrameData| -> Result<(), UnPackerError> { 303 if let Err(err) = sender_out.send(msg) { 304 log::error!("send frame error: {}", err); 305 } 306 Ok(()) 307 }, 308 )); 309 310 let rtcp_channel = Arc::clone(&track.rtcp_channel); 311 rtp_channel_guard.on_packet_for_rtcp_handler(Box::new(move |packet: RtpPacket| { 312 let rtcp_channel_in = Arc::clone(&rtcp_channel); 313 Box::pin(async move { 314 rtcp_channel_in.lock().await.on_packet(packet); 315 }) 316 })); 317 } 318 319 let status_code = http::StatusCode::OK; 320 let response = Self::gen_response(status_code, rtsp_request); 321 self.send_response(&response).await?; 322 323 Ok(()) 324 } 325 handle_setup(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError>326 async fn handle_setup(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { 327 let status_code = http::StatusCode::OK; 328 let mut response = Self::gen_response(status_code, rtsp_request); 329 330 for track in self.tracks.values_mut() { 331 if !rtsp_request.url.contains(&track.media_control) { 332 continue; 333 } 334 335 if let Some(transport_data) = rtsp_request.get_header(&"Transport".to_string()) { 336 if self.session_id.is_none() { 337 self.session_id = Some(Uuid::new(RandomDigitCount::Zero)); 338 } 339 340 let transport = RtspTransport::unmarshal(transport_data); 341 342 if let Some(mut trans) = transport { 343 let mut rtp_server_port: Option<u16> = None; 344 let mut rtcp_server_port: Option<u16> = None; 345 346 match trans.protocol_type { 347 ProtocolType::TCP => { 348 track.create_packer(self.io.clone()).await; 349 } 350 ProtocolType::UDP => { 351 let (rtp_port, rtcp_port) = 352 if let Some(client_ports) = trans.client_port { 353 (client_ports[0], client_ports[1]) 354 } else { 355 log::error!("should not be here!!"); 356 (0, 0) 357 }; 358 359 let address = rtsp_request.address.clone(); 360 if let Some(rtp_io) = UdpIO::new(address.clone(), rtp_port, 0).await { 361 rtp_server_port = rtp_io.get_local_port(); 362 363 let box_udp_io: Box<dyn TNetIO + Send + Sync> = Box::new(rtp_io); 364 //if mode is empty then it is a player session. 365 if trans.transport_mod.is_none() { 366 track.create_packer(Arc::new(Mutex::new(box_udp_io))).await; 367 } else { 368 track.rtp_receive_loop(box_udp_io).await; 369 } 370 } 371 372 if let Some(rtcp_io) = 373 UdpIO::new(address.clone(), rtcp_port, rtp_server_port.unwrap() + 1) 374 .await 375 { 376 rtcp_server_port = rtcp_io.get_local_port(); 377 let box_rtcp_io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>> = 378 Arc::new(Mutex::new(Box::new(rtcp_io))); 379 track.rtcp_receive_loop(box_rtcp_io).await; 380 } 381 } 382 } 383 384 //tell client the udp ports of server side 385 let mut server_ports: [u16; 2] = [0, 0]; 386 if let Some(rtp_port) = rtp_server_port { 387 server_ports[0] = rtp_port; 388 } 389 if let Some(rtcp_server_port) = rtcp_server_port { 390 server_ports[1] = rtcp_server_port; 391 trans.server_port = Some(server_ports); 392 } 393 394 let new_transport_data = trans.marshal(); 395 response 396 .headers 397 .insert("Transport".to_string(), new_transport_data); 398 response 399 .headers 400 .insert("Session".to_string(), self.session_id.unwrap().to_string()); 401 402 track.set_transport(trans).await; 403 } 404 } 405 break; 406 } 407 408 self.send_response(&response).await?; 409 410 Ok(()) 411 } 412 handle_play(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError>413 async fn handle_play(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { 414 for track in self.tracks.values_mut() { 415 let protocol_type = track.transport.protocol_type.clone(); 416 417 match protocol_type { 418 ProtocolType::TCP => { 419 let channel_identifer = if let Some(interleaveds) = track.transport.interleaved 420 { 421 interleaveds[0] 422 } else { 423 log::error!("handle_play:should not be here!!!"); 424 0 425 }; 426 427 track.rtp_channel.lock().await.on_packet_handler(Box::new( 428 move |io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, packet: RtpPacket| { 429 Box::pin(async move { 430 let msg = packet.marshal()?; 431 let mut bytes_writer = AsyncBytesWriter::new(io); 432 bytes_writer.write_u8(0x24)?; 433 bytes_writer.write_u8(channel_identifer)?; 434 bytes_writer.write_u16::<BigEndian>(msg.len() as u16)?; 435 bytes_writer.write(&msg)?; 436 bytes_writer.flush().await?; 437 Ok(()) 438 }) 439 }, 440 )); 441 } 442 ProtocolType::UDP => { 443 track.rtp_channel.lock().await.on_packet_handler(Box::new( 444 move |io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, packet: RtpPacket| { 445 Box::pin(async move { 446 let mut bytes_writer = AsyncBytesWriter::new(io); 447 448 let msg = packet.marshal()?; 449 bytes_writer.write(&msg)?; 450 bytes_writer.flush().await?; 451 Ok(()) 452 }) 453 }, 454 )); 455 } 456 } 457 } 458 459 let status_code = http::StatusCode::OK; 460 let response = Self::gen_response(status_code, rtsp_request); 461 462 self.send_response(&response).await?; 463 464 let (event_result_sender, event_result_receiver) = oneshot::channel(); 465 466 let publish_event = StreamHubEvent::Subscribe { 467 identifier: StreamIdentifier::Rtsp { 468 stream_path: rtsp_request.path.clone(), 469 }, 470 info: self.get_subscriber_info(), 471 result_sender: event_result_sender, 472 }; 473 474 if self.event_producer.send(publish_event).is_err() { 475 return Err(SessionError { 476 value: SessionErrorValue::StreamHubEventSendErr, 477 }); 478 } 479 480 let mut receiver = event_result_receiver.await??.frame_receiver.unwrap(); 481 482 let mut retry_times = 0; 483 loop { 484 if let Some(frame_data) = receiver.recv().await { 485 match frame_data { 486 FrameData::Audio { 487 timestamp, 488 mut data, 489 } => { 490 if let Some(audio_track) = self.tracks.get_mut(&TrackType::Audio) { 491 audio_track 492 .rtp_channel 493 .lock() 494 .await 495 .on_frame(&mut data, timestamp) 496 .await?; 497 } 498 } 499 FrameData::Video { 500 timestamp, 501 mut data, 502 } => { 503 if let Some(video_track) = self.tracks.get_mut(&TrackType::Video) { 504 video_track 505 .rtp_channel 506 .lock() 507 .await 508 .on_frame(&mut data, timestamp) 509 .await?; 510 } 511 } 512 _ => {} 513 } 514 } else { 515 retry_times += 1; 516 log::info!( 517 "send_channel_data: no data receives ,retry {} times!", 518 retry_times 519 ); 520 521 if retry_times > 10 { 522 return Err(SessionError { 523 value: SessionErrorValue::CannotReceiveFrameData, 524 }); 525 } 526 } 527 } 528 } 529 unsubscribe_from_stream_hub(&mut self, stream_path: String) -> Result<(), SessionError>530 pub fn unsubscribe_from_stream_hub(&mut self, stream_path: String) -> Result<(), SessionError> { 531 let identifier = StreamIdentifier::Rtsp { stream_path }; 532 533 let subscribe_event = StreamHubEvent::UnSubscribe { 534 identifier, 535 info: self.get_subscriber_info(), 536 }; 537 if let Err(err) = self.event_producer.send(subscribe_event) { 538 log::error!("unsubscribe_from_stream_hub err {}", err); 539 } 540 541 Ok(()) 542 } 543 handle_record(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError>544 async fn handle_record(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { 545 if let Some(range_str) = rtsp_request.headers.get(&String::from("Range")) { 546 if let Some(range) = RtspRange::unmarshal(range_str) { 547 let status_code = http::StatusCode::OK; 548 let mut response = Self::gen_response(status_code, rtsp_request); 549 response 550 .headers 551 .insert(String::from("Range"), range.marshal()); 552 response 553 .headers 554 .insert("Session".to_string(), self.session_id.unwrap().to_string()); 555 556 self.send_response(&response).await?; 557 } 558 } 559 560 Ok(()) 561 } 562 handle_teardown(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError>563 fn handle_teardown(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { 564 let stream_path = &rtsp_request.path; 565 let unpublish_event = StreamHubEvent::UnPublish { 566 identifier: StreamIdentifier::Rtsp { 567 stream_path: stream_path.clone(), 568 }, 569 info: self.get_publisher_info(), 570 }; 571 572 let rv = self.event_producer.send(unpublish_event); 573 match rv { 574 Err(_) => { 575 log::error!("unpublish_to_channels error.stream_name: {}", stream_path); 576 Err(SessionError { 577 value: SessionErrorValue::StreamHubEventSendErr, 578 }) 579 } 580 Ok(()) => { 581 log::info!( 582 "unpublish_to_channels successfully.stream name: {}", 583 stream_path 584 ); 585 Ok(()) 586 } 587 } 588 } 589 new_tracks(&mut self) -> Result<(), SessionError>590 fn new_tracks(&mut self) -> Result<(), SessionError> { 591 for media in &self.sdp.medias { 592 let media_control = if let Some(media_control_val) = media.attributes.get("control") { 593 media_control_val.clone() 594 } else { 595 String::from("") 596 }; 597 598 let media_name = &media.media_type; 599 log::info!("media_name: {}", media_name); 600 match media_name.as_str() { 601 "audio" => { 602 let codec_id = rtsp_codec::RTSP_CODEC_NAME_2_ID 603 .get(&media.rtpmap.encoding_name.to_lowercase().as_str()) 604 .unwrap() 605 .clone(); 606 let codec_info = RtspCodecInfo { 607 codec_id, 608 payload_type: media.rtpmap.payload_type as u8, 609 sample_rate: media.rtpmap.clock_rate, 610 channel_count: media.rtpmap.encoding_param.parse().unwrap(), 611 }; 612 613 log::info!("audio codec info: {:?}", codec_info); 614 615 let track = RtspTrack::new(TrackType::Audio, codec_info, media_control); 616 self.tracks.insert(TrackType::Audio, track); 617 } 618 "video" => { 619 let codec_id = rtsp_codec::RTSP_CODEC_NAME_2_ID 620 .get(&media.rtpmap.encoding_name.to_lowercase().as_str()) 621 .unwrap() 622 .clone(); 623 let codec_info = RtspCodecInfo { 624 codec_id, 625 payload_type: media.rtpmap.payload_type as u8, 626 sample_rate: media.rtpmap.clock_rate, 627 ..Default::default() 628 }; 629 let track = RtspTrack::new(TrackType::Video, codec_info, media_control); 630 self.tracks.insert(TrackType::Video, track); 631 } 632 _ => {} 633 } 634 } 635 Ok(()) 636 } 637 gen_response(status_code: StatusCode, rtsp_request: &RtspRequest) -> RtspResponse638 fn gen_response(status_code: StatusCode, rtsp_request: &RtspRequest) -> RtspResponse { 639 let reason_phrase = if let Some(reason) = status_code.canonical_reason() { 640 reason.to_string() 641 } else { 642 "".to_string() 643 }; 644 645 let mut response = RtspResponse { 646 version: "RTSP/1.0".to_string(), 647 status_code: status_code.as_u16(), 648 reason_phrase, 649 ..Default::default() 650 }; 651 652 if let Some(cseq) = rtsp_request.headers.get("CSeq") { 653 response 654 .headers 655 .insert("CSeq".to_string(), cseq.to_string()); 656 } 657 658 response 659 } 660 get_subscriber_info(&mut self) -> SubscriberInfo661 fn get_subscriber_info(&mut self) -> SubscriberInfo { 662 let id = if let Some(session_id) = &self.session_id { 663 *session_id 664 } else { 665 Uuid::new(RandomDigitCount::Zero) 666 }; 667 668 SubscriberInfo { 669 id, 670 sub_type: SubscribeType::PlayerRtsp, 671 sub_data_type: streamhub::define::SubDataType::Frame, 672 notify_info: NotifyInfo { 673 request_url: String::from(""), 674 remote_addr: String::from(""), 675 }, 676 } 677 } 678 get_publisher_info(&mut self) -> PublisherInfo679 fn get_publisher_info(&mut self) -> PublisherInfo { 680 let id = if let Some(session_id) = &self.session_id { 681 *session_id 682 } else { 683 Uuid::new(RandomDigitCount::Zero) 684 }; 685 686 PublisherInfo { 687 id, 688 pub_type: PublishType::PushRtsp, 689 pub_data_type: streamhub::define::PubDataType::Frame, 690 notify_info: NotifyInfo { 691 request_url: String::from(""), 692 remote_addr: String::from(""), 693 }, 694 } 695 } 696 send_response(&mut self, response: &RtspResponse) -> Result<(), SessionError>697 async fn send_response(&mut self, response: &RtspResponse) -> Result<(), SessionError> { 698 self.writer.write(response.marshal().as_bytes())?; 699 self.writer.flush().await?; 700 701 Ok(()) 702 } 703 } 704 705 #[derive(Default)] 706 pub struct RtspStreamHandler { 707 sdp: Mutex<Sdp>, 708 } 709 710 impl RtspStreamHandler { new() -> Self711 pub fn new() -> Self { 712 Self { 713 sdp: Mutex::new(Sdp::default()), 714 } 715 } set_sdp(&self, sdp: Sdp)716 pub async fn set_sdp(&self, sdp: Sdp) { 717 *self.sdp.lock().await = sdp; 718 } 719 } 720 721 #[async_trait] 722 impl TStreamHandler for RtspStreamHandler { send_prior_data( &self, data_sender: DataSender, sub_type: SubscribeType, ) -> Result<(), ChannelError>723 async fn send_prior_data( 724 &self, 725 data_sender: DataSender, 726 sub_type: SubscribeType, 727 ) -> Result<(), ChannelError> { 728 let sender = match data_sender { 729 DataSender::Frame { sender } => sender, 730 DataSender::Packet { sender: _ } => { 731 return Err(ChannelError { 732 value: ChannelErrorValue::NotCorrectDataSenderType, 733 }); 734 } 735 }; 736 match sub_type { 737 SubscribeType::PlayerRtmp => { 738 let sdp_info = self.sdp.lock().await; 739 let mut video_clock_rate: u32 = 0; 740 let mut audio_clock_rate: u32 = 0; 741 742 let mut vcodec: VideoCodecType = VideoCodecType::H264; 743 744 for media in &sdp_info.medias { 745 let mut bytes_writer = BytesWriter::new(); 746 if let Some(fmtp) = &media.fmtp { 747 match fmtp { 748 Fmtp::H264(data) => { 749 bytes_writer.write(&ANNEXB_NALU_START_CODE)?; 750 bytes_writer.write(&data.sps)?; 751 bytes_writer.write(&ANNEXB_NALU_START_CODE)?; 752 bytes_writer.write(&data.pps)?; 753 754 let frame_data = FrameData::Video { 755 timestamp: 0, 756 data: bytes_writer.extract_current_bytes(), 757 }; 758 if let Err(err) = sender.send(frame_data) { 759 log::error!("send sps/pps error: {}", err); 760 } 761 video_clock_rate = media.rtpmap.clock_rate; 762 } 763 Fmtp::H265(data) => { 764 bytes_writer.write(&ANNEXB_NALU_START_CODE)?; 765 bytes_writer.write(&data.sps)?; 766 bytes_writer.write(&ANNEXB_NALU_START_CODE)?; 767 bytes_writer.write(&data.pps)?; 768 bytes_writer.write(&ANNEXB_NALU_START_CODE)?; 769 bytes_writer.write(&data.vps)?; 770 771 let frame_data = FrameData::Video { 772 timestamp: 0, 773 data: bytes_writer.extract_current_bytes(), 774 }; 775 if let Err(err) = sender.send(frame_data) { 776 log::error!("send sps/pps/vps error: {}", err); 777 } 778 779 vcodec = VideoCodecType::H265; 780 } 781 Fmtp::Mpeg4(data) => { 782 let frame_data = FrameData::Audio { 783 timestamp: 0, 784 data: data.asc.clone(), 785 }; 786 787 if let Err(err) = sender.send(frame_data) { 788 log::error!("send asc error: {}", err); 789 } 790 791 audio_clock_rate = media.rtpmap.clock_rate; 792 } 793 } 794 } 795 } 796 797 if let Err(err) = sender.send(FrameData::MediaInfo { 798 media_info: MediaInfo { 799 audio_clock_rate, 800 video_clock_rate, 801 802 vcodec, 803 }, 804 }) { 805 log::error!("send media info error: {}", err); 806 } 807 } 808 SubscribeType::PlayerHls => {} 809 _ => {} 810 } 811 812 Ok(()) 813 } get_statistic_data(&self) -> Option<StreamStatistics>814 async fn get_statistic_data(&self) -> Option<StreamStatistics> { 815 None 816 } 817 send_information(&self, sender: InformationSender)818 async fn send_information(&self, sender: InformationSender) { 819 if let Err(err) = sender.send(Information::Sdp { 820 data: self.sdp.lock().await.marshal(), 821 }) { 822 log::error!("send_information of rtsp error: {}", err); 823 } 824 } 825 } 826