xref: /xiu/protocol/httpflv/src/server.rs (revision e05ab47b)
187d8387fSHarlanC use std::ops::Index;
287d8387fSHarlanC 
3*e05ab47bSHarlanC use bytes::BytesMut;
45377b641SHarlanC // use super::errors::ServerError;
55377b641SHarlanC use hyper::service::{make_service_fn, service_fn};
687d8387fSHarlanC use hyper::{header, Body, Method, Request, Response, Server, StatusCode};
75377b641SHarlanC type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
8*e05ab47bSHarlanC use super::define::HttpResponseDataConsumer;
9*e05ab47bSHarlanC use super::define::HttpResponseDataProducer;
10*e05ab47bSHarlanC use super::httpflv::HttpFlv;
110c504437SHarlanC use futures_util::{stream, StreamExt};
12*e05ab47bSHarlanC use networkio::bytes_writer::BytesWriter;
13*e05ab47bSHarlanC use std::io;
14*e05ab47bSHarlanC use tokio::sync::mpsc;
15*e05ab47bSHarlanC use tokio_util::codec::{BytesCodec, FramedRead};
16*e05ab47bSHarlanC 
17*e05ab47bSHarlanC use tokio_stream::wrappers::UnboundedReceiverStream;
18*e05ab47bSHarlanC 
19*e05ab47bSHarlanC use futures::{task::SpawnExt, SinkExt, Stream}; // 0.3.1, features = ["thread-pool"]
20*e05ab47bSHarlanC 
21*e05ab47bSHarlanC use {
22*e05ab47bSHarlanC     crate::rtmp::channels::define::{
23*e05ab47bSHarlanC         ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent, ChannelEventProducer,
24*e05ab47bSHarlanC     },
25*e05ab47bSHarlanC     networkio::networkio::NetworkIO,
26*e05ab47bSHarlanC     std::{sync::Arc, time::Duration},
27*e05ab47bSHarlanC     // tokio::{
28*e05ab47bSHarlanC     //     sync::{mpsc, oneshot, Mutex},
29*e05ab47bSHarlanC     //     time::sleep,
30*e05ab47bSHarlanC     // },
31*e05ab47bSHarlanC };
32*e05ab47bSHarlanC 
33*e05ab47bSHarlanC //pub static mut event_producer : ChannelEventProducer ;//
345377b641SHarlanC 
/// Boxed, thread-safe error type used for every fallible operation in this module.
type GenericError = Box<dyn std::error::Error + Send + Sync>;

/// Module-local `Result` alias whose error type is [`GenericError`].
type Result<T> = std::result::Result<T, GenericError>;

// Canned payloads. Only `INTERNAL_SERVER_ERROR` and `NOTFOUND` are referenced by
// live code in the visible part of this file; `OK` appears only in commented-out
// code, and `INDEX`, `POST_DATA` and `URL` are not referenced here at all.
// NOTE(review): the unreferenced ones look like leftovers from an example
// server — confirm before removing.
static INDEX: &[u8] = b"<a href=\"test.html\">test.html</a>";
static INTERNAL_SERVER_ERROR: &[u8] = b"Internal Server Error";
static NOTFOUND: &[u8] = b"Not Found";
static OK: &[u8] = b"OK";
static POST_DATA: &str = r#"{"original": "data"}"#;
static URL: &str = "http://127.0.0.1:1337/json_api";
455377b641SHarlanC 
46783195d9SHarlanC async fn api_get_response() -> Result<Response<Body>> {
47783195d9SHarlanC     let data = vec!["foo", "bar"];
48783195d9SHarlanC     let res = match serde_json::to_string(&data) {
49783195d9SHarlanC         Ok(json) => Response::builder()
505377b641SHarlanC             .header(header::CONTENT_TYPE, "application/json")
51783195d9SHarlanC             .body(Body::from(json))
52783195d9SHarlanC             .unwrap(),
53783195d9SHarlanC         Err(_) => Response::builder()
54783195d9SHarlanC             .status(StatusCode::INTERNAL_SERVER_ERROR)
55783195d9SHarlanC             .body(INTERNAL_SERVER_ERROR.into())
56783195d9SHarlanC             .unwrap(),
57783195d9SHarlanC     };
58783195d9SHarlanC     Ok(res)
595377b641SHarlanC }
605377b641SHarlanC 
/// Returns the HTTP response data consumer as an opaque stream of
/// `io::Result<BytesMut>` items.
///
/// The argument is handed back unchanged; only its static type is narrowed to
/// `impl Stream`. This type-checks only if `HttpResponseDataConsumer` itself
/// implements `Stream` with that item type.
///
/// NOTE(review): no caller is visible in this part of the file —
/// `handle_connection` wraps its receiver in `UnboundedReceiverStream` rather
/// than calling this helper. Confirm whether it is still needed.
fn stream(rv: HttpResponseDataConsumer) -> impl Stream<Item = io::Result<BytesMut>> {
    rv
}
64*e05ab47bSHarlanC 
65*e05ab47bSHarlanC async fn handle_connection(
66*e05ab47bSHarlanC     req: Request<Body>,
67*e05ab47bSHarlanC     event_producer: ChannelEventProducer, // event_producer: ChannelEventProducer
68*e05ab47bSHarlanC ) -> Result<Response<Body>> {
6987d8387fSHarlanC     let path = req.uri().path();
700c504437SHarlanC 
7187d8387fSHarlanC     match path.find(".flv") {
7287d8387fSHarlanC         Some(index) if index > 0 => {
7387d8387fSHarlanC             println!("{}: {}", index, path);
7487d8387fSHarlanC             let (left, _) = path.split_at(index);
7587d8387fSHarlanC             println!("11{}: {}", index, left);
76*e05ab47bSHarlanC             let rv: Vec<_> = left.split("/").collect();
77*e05ab47bSHarlanC             for s in rv.clone() {
7887d8387fSHarlanC                 println!("22{}: {}", index, s);
7987d8387fSHarlanC             }
80*e05ab47bSHarlanC 
81*e05ab47bSHarlanC             let app_name = String::from(rv[0]);
82*e05ab47bSHarlanC             let stream_name = String::from(rv[1]);
83*e05ab47bSHarlanC 
84*e05ab47bSHarlanC             let (http_response_data_producer, http_response_data_consumer) =
85*e05ab47bSHarlanC                 mpsc::unbounded_channel();
86*e05ab47bSHarlanC 
87*e05ab47bSHarlanC             let mut flv_hanlder = HttpFlv::new(
88*e05ab47bSHarlanC                 app_name,
89*e05ab47bSHarlanC                 stream_name,
90*e05ab47bSHarlanC                 event_producer,
91*e05ab47bSHarlanC                 http_response_data_producer,
92*e05ab47bSHarlanC             );
93*e05ab47bSHarlanC 
94*e05ab47bSHarlanC             flv_hanlder.run();
95*e05ab47bSHarlanC 
96*e05ab47bSHarlanC             // Ok(Response::builder()
97*e05ab47bSHarlanC             //     .status(StatusCode::OK)
98*e05ab47bSHarlanC             //     .body(OK.into())
99*e05ab47bSHarlanC             //     .unwrap())
100*e05ab47bSHarlanC 
101*e05ab47bSHarlanC             let stream = UnboundedReceiverStream::new(http_response_data_consumer);
102*e05ab47bSHarlanC 
103*e05ab47bSHarlanC             let resp = Response::new(Body::wrap_stream(stream));
104*e05ab47bSHarlanC 
105*e05ab47bSHarlanC             Ok(resp)
1065377b641SHarlanC         }
10787d8387fSHarlanC 
10887d8387fSHarlanC         _ => Ok(Response::builder()
10987d8387fSHarlanC             .status(StatusCode::NOT_FOUND)
11087d8387fSHarlanC             .body(NOTFOUND.into())
11187d8387fSHarlanC             .unwrap()),
1125377b641SHarlanC     }
1135377b641SHarlanC }
1145377b641SHarlanC 
115*e05ab47bSHarlanC pub async fn run(event_producer: ChannelEventProducer) -> Result<()> {
116783195d9SHarlanC     let addr = "0.0.0.0:13370".parse().unwrap();
11787d8387fSHarlanC 
118*e05ab47bSHarlanC     let new_service = make_service_fn(move |_| {
119*e05ab47bSHarlanC         let flv_copy = event_producer.clone();
120*e05ab47bSHarlanC         async {
121*e05ab47bSHarlanC             Ok::<_, GenericError>(service_fn(move |req| {
122*e05ab47bSHarlanC                 handle_connection(req, flv_copy.clone())
123*e05ab47bSHarlanC             }))
124*e05ab47bSHarlanC         }
1255377b641SHarlanC     });
1265377b641SHarlanC 
127*e05ab47bSHarlanC     // let shared_router = Arc::new(router);
128*e05ab47bSHarlanC     // let new_service = make_service_fn(move |_| {
129*e05ab47bSHarlanC     //     let app_state = AppState {
130*e05ab47bSHarlanC     //         state_thing: some_state.clone(),
131*e05ab47bSHarlanC     //     };
132*e05ab47bSHarlanC 
133*e05ab47bSHarlanC     //     let router_capture = shared_router.clone();
134*e05ab47bSHarlanC     //     async {
135*e05ab47bSHarlanC     //         Ok::<_, Error>(service_fn(move |req| {
136*e05ab47bSHarlanC     //             route(router_capture.clone(), req, app_state.clone())
137*e05ab47bSHarlanC     //         }))
138*e05ab47bSHarlanC     //     }
139*e05ab47bSHarlanC     // });
140*e05ab47bSHarlanC 
1415377b641SHarlanC     let server = Server::bind(&addr).serve(new_service);
1425377b641SHarlanC     println!("Listening on http://{}", addr);
143783195d9SHarlanC     server.await?;
1440c504437SHarlanC 
145*e05ab47bSHarlanC     // let addr = "0.0.0.0:8080".parse().expect("address creation works");
146*e05ab47bSHarlanC     // let server = Server::bind(&addr).serve(new_service);
147*e05ab47bSHarlanC     // println!("Listening on http://{}", addr);
148*e05ab47bSHarlanC     // let _ = server.await;
149*e05ab47bSHarlanC 
150783195d9SHarlanC     Ok(())
1515377b641SHarlanC }
152*e05ab47bSHarlanC 
153*e05ab47bSHarlanC // pub struct HttpFlvServer {}
154*e05ab47bSHarlanC 
155*e05ab47bSHarlanC // impl HttpFlvServer {
156*e05ab47bSHarlanC //     async fn handle_connection(& mut self, req: Request<Body>) -> Result<Response<Body>> {
157*e05ab47bSHarlanC //         let path = req.uri().path();
158*e05ab47bSHarlanC 
159*e05ab47bSHarlanC //         match path.find(".flv") {
160*e05ab47bSHarlanC //             Some(index) if index > 0 => {
161*e05ab47bSHarlanC //                 println!("{}: {}", index, path);
162*e05ab47bSHarlanC //                 let (left, _) = path.split_at(index);
163*e05ab47bSHarlanC //                 println!("11{}: {}", index, left);
164*e05ab47bSHarlanC //                 let mut rv = left.split("/");
165*e05ab47bSHarlanC //                 for s in rv {
166*e05ab47bSHarlanC //                     println!("22{}: {}", index, s);
167*e05ab47bSHarlanC //                 }
168*e05ab47bSHarlanC //                 Ok(Response::builder()
169*e05ab47bSHarlanC //                     .status(StatusCode::OK)
170*e05ab47bSHarlanC //                     .body(OK.into())
171*e05ab47bSHarlanC //                     .unwrap())
172*e05ab47bSHarlanC //             }
173*e05ab47bSHarlanC 
174*e05ab47bSHarlanC //             _ => Ok(Response::builder()
175*e05ab47bSHarlanC //                 .status(StatusCode::NOT_FOUND)
176*e05ab47bSHarlanC //                 .body(NOTFOUND.into())
177*e05ab47bSHarlanC //                 .unwrap()),
178*e05ab47bSHarlanC //         }
179*e05ab47bSHarlanC //     }
180*e05ab47bSHarlanC 
181*e05ab47bSHarlanC //     pub async fn run(&'static mut self) -> Result<()> {
182*e05ab47bSHarlanC //         let addr = "0.0.0.0:13370".parse().unwrap();
183*e05ab47bSHarlanC 
184*e05ab47bSHarlanC //         let new_service = make_service_fn(move |_| async {
185*e05ab47bSHarlanC //             Ok::<_, GenericError>(service_fn(move |req| self.handle_connection(req)))
186*e05ab47bSHarlanC //         });
187*e05ab47bSHarlanC 
188*e05ab47bSHarlanC //         let server = Server::bind(&addr).serve(new_service);
189*e05ab47bSHarlanC //         println!("Listening on http://{}", addr);
190*e05ab47bSHarlanC //         server.await?;
191*e05ab47bSHarlanC 
192*e05ab47bSHarlanC //         Ok(())
193*e05ab47bSHarlanC //     }
194*e05ab47bSHarlanC // }
195