1 use { 2 super::httpflv::HttpFlv, 3 futures::channel::mpsc::unbounded, 4 hyper::{ 5 service::{make_service_fn, service_fn}, 6 Body, Request, Response, Server, StatusCode, 7 }, 8 rtmp::channels::define::ChannelEventProducer, 9 }; 10 11 type GenericError = Box<dyn std::error::Error + Send + Sync>; 12 type Result<T> = std::result::Result<T, GenericError>; 13 static NOTFOUND: &[u8] = b"Not Found"; 14 15 async fn handle_connection( 16 req: Request<Body>, 17 event_producer: ChannelEventProducer, // event_producer: ChannelEventProducer 18 ) -> Result<Response<Body>> { 19 let path = req.uri().path(); 20 21 match path.find(".flv") { 22 Some(index) if index > 0 => { 23 let (left, _) = path.split_at(index); 24 let rv: Vec<_> = left.split("/").collect(); 25 26 let app_name = String::from(rv[1]); 27 let stream_name = String::from(rv[2]); 28 29 let (http_response_data_producer, http_response_data_consumer) = unbounded(); 30 31 let mut flv_hanlder = HttpFlv::new( 32 app_name, 33 stream_name, 34 event_producer, 35 http_response_data_producer, 36 ); 37 38 tokio::spawn(async move { 39 if let Err(err) = flv_hanlder.run().await { 40 log::error!("flv handler run error {}\n", err); 41 } 42 }); 43 44 let mut resp = Response::new(Body::wrap_stream(http_response_data_consumer)); 45 resp.headers_mut() 46 .insert("Access-Control-Allow-Origin", "*".parse().unwrap()); 47 48 Ok(resp) 49 } 50 51 _ => Ok(Response::builder() 52 .status(StatusCode::NOT_FOUND) 53 .body(NOTFOUND.into()) 54 .unwrap()), 55 } 56 } 57 58 pub async fn run(event_producer: ChannelEventProducer, port: u32) -> Result<()> { 59 let listen_address = format!("0.0.0.0:{}", port); 60 let sock_addr = listen_address.parse().unwrap(); 61 62 let new_service = make_service_fn(move |_| { 63 let flv_copy = event_producer.clone(); 64 async { 65 Ok::<_, GenericError>(service_fn(move |req| { 66 handle_connection(req, flv_copy.clone()) 67 })) 68 } 69 }); 70 71 let server = Server::bind(&sock_addr).serve(new_service); 72 73 log::info!("Httpflv server listening on http://{}", sock_addr); 74 75 server.await?; 76 77 Ok(()) 78 } 79