xref: /xiu/protocol/httpflv/src/server.rs (revision a3d19ccc)
1 use {
2     super::httpflv::HttpFlv,
3     futures::channel::mpsc::unbounded,
4     hyper::{
5         service::{make_service_fn, service_fn},
6         Body, Request, Response, Server, StatusCode,
7     },
8     rtmp::channels::define::ChannelEventProducer,
9 };
10 
11 type GenericError = Box<dyn std::error::Error + Send + Sync>;
12 type Result<T> = std::result::Result<T, GenericError>;
13 static NOTFOUND: &[u8] = b"Not Found";
14 
15 async fn handle_connection(
16     req: Request<Body>,
17     event_producer: ChannelEventProducer, // event_producer: ChannelEventProducer
18 ) -> Result<Response<Body>> {
19     let path = req.uri().path();
20 
21     match path.find(".flv") {
22         Some(index) if index > 0 => {
23             println!("{}: {}", index, path);
24             let (left, _) = path.split_at(index);
25             println!("11{}: {}", index, left);
26             let rv: Vec<_> = left.split("/").collect();
27             for s in rv.clone() {
28                 println!("22{}: {}", index, s);
29             }
30 
31             let app_name = String::from(rv[1]);
32             let stream_name = String::from(rv[2]);
33 
34             let (http_response_data_producer, http_response_data_consumer) = unbounded();
35 
36             let mut flv_hanlder = HttpFlv::new(
37                 app_name,
38                 stream_name,
39                 event_producer,
40                 http_response_data_producer,
41             );
42 
43             tokio::spawn(async move {
44                 if let Err(err) = flv_hanlder.run().await {
45                     log::error!("pull client error {}\n", err);
46                 }
47             });
48 
49             let mut resp = Response::new(Body::wrap_stream(http_response_data_consumer));
50 
51             resp.headers_mut()
52                 .insert("Access-Control-Allow-Origin", "*".parse().unwrap());
53 
54             Ok(resp)
55         }
56 
57         _ => Ok(Response::builder()
58             .status(StatusCode::NOT_FOUND)
59             .body(NOTFOUND.into())
60             .unwrap()),
61     }
62 }
63 
64 pub async fn run(event_producer: ChannelEventProducer, port: u32) -> Result<()> {
65     let listen_address = format!("0.0.0.0:{}", port);
66     let sock_addr = listen_address.parse().unwrap();
67 
68     let new_service = make_service_fn(move |_| {
69         let flv_copy = event_producer.clone();
70         async {
71             Ok::<_, GenericError>(service_fn(move |req| {
72                 handle_connection(req, flv_copy.clone())
73             }))
74         }
75     });
76 
77     let server = Server::bind(&sock_addr).serve(new_service);
78     println!("Listening on http://{}", sock_addr);
79     log::info!("Listening on http://{}", sock_addr);
80 
81     server.await?;
82 
83     Ok(())
84 }
85