xref: /xiu/protocol/httpflv/src/server.rs (revision e05ab47b)
1 use std::ops::Index;
2 
3 use bytes::BytesMut;
4 // use super::errors::ServerError;
5 use hyper::service::{make_service_fn, service_fn};
6 use hyper::{header, Body, Method, Request, Response, Server, StatusCode};
/// Boxed `'static` error that is `Send + Sync`.
///
/// In the visible portion of this file it is only mentioned in commented-out code.
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
8 use super::define::HttpResponseDataConsumer;
9 use super::define::HttpResponseDataProducer;
10 use super::httpflv::HttpFlv;
11 use futures_util::{stream, StreamExt};
12 use networkio::bytes_writer::BytesWriter;
13 use std::io;
14 use tokio::sync::mpsc;
15 use tokio_util::codec::{BytesCodec, FramedRead};
16 
17 use tokio_stream::wrappers::UnboundedReceiverStream;
18 
19 use futures::{task::SpawnExt, SinkExt, Stream}; // 0.3.1, features = ["thread-pool"]
20 
21 use {
22     crate::rtmp::channels::define::{
23         ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent, ChannelEventProducer,
24     },
25     networkio::networkio::NetworkIO,
26     std::{sync::Arc, time::Duration},
27     // tokio::{
28     //     sync::{mpsc, oneshot, Mutex},
29     //     time::sleep,
30     // },
31 };
32 
33 //pub static mut event_producer : ChannelEventProducer ;//
34 
/// Boxed error type that is `Send + Sync`; the error half of this module's [`Result`].
type GenericError = Box<dyn std::error::Error + Send + Sync>;

/// Module-local result alias whose error type is [`GenericError`].
type Result<T> = std::result::Result<T, GenericError>;

// --- Canned response bodies -------------------------------------------------

/// HTML anchor pointing at `test.html`; not referenced by the live code visible in this file.
static INDEX: &[u8] = b"<a href=\"test.html\">test.html</a>";
/// Plain-text body paired with a 500 status.
static INTERNAL_SERVER_ERROR: &[u8] = b"Internal Server Error";
/// Plain-text body paired with a 404 status.
static NOTFOUND: &[u8] = b"Not Found";
/// Plain-text success body; only mentioned in commented-out code in the visible file.
static OK: &[u8] = b"OK";

// --- Sample client data (not referenced by the live code visible here) ------

/// Sample JSON payload.
static POST_DATA: &str = r#"{"original": "data"}"#;
/// Sample JSON API endpoint URL.
static URL: &str = "http://127.0.0.1:1337/json_api";
45 
46 async fn api_get_response() -> Result<Response<Body>> {
47     let data = vec!["foo", "bar"];
48     let res = match serde_json::to_string(&data) {
49         Ok(json) => Response::builder()
50             .header(header::CONTENT_TYPE, "application/json")
51             .body(Body::from(json))
52             .unwrap(),
53         Err(_) => Response::builder()
54             .status(StatusCode::INTERNAL_SERVER_ERROR)
55             .body(INTERNAL_SERVER_ERROR.into())
56             .unwrap(),
57     };
58     Ok(res)
59 }
60 
61 fn stream(rv: HttpResponseDataConsumer) -> impl Stream<Item = io::Result<BytesMut>> {
62     rv
63 }
64 
65 async fn handle_connection(
66     req: Request<Body>,
67     event_producer: ChannelEventProducer, // event_producer: ChannelEventProducer
68 ) -> Result<Response<Body>> {
69     let path = req.uri().path();
70 
71     match path.find(".flv") {
72         Some(index) if index > 0 => {
73             println!("{}: {}", index, path);
74             let (left, _) = path.split_at(index);
75             println!("11{}: {}", index, left);
76             let rv: Vec<_> = left.split("/").collect();
77             for s in rv.clone() {
78                 println!("22{}: {}", index, s);
79             }
80 
81             let app_name = String::from(rv[0]);
82             let stream_name = String::from(rv[1]);
83 
84             let (http_response_data_producer, http_response_data_consumer) =
85                 mpsc::unbounded_channel();
86 
87             let mut flv_hanlder = HttpFlv::new(
88                 app_name,
89                 stream_name,
90                 event_producer,
91                 http_response_data_producer,
92             );
93 
94             flv_hanlder.run();
95 
96             // Ok(Response::builder()
97             //     .status(StatusCode::OK)
98             //     .body(OK.into())
99             //     .unwrap())
100 
101             let stream = UnboundedReceiverStream::new(http_response_data_consumer);
102 
103             let resp = Response::new(Body::wrap_stream(stream));
104 
105             Ok(resp)
106         }
107 
108         _ => Ok(Response::builder()
109             .status(StatusCode::NOT_FOUND)
110             .body(NOTFOUND.into())
111             .unwrap()),
112     }
113 }
114 
115 pub async fn run(event_producer: ChannelEventProducer) -> Result<()> {
116     let addr = "0.0.0.0:13370".parse().unwrap();
117 
118     let new_service = make_service_fn(move |_| {
119         let flv_copy = event_producer.clone();
120         async {
121             Ok::<_, GenericError>(service_fn(move |req| {
122                 handle_connection(req, flv_copy.clone())
123             }))
124         }
125     });
126 
127     // let shared_router = Arc::new(router);
128     // let new_service = make_service_fn(move |_| {
129     //     let app_state = AppState {
130     //         state_thing: some_state.clone(),
131     //     };
132 
133     //     let router_capture = shared_router.clone();
134     //     async {
135     //         Ok::<_, Error>(service_fn(move |req| {
136     //             route(router_capture.clone(), req, app_state.clone())
137     //         }))
138     //     }
139     // });
140 
141     let server = Server::bind(&addr).serve(new_service);
142     println!("Listening on http://{}", addr);
143     server.await?;
144 
145     // let addr = "0.0.0.0:8080".parse().expect("address creation works");
146     // let server = Server::bind(&addr).serve(new_service);
147     // println!("Listening on http://{}", addr);
148     // let _ = server.await;
149 
150     Ok(())
151 }
152 
153 // pub struct HttpFlvServer {}
154 
155 // impl HttpFlvServer {
156 //     async fn handle_connection(& mut self, req: Request<Body>) -> Result<Response<Body>> {
157 //         let path = req.uri().path();
158 
159 //         match path.find(".flv") {
160 //             Some(index) if index > 0 => {
161 //                 println!("{}: {}", index, path);
162 //                 let (left, _) = path.split_at(index);
163 //                 println!("11{}: {}", index, left);
164 //                 let mut rv = left.split("/");
165 //                 for s in rv {
166 //                     println!("22{}: {}", index, s);
167 //                 }
168 //                 Ok(Response::builder()
169 //                     .status(StatusCode::OK)
170 //                     .body(OK.into())
171 //                     .unwrap())
172 //             }
173 
174 //             _ => Ok(Response::builder()
175 //                 .status(StatusCode::NOT_FOUND)
176 //                 .body(NOTFOUND.into())
177 //                 .unwrap()),
178 //         }
179 //     }
180 
181 //     pub async fn run(&'static mut self) -> Result<()> {
182 //         let addr = "0.0.0.0:13370".parse().unwrap();
183 
184 //         let new_service = make_service_fn(move |_| async {
185 //             Ok::<_, GenericError>(service_fn(move |req| self.handle_connection(req)))
186 //         });
187 
188 //         let server = Server::bind(&addr).serve(new_service);
189 //         println!("Listening on http://{}", addr);
190 //         server.await?;
191 
192 //         Ok(())
193 //     }
194 // }
195