1 use {
2 super::httpflv::HttpFlv,
3 futures::channel::mpsc::unbounded,
4 hyper::{
5 server::conn::AddrStream,
6 service::{make_service_fn, service_fn},
7 Body, Request, Response, Server, StatusCode,
8 },
9 std::net::SocketAddr,
10 streamhub::define::StreamHubEventSender,
11 };
12
13 type GenericError = Box<dyn std::error::Error + Send + Sync>;
14 type Result<T> = std::result::Result<T, GenericError>;
15 static NOTFOUND: &[u8] = b"Not Found";
16
handle_connection( req: Request<Body>, event_producer: StreamHubEventSender, remote_addr: SocketAddr, ) -> Result<Response<Body>>17 async fn handle_connection(
18 req: Request<Body>,
19 event_producer: StreamHubEventSender, // event_producer: ChannelEventProducer
20 remote_addr: SocketAddr,
21 ) -> Result<Response<Body>> {
22 let path = req.uri().path();
23
24 match path.find(".flv") {
25 Some(index) if index > 0 => {
26 let (left, _) = path.split_at(index);
27 let rv: Vec<_> = left.split('/').collect();
28
29 let app_name = String::from(rv[1]);
30 let stream_name = String::from(rv[2]);
31
32 let (http_response_data_producer, http_response_data_consumer) = unbounded();
33
34 let mut flv_hanlder = HttpFlv::new(
35 app_name,
36 stream_name,
37 event_producer,
38 http_response_data_producer,
39 req.uri().to_string(),
40 remote_addr,
41 );
42
43 tokio::spawn(async move {
44 if let Err(err) = flv_hanlder.run().await {
45 log::error!("flv handler run error {}", err);
46 }
47 });
48
49 let mut resp = Response::new(Body::wrap_stream(http_response_data_consumer));
50 resp.headers_mut()
51 .insert("Access-Control-Allow-Origin", "*".parse().unwrap());
52
53 Ok(resp)
54 }
55
56 _ => Ok(Response::builder()
57 .status(StatusCode::NOT_FOUND)
58 .body(NOTFOUND.into())
59 .unwrap()),
60 }
61 }
62
run(event_producer: StreamHubEventSender, port: usize) -> Result<()>63 pub async fn run(event_producer: StreamHubEventSender, port: usize) -> Result<()> {
64 let listen_address = format!("0.0.0.0:{port}");
65 let sock_addr = listen_address.parse().unwrap();
66
67 let new_service = make_service_fn(move |socket: &AddrStream| {
68 let remote_addr = socket.remote_addr();
69 let flv_copy = event_producer.clone();
70 async move {
71 Ok::<_, GenericError>(service_fn(move |req| {
72 handle_connection(req, flv_copy.clone(), remote_addr)
73 }))
74 }
75 });
76
77 let server = Server::bind(&sock_addr).serve(new_service);
78
79 log::info!("Httpflv server listening on http://{}", sock_addr);
80
81 server.await?;
82
83 Ok(())
84 }
85