1 use streamhub::define::StreamHubEventSender; 2 3 use super::session::WebRTCServerSession; 4 5 use super::http::define::http_method_name; 6 use std::collections::HashMap; 7 use std::net::SocketAddr; 8 use std::sync::Arc; 9 use streamhub::utils::Uuid; 10 use tokio::io::Error; 11 use tokio::net::TcpListener; 12 use tokio::sync::Mutex; 13 14 pub struct WebRTCServer { 15 address: String, 16 event_producer: StreamHubEventSender, 17 uuid_2_sessions: Arc<Mutex<HashMap<Uuid, Arc<Mutex<WebRTCServerSession>>>>>, 18 } 19 20 impl WebRTCServer { new(address: String, event_producer: StreamHubEventSender) -> Self21 pub fn new(address: String, event_producer: StreamHubEventSender) -> Self { 22 Self { 23 address, 24 event_producer, 25 uuid_2_sessions: Arc::new(Mutex::new(HashMap::new())), 26 } 27 } 28 run(&mut self) -> Result<(), Error>29 pub async fn run(&mut self) -> Result<(), Error> { 30 let socket_addr: &SocketAddr = &self.address.parse().unwrap(); 31 let listener = TcpListener::bind(socket_addr).await?; 32 33 log::info!("WebRTC server listening on tcp://{}", socket_addr); 34 loop { 35 let (tcp_stream, _) = listener.accept().await?; 36 let session = Arc::new(Mutex::new(WebRTCServerSession::new( 37 tcp_stream, 38 self.event_producer.clone(), 39 ))); 40 let uuid_2_sessions = self.uuid_2_sessions.clone(); 41 tokio::spawn(async move { 42 let mut session_unlock = session.lock().await; 43 if let Err(err) = session_unlock.run(uuid_2_sessions.clone()).await { 44 log::error!("session run error, err: {}", err); 45 } 46 47 if let Some(http_request_data) = &session_unlock.http_request_data { 48 let mut uuid_2_session_unlock = uuid_2_sessions.lock().await; 49 50 match http_request_data.method.as_str() { 51 http_method_name::POST => { 52 if let Some(uuid) = session_unlock.session_id { 53 uuid_2_session_unlock.insert(uuid, session.clone()); 54 } 55 } 56 http_method_name::OPTIONS => {} 57 http_method_name::PATCH => {} 58 http_method_name::DELETE => {} 59 _ => {} 60 } 61 } 62 }); 63 } 64 } 65 } 66