1 use { 2 super::httpflv::HttpFlv, 3 futures::channel::mpsc::unbounded, 4 hyper::{ 5 server::conn::AddrStream, 6 service::{make_service_fn, service_fn}, 7 Body, Request, Response, Server, StatusCode, 8 }, 9 rtmp::channels::define::ChannelEventProducer, 10 std::net::SocketAddr, 11 }; 12 13 type GenericError = Box<dyn std::error::Error + Send + Sync>; 14 type Result<T> = std::result::Result<T, GenericError>; 15 static NOTFOUND: &[u8] = b"Not Found"; 16 17 async fn handle_connection( 18 req: Request<Body>, 19 event_producer: ChannelEventProducer, // event_producer: ChannelEventProducer 20 remote_addr: SocketAddr, 21 ) -> Result<Response<Body>> { 22 let path = req.uri().path(); 23 24 match path.find(".flv") { 25 Some(index) if index > 0 => { 26 let (left, _) = path.split_at(index); 27 let rv: Vec<_> = left.split('/').collect(); 28 29 let app_name = String::from(rv[1]); 30 let stream_name = String::from(rv[2]); 31 32 let (http_response_data_producer, http_response_data_consumer) = unbounded(); 33 34 let mut flv_hanlder = HttpFlv::new( 35 app_name, 36 stream_name, 37 event_producer, 38 http_response_data_producer, 39 req.uri().to_string(), 40 remote_addr, 41 ); 42 43 tokio::spawn(async move { 44 if let Err(err) = flv_hanlder.run().await { 45 log::error!("flv handler run error {}\n", err); 46 } 47 }); 48 49 let mut resp = Response::new(Body::wrap_stream(http_response_data_consumer)); 50 resp.headers_mut() 51 .insert("Access-Control-Allow-Origin", "*".parse().unwrap()); 52 53 Ok(resp) 54 } 55 56 _ => Ok(Response::builder() 57 .status(StatusCode::NOT_FOUND) 58 .body(NOTFOUND.into()) 59 .unwrap()), 60 } 61 } 62 63 pub async fn run(event_producer: ChannelEventProducer, port: usize) -> Result<()> { 64 let listen_address = format!("0.0.0.0:{port}"); 65 let sock_addr = listen_address.parse().unwrap(); 66 67 let new_service = make_service_fn(move |socket: &AddrStream| { 68 let remote_addr = socket.remote_addr(); 69 let flv_copy = event_producer.clone(); 70 async move { 71 Ok::<_, GenericError>(service_fn(move |req| { 72 handle_connection(req, flv_copy.clone(), remote_addr) 73 })) 74 } 75 }); 76 77 let server = Server::bind(&sock_addr).serve(new_service); 78 79 log::info!("Httpflv server listening on http://{}", sock_addr); 80 81 server.await?; 82 83 Ok(()) 84 } 85