xref: /xiu/protocol/httpflv/src/server.rs (revision 53a2e033)
1 use std::ops::Index;
2 
3 use bytes::BytesMut;
4 // use super::errors::ServerError;
5 use hyper::service::{make_service_fn, service_fn};
6 use hyper::{header, Body, Method, Request, Response, Server, StatusCode};
7 type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
8 use super::define::HttpResponseDataConsumer;
9 use super::define::HttpResponseDataProducer;
10 use super::httpflv::HttpFlv;
11 use futures_util::{stream, StreamExt};
12 use networkio::bytes_writer::BytesWriter;
13 use std::io;
14 
15 use futures::{channel::mpsc::unbounded, task::SpawnExt, SinkExt, Stream}; // 0.3.1, features = ["thread-pool"]
16 
17 use {
18     crate::rtmp::channels::define::{
19         ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent, ChannelEventProducer,
20     },
21     networkio::networkio::NetworkIO,
22     std::{sync::Arc, time::Duration},
23 };
24 
/// Boxed, thread-safe (`Send + Sync`) dynamic error used as the error type of this module.
type GenericError = Box<dyn std::error::Error + Send + Sync>;
/// Module-local `Result` alias whose error type is fixed to [`GenericError`].
type Result<T> = std::result::Result<T, GenericError>;
/// Body bytes sent with the `404 Not Found` response.
static NOTFOUND: &[u8] = b"Not Found";
28 
29 async fn handle_connection(
30     req: Request<Body>,
31     event_producer: ChannelEventProducer, // event_producer: ChannelEventProducer
32 ) -> Result<Response<Body>> {
33     let path = req.uri().path();
34 
35     match path.find(".flv") {
36         Some(index) if index > 0 => {
37             println!("{}: {}", index, path);
38             let (left, _) = path.split_at(index);
39             println!("11{}: {}", index, left);
40             let rv: Vec<_> = left.split("/").collect();
41             for s in rv.clone() {
42                 println!("22{}: {}", index, s);
43             }
44 
45             let app_name = String::from(rv[1]);
46             let stream_name = String::from(rv[2]);
47 
48             let (http_response_data_producer, http_response_data_consumer) = unbounded();
49 
50             let mut flv_hanlder = HttpFlv::new(
51                 app_name,
52                 stream_name,
53                 event_producer,
54                 http_response_data_producer,
55             );
56 
57             tokio::spawn(async move {
58                 if let Err(err) = flv_hanlder.run().await {
59                     print!("pull client error {}\n", err);
60                 }
61             });
62 
63             let mut resp = Response::new(Body::wrap_stream(http_response_data_consumer));
64 
65             resp.headers_mut()
66                 .insert("Access-Control-Allow-Origin", "*".parse().unwrap());
67 
68             Ok(resp)
69         }
70 
71         _ => Ok(Response::builder()
72             .status(StatusCode::NOT_FOUND)
73             .body(NOTFOUND.into())
74             .unwrap()),
75     }
76 }
77 
78 pub async fn run(event_producer: ChannelEventProducer, port: u32) -> Result<()> {
79     env_logger::try_init();
80 
81     let listen_address = format!("0.0.0.0:{}", port);
82     let sock_addr = listen_address.parse().unwrap();
83 
84     let new_service = make_service_fn(move |_| {
85         let flv_copy = event_producer.clone();
86         async {
87             Ok::<_, GenericError>(service_fn(move |req| {
88                 handle_connection(req, flv_copy.clone())
89             }))
90         }
91     });
92 
93     let server = Server::bind(&sock_addr).serve(new_service);
94     println!("Listening on http://{}", sock_addr);
95     log::info!("Listening on http://{}", sock_addr);
96 
97     server.await?;
98 
99     Ok(())
100 }
101