xref: /xiu/protocol/httpflv/src/server.rs (revision 69de9bbd)
1e05ab47bSHarlanC use {
2a3d19cccSHarlanC     super::httpflv::HttpFlv,
3a3d19cccSHarlanC     futures::channel::mpsc::unbounded,
4a3d19cccSHarlanC     hyper::{
5976f65a6SHarlan         server::conn::AddrStream,
6a3d19cccSHarlanC         service::{make_service_fn, service_fn},
7a3d19cccSHarlanC         Body, Request, Response, Server, StatusCode,
8e05ab47bSHarlanC     },
9976f65a6SHarlan     std::net::SocketAddr,
108e71d710SHarlan     streamhub::define::StreamHubEventSender,
11e05ab47bSHarlanC };
12e05ab47bSHarlanC 
/// Boxed, `Send + Sync` error trait object used as the error type of this module.
type GenericError = Box<dyn std::error::Error + Send + Sync>;
/// Module-local `Result` alias whose error type is `GenericError`.
type Result<T> = std::result::Result<T, GenericError>;
/// Body bytes sent with the HTTP 404 response.
static NOTFOUND: &[u8] = b"Not Found";
16e05ab47bSHarlanC 
handle_connection( req: Request<Body>, event_producer: StreamHubEventSender, remote_addr: SocketAddr, ) -> Result<Response<Body>>17e05ab47bSHarlanC async fn handle_connection(
18e05ab47bSHarlanC     req: Request<Body>,
198e71d710SHarlan     event_producer: StreamHubEventSender, // event_producer: ChannelEventProducer
20976f65a6SHarlan     remote_addr: SocketAddr,
21e05ab47bSHarlanC ) -> Result<Response<Body>> {
2287d8387fSHarlanC     let path = req.uri().path();
230c504437SHarlanC 
2487d8387fSHarlanC     match path.find(".flv") {
2587d8387fSHarlanC         Some(index) if index > 0 => {
2687d8387fSHarlanC             let (left, _) = path.split_at(index);
2785c0af6aSLuca Barbato             let rv: Vec<_> = left.split('/').collect();
28e05ab47bSHarlanC 
2995a688c2SHarlanC             let app_name = String::from(rv[1]);
3095a688c2SHarlanC             let stream_name = String::from(rv[2]);
31e05ab47bSHarlanC 
3295a688c2SHarlanC             let (http_response_data_producer, http_response_data_consumer) = unbounded();
33e05ab47bSHarlanC 
34e05ab47bSHarlanC             let mut flv_hanlder = HttpFlv::new(
35e05ab47bSHarlanC                 app_name,
36e05ab47bSHarlanC                 stream_name,
37e05ab47bSHarlanC                 event_producer,
38e05ab47bSHarlanC                 http_response_data_producer,
39f159b276SHarlan                 req.uri().to_string(),
40976f65a6SHarlan                 remote_addr,
41e05ab47bSHarlanC             );
42e05ab47bSHarlanC 
4395a688c2SHarlanC             tokio::spawn(async move {
4495a688c2SHarlanC                 if let Err(err) = flv_hanlder.run().await {
45*69de9bbdSHarlanC                     log::error!("flv handler run error {}", err);
4695a688c2SHarlanC                 }
4795a688c2SHarlanC             });
48e05ab47bSHarlanC 
4995a688c2SHarlanC             let mut resp = Response::new(Body::wrap_stream(http_response_data_consumer));
5095a688c2SHarlanC             resp.headers_mut()
5195a688c2SHarlanC                 .insert("Access-Control-Allow-Origin", "*".parse().unwrap());
52e05ab47bSHarlanC 
53e05ab47bSHarlanC             Ok(resp)
545377b641SHarlanC         }
5587d8387fSHarlanC 
5687d8387fSHarlanC         _ => Ok(Response::builder()
5787d8387fSHarlanC             .status(StatusCode::NOT_FOUND)
5887d8387fSHarlanC             .body(NOTFOUND.into())
5987d8387fSHarlanC             .unwrap()),
605377b641SHarlanC     }
615377b641SHarlanC }
625377b641SHarlanC 
run(event_producer: StreamHubEventSender, port: usize) -> Result<()>638e71d710SHarlan pub async fn run(event_producer: StreamHubEventSender, port: usize) -> Result<()> {
64bd35295bSHarlan     let listen_address = format!("0.0.0.0:{port}");
650d3b29c8SHarlanC     let sock_addr = listen_address.parse().unwrap();
6687d8387fSHarlanC 
67976f65a6SHarlan     let new_service = make_service_fn(move |socket: &AddrStream| {
68976f65a6SHarlan         let remote_addr = socket.remote_addr();
69e05ab47bSHarlanC         let flv_copy = event_producer.clone();
70976f65a6SHarlan         async move {
71e05ab47bSHarlanC             Ok::<_, GenericError>(service_fn(move |req| {
72976f65a6SHarlan                 handle_connection(req, flv_copy.clone(), remote_addr)
73e05ab47bSHarlanC             }))
74e05ab47bSHarlanC         }
755377b641SHarlanC     });
765377b641SHarlanC 
770d3b29c8SHarlanC     let server = Server::bind(&sock_addr).serve(new_service);
7888325f54SHarlanC 
7988325f54SHarlanC     log::info!("Httpflv server listening on http://{}", sock_addr);
8053a2e033SHarlanC 
81783195d9SHarlanC     server.await?;
820c504437SHarlanC 
83783195d9SHarlanC     Ok(())
845377b641SHarlanC }
85