1e05ab47bSHarlanC use {
2a3d19cccSHarlanC super::httpflv::HttpFlv,
3a3d19cccSHarlanC futures::channel::mpsc::unbounded,
4a3d19cccSHarlanC hyper::{
5976f65a6SHarlan server::conn::AddrStream,
6a3d19cccSHarlanC service::{make_service_fn, service_fn},
7a3d19cccSHarlanC Body, Request, Response, Server, StatusCode,
8e05ab47bSHarlanC },
9976f65a6SHarlan std::net::SocketAddr,
108e71d710SHarlan streamhub::define::StreamHubEventSender,
11e05ab47bSHarlanC };
12e05ab47bSHarlanC
135377b641SHarlanC type GenericError = Box<dyn std::error::Error + Send + Sync>;
140c504437SHarlanC type Result<T> = std::result::Result<T, GenericError>;
155377b641SHarlanC static NOTFOUND: &[u8] = b"Not Found";
16e05ab47bSHarlanC
handle_connection( req: Request<Body>, event_producer: StreamHubEventSender, remote_addr: SocketAddr, ) -> Result<Response<Body>>17e05ab47bSHarlanC async fn handle_connection(
18e05ab47bSHarlanC req: Request<Body>,
198e71d710SHarlan event_producer: StreamHubEventSender, // event_producer: ChannelEventProducer
20976f65a6SHarlan remote_addr: SocketAddr,
21e05ab47bSHarlanC ) -> Result<Response<Body>> {
2287d8387fSHarlanC let path = req.uri().path();
230c504437SHarlanC
2487d8387fSHarlanC match path.find(".flv") {
2587d8387fSHarlanC Some(index) if index > 0 => {
2687d8387fSHarlanC let (left, _) = path.split_at(index);
2785c0af6aSLuca Barbato let rv: Vec<_> = left.split('/').collect();
28e05ab47bSHarlanC
2995a688c2SHarlanC let app_name = String::from(rv[1]);
3095a688c2SHarlanC let stream_name = String::from(rv[2]);
31e05ab47bSHarlanC
3295a688c2SHarlanC let (http_response_data_producer, http_response_data_consumer) = unbounded();
33e05ab47bSHarlanC
34e05ab47bSHarlanC let mut flv_hanlder = HttpFlv::new(
35e05ab47bSHarlanC app_name,
36e05ab47bSHarlanC stream_name,
37e05ab47bSHarlanC event_producer,
38e05ab47bSHarlanC http_response_data_producer,
39f159b276SHarlan req.uri().to_string(),
40976f65a6SHarlan remote_addr,
41e05ab47bSHarlanC );
42e05ab47bSHarlanC
4395a688c2SHarlanC tokio::spawn(async move {
4495a688c2SHarlanC if let Err(err) = flv_hanlder.run().await {
45*69de9bbdSHarlanC log::error!("flv handler run error {}", err);
4695a688c2SHarlanC }
4795a688c2SHarlanC });
48e05ab47bSHarlanC
4995a688c2SHarlanC let mut resp = Response::new(Body::wrap_stream(http_response_data_consumer));
5095a688c2SHarlanC resp.headers_mut()
5195a688c2SHarlanC .insert("Access-Control-Allow-Origin", "*".parse().unwrap());
52e05ab47bSHarlanC
53e05ab47bSHarlanC Ok(resp)
545377b641SHarlanC }
5587d8387fSHarlanC
5687d8387fSHarlanC _ => Ok(Response::builder()
5787d8387fSHarlanC .status(StatusCode::NOT_FOUND)
5887d8387fSHarlanC .body(NOTFOUND.into())
5987d8387fSHarlanC .unwrap()),
605377b641SHarlanC }
615377b641SHarlanC }
625377b641SHarlanC
run(event_producer: StreamHubEventSender, port: usize) -> Result<()>638e71d710SHarlan pub async fn run(event_producer: StreamHubEventSender, port: usize) -> Result<()> {
64bd35295bSHarlan let listen_address = format!("0.0.0.0:{port}");
650d3b29c8SHarlanC let sock_addr = listen_address.parse().unwrap();
6687d8387fSHarlanC
67976f65a6SHarlan let new_service = make_service_fn(move |socket: &AddrStream| {
68976f65a6SHarlan let remote_addr = socket.remote_addr();
69e05ab47bSHarlanC let flv_copy = event_producer.clone();
70976f65a6SHarlan async move {
71e05ab47bSHarlanC Ok::<_, GenericError>(service_fn(move |req| {
72976f65a6SHarlan handle_connection(req, flv_copy.clone(), remote_addr)
73e05ab47bSHarlanC }))
74e05ab47bSHarlanC }
755377b641SHarlanC });
765377b641SHarlanC
770d3b29c8SHarlanC let server = Server::bind(&sock_addr).serve(new_service);
7888325f54SHarlanC
7988325f54SHarlanC log::info!("Httpflv server listening on http://{}", sock_addr);
8053a2e033SHarlanC
81783195d9SHarlanC server.await?;
820c504437SHarlanC
83783195d9SHarlanC Ok(())
845377b641SHarlanC }
85