xref: /xiu/protocol/httpflv/src/server.rs (revision 69de9bbd)
1 use {
2     super::httpflv::HttpFlv,
3     futures::channel::mpsc::unbounded,
4     hyper::{
5         server::conn::AddrStream,
6         service::{make_service_fn, service_fn},
7         Body, Request, Response, Server, StatusCode,
8     },
9     std::net::SocketAddr,
10     streamhub::define::StreamHubEventSender,
11 };
12 
13 type GenericError = Box<dyn std::error::Error + Send + Sync>;
14 type Result<T> = std::result::Result<T, GenericError>;
15 static NOTFOUND: &[u8] = b"Not Found";
16 
handle_connection( req: Request<Body>, event_producer: StreamHubEventSender, remote_addr: SocketAddr, ) -> Result<Response<Body>>17 async fn handle_connection(
18     req: Request<Body>,
19     event_producer: StreamHubEventSender, // event_producer: ChannelEventProducer
20     remote_addr: SocketAddr,
21 ) -> Result<Response<Body>> {
22     let path = req.uri().path();
23 
24     match path.find(".flv") {
25         Some(index) if index > 0 => {
26             let (left, _) = path.split_at(index);
27             let rv: Vec<_> = left.split('/').collect();
28 
29             let app_name = String::from(rv[1]);
30             let stream_name = String::from(rv[2]);
31 
32             let (http_response_data_producer, http_response_data_consumer) = unbounded();
33 
34             let mut flv_hanlder = HttpFlv::new(
35                 app_name,
36                 stream_name,
37                 event_producer,
38                 http_response_data_producer,
39                 req.uri().to_string(),
40                 remote_addr,
41             );
42 
43             tokio::spawn(async move {
44                 if let Err(err) = flv_hanlder.run().await {
45                     log::error!("flv handler run error {}", err);
46                 }
47             });
48 
49             let mut resp = Response::new(Body::wrap_stream(http_response_data_consumer));
50             resp.headers_mut()
51                 .insert("Access-Control-Allow-Origin", "*".parse().unwrap());
52 
53             Ok(resp)
54         }
55 
56         _ => Ok(Response::builder()
57             .status(StatusCode::NOT_FOUND)
58             .body(NOTFOUND.into())
59             .unwrap()),
60     }
61 }
62 
run(event_producer: StreamHubEventSender, port: usize) -> Result<()>63 pub async fn run(event_producer: StreamHubEventSender, port: usize) -> Result<()> {
64     let listen_address = format!("0.0.0.0:{port}");
65     let sock_addr = listen_address.parse().unwrap();
66 
67     let new_service = make_service_fn(move |socket: &AddrStream| {
68         let remote_addr = socket.remote_addr();
69         let flv_copy = event_producer.clone();
70         async move {
71             Ok::<_, GenericError>(service_fn(move |req| {
72                 handle_connection(req, flv_copy.clone(), remote_addr)
73             }))
74         }
75     });
76 
77     let server = Server::bind(&sock_addr).serve(new_service);
78 
79     log::info!("Httpflv server listening on http://{}", sock_addr);
80 
81     server.await?;
82 
83     Ok(())
84 }
85