187d8387fSHarlanC use std::ops::Index; 287d8387fSHarlanC 3*e05ab47bSHarlanC use bytes::BytesMut; 45377b641SHarlanC // use super::errors::ServerError; 55377b641SHarlanC use hyper::service::{make_service_fn, service_fn}; 687d8387fSHarlanC use hyper::{header, Body, Method, Request, Response, Server, StatusCode}; 75377b641SHarlanC type Error = Box<dyn std::error::Error + Send + Sync + 'static>; 8*e05ab47bSHarlanC use super::define::HttpResponseDataConsumer; 9*e05ab47bSHarlanC use super::define::HttpResponseDataProducer; 10*e05ab47bSHarlanC use super::httpflv::HttpFlv; 110c504437SHarlanC use futures_util::{stream, StreamExt}; 12*e05ab47bSHarlanC use networkio::bytes_writer::BytesWriter; 13*e05ab47bSHarlanC use std::io; 14*e05ab47bSHarlanC use tokio::sync::mpsc; 15*e05ab47bSHarlanC use tokio_util::codec::{BytesCodec, FramedRead}; 16*e05ab47bSHarlanC 17*e05ab47bSHarlanC use tokio_stream::wrappers::UnboundedReceiverStream; 18*e05ab47bSHarlanC 19*e05ab47bSHarlanC use futures::{task::SpawnExt, SinkExt, Stream}; // 0.3.1, features = ["thread-pool"] 20*e05ab47bSHarlanC 21*e05ab47bSHarlanC use { 22*e05ab47bSHarlanC crate::rtmp::channels::define::{ 23*e05ab47bSHarlanC ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent, ChannelEventProducer, 24*e05ab47bSHarlanC }, 25*e05ab47bSHarlanC networkio::networkio::NetworkIO, 26*e05ab47bSHarlanC std::{sync::Arc, time::Duration}, 27*e05ab47bSHarlanC // tokio::{ 28*e05ab47bSHarlanC // sync::{mpsc, oneshot, Mutex}, 29*e05ab47bSHarlanC // time::sleep, 30*e05ab47bSHarlanC // }, 31*e05ab47bSHarlanC }; 32*e05ab47bSHarlanC 33*e05ab47bSHarlanC //pub static mut event_producer : ChannelEventProducer ;// 345377b641SHarlanC 355377b641SHarlanC type GenericError = Box<dyn std::error::Error + Send + Sync>; 365377b641SHarlanC 370c504437SHarlanC type Result<T> = std::result::Result<T, GenericError>; 380c504437SHarlanC 395377b641SHarlanC static INDEX: &[u8] = b"<a href=\"test.html\">test.html</a>"; 405377b641SHarlanC static INTERNAL_SERVER_ERROR: &[u8] = b"Internal Server Error"; 415377b641SHarlanC static NOTFOUND: &[u8] = b"Not Found"; 4287d8387fSHarlanC static OK: &[u8] = b"OK"; 435377b641SHarlanC static POST_DATA: &str = r#"{"original": "data"}"#; 445377b641SHarlanC static URL: &str = "http://127.0.0.1:1337/json_api"; 455377b641SHarlanC 46783195d9SHarlanC async fn api_get_response() -> Result<Response<Body>> { 47783195d9SHarlanC let data = vec!["foo", "bar"]; 48783195d9SHarlanC let res = match serde_json::to_string(&data) { 49783195d9SHarlanC Ok(json) => Response::builder() 505377b641SHarlanC .header(header::CONTENT_TYPE, "application/json") 51783195d9SHarlanC .body(Body::from(json)) 52783195d9SHarlanC .unwrap(), 53783195d9SHarlanC Err(_) => Response::builder() 54783195d9SHarlanC .status(StatusCode::INTERNAL_SERVER_ERROR) 55783195d9SHarlanC .body(INTERNAL_SERVER_ERROR.into()) 56783195d9SHarlanC .unwrap(), 57783195d9SHarlanC }; 58783195d9SHarlanC Ok(res) 595377b641SHarlanC } 605377b641SHarlanC 61*e05ab47bSHarlanC fn stream(rv: HttpResponseDataConsumer) -> impl Stream<Item = io::Result<BytesMut>> { 62*e05ab47bSHarlanC rv 63*e05ab47bSHarlanC } 64*e05ab47bSHarlanC 65*e05ab47bSHarlanC async fn handle_connection( 66*e05ab47bSHarlanC req: Request<Body>, 67*e05ab47bSHarlanC event_producer: ChannelEventProducer, // event_producer: ChannelEventProducer 68*e05ab47bSHarlanC ) -> Result<Response<Body>> { 6987d8387fSHarlanC let path = req.uri().path(); 700c504437SHarlanC 7187d8387fSHarlanC match path.find(".flv") { 7287d8387fSHarlanC Some(index) if index > 0 => { 7387d8387fSHarlanC println!("{}: {}", index, path); 7487d8387fSHarlanC let (left, _) = path.split_at(index); 7587d8387fSHarlanC println!("11{}: {}", index, left); 76*e05ab47bSHarlanC let rv: Vec<_> = left.split("/").collect(); 77*e05ab47bSHarlanC for s in rv.clone() { 7887d8387fSHarlanC println!("22{}: {}", index, s); 7987d8387fSHarlanC } 80*e05ab47bSHarlanC 81*e05ab47bSHarlanC let app_name = String::from(rv[0]); 82*e05ab47bSHarlanC let stream_name = String::from(rv[1]); 83*e05ab47bSHarlanC 84*e05ab47bSHarlanC let (http_response_data_producer, http_response_data_consumer) = 85*e05ab47bSHarlanC mpsc::unbounded_channel(); 86*e05ab47bSHarlanC 87*e05ab47bSHarlanC let mut flv_hanlder = HttpFlv::new( 88*e05ab47bSHarlanC app_name, 89*e05ab47bSHarlanC stream_name, 90*e05ab47bSHarlanC event_producer, 91*e05ab47bSHarlanC http_response_data_producer, 92*e05ab47bSHarlanC ); 93*e05ab47bSHarlanC 94*e05ab47bSHarlanC flv_hanlder.run(); 95*e05ab47bSHarlanC 96*e05ab47bSHarlanC // Ok(Response::builder() 97*e05ab47bSHarlanC // .status(StatusCode::OK) 98*e05ab47bSHarlanC // .body(OK.into()) 99*e05ab47bSHarlanC // .unwrap()) 100*e05ab47bSHarlanC 101*e05ab47bSHarlanC let stream = UnboundedReceiverStream::new(http_response_data_consumer); 102*e05ab47bSHarlanC 103*e05ab47bSHarlanC let resp = Response::new(Body::wrap_stream(stream)); 104*e05ab47bSHarlanC 105*e05ab47bSHarlanC Ok(resp) 1065377b641SHarlanC } 10787d8387fSHarlanC 10887d8387fSHarlanC _ => Ok(Response::builder() 10987d8387fSHarlanC .status(StatusCode::NOT_FOUND) 11087d8387fSHarlanC .body(NOTFOUND.into()) 11187d8387fSHarlanC .unwrap()), 1125377b641SHarlanC } 1135377b641SHarlanC } 1145377b641SHarlanC 115*e05ab47bSHarlanC pub async fn run(event_producer: ChannelEventProducer) -> Result<()> { 116783195d9SHarlanC let addr = "0.0.0.0:13370".parse().unwrap(); 11787d8387fSHarlanC 118*e05ab47bSHarlanC let new_service = make_service_fn(move |_| { 119*e05ab47bSHarlanC let flv_copy = event_producer.clone(); 120*e05ab47bSHarlanC async { 121*e05ab47bSHarlanC Ok::<_, GenericError>(service_fn(move |req| { 122*e05ab47bSHarlanC handle_connection(req, flv_copy.clone()) 123*e05ab47bSHarlanC })) 124*e05ab47bSHarlanC } 1255377b641SHarlanC }); 1265377b641SHarlanC 127*e05ab47bSHarlanC // let shared_router = Arc::new(router); 128*e05ab47bSHarlanC // let new_service = make_service_fn(move |_| { 129*e05ab47bSHarlanC // let app_state = AppState { 130*e05ab47bSHarlanC // state_thing: some_state.clone(), 131*e05ab47bSHarlanC // }; 132*e05ab47bSHarlanC 133*e05ab47bSHarlanC // let router_capture = shared_router.clone(); 134*e05ab47bSHarlanC // async { 135*e05ab47bSHarlanC // Ok::<_, Error>(service_fn(move |req| { 136*e05ab47bSHarlanC // route(router_capture.clone(), req, app_state.clone()) 137*e05ab47bSHarlanC // })) 138*e05ab47bSHarlanC // } 139*e05ab47bSHarlanC // }); 140*e05ab47bSHarlanC 1415377b641SHarlanC let server = Server::bind(&addr).serve(new_service); 1425377b641SHarlanC println!("Listening on http://{}", addr); 143783195d9SHarlanC server.await?; 1440c504437SHarlanC 145*e05ab47bSHarlanC // let addr = "0.0.0.0:8080".parse().expect("address creation works"); 146*e05ab47bSHarlanC // let server = Server::bind(&addr).serve(new_service); 147*e05ab47bSHarlanC // println!("Listening on http://{}", addr); 148*e05ab47bSHarlanC // let _ = server.await; 149*e05ab47bSHarlanC 150783195d9SHarlanC Ok(()) 1515377b641SHarlanC } 152*e05ab47bSHarlanC 153*e05ab47bSHarlanC // pub struct HttpFlvServer {} 154*e05ab47bSHarlanC 155*e05ab47bSHarlanC // impl HttpFlvServer { 156*e05ab47bSHarlanC // async fn handle_connection(& mut self, req: Request<Body>) -> Result<Response<Body>> { 157*e05ab47bSHarlanC // let path = req.uri().path(); 158*e05ab47bSHarlanC 159*e05ab47bSHarlanC // match path.find(".flv") { 160*e05ab47bSHarlanC // Some(index) if index > 0 => { 161*e05ab47bSHarlanC // println!("{}: {}", index, path); 162*e05ab47bSHarlanC // let (left, _) = path.split_at(index); 163*e05ab47bSHarlanC // println!("11{}: {}", index, left); 164*e05ab47bSHarlanC // let mut rv = left.split("/"); 165*e05ab47bSHarlanC // for s in rv { 166*e05ab47bSHarlanC // println!("22{}: {}", index, s); 167*e05ab47bSHarlanC // } 168*e05ab47bSHarlanC // Ok(Response::builder() 169*e05ab47bSHarlanC // .status(StatusCode::OK) 170*e05ab47bSHarlanC // .body(OK.into()) 171*e05ab47bSHarlanC // .unwrap()) 172*e05ab47bSHarlanC // } 173*e05ab47bSHarlanC 174*e05ab47bSHarlanC // _ => Ok(Response::builder() 175*e05ab47bSHarlanC // .status(StatusCode::NOT_FOUND) 176*e05ab47bSHarlanC // .body(NOTFOUND.into()) 177*e05ab47bSHarlanC // .unwrap()), 178*e05ab47bSHarlanC // } 179*e05ab47bSHarlanC // } 180*e05ab47bSHarlanC 181*e05ab47bSHarlanC // pub async fn run(&'static mut self) -> Result<()> { 182*e05ab47bSHarlanC // let addr = "0.0.0.0:13370".parse().unwrap(); 183*e05ab47bSHarlanC 184*e05ab47bSHarlanC // let new_service = make_service_fn(move |_| async { 185*e05ab47bSHarlanC // Ok::<_, GenericError>(service_fn(move |req| self.handle_connection(req))) 186*e05ab47bSHarlanC // }); 187*e05ab47bSHarlanC 188*e05ab47bSHarlanC // let server = Server::bind(&addr).serve(new_service); 189*e05ab47bSHarlanC // println!("Listening on http://{}", addr); 190*e05ab47bSHarlanC // server.await?; 191*e05ab47bSHarlanC 192*e05ab47bSHarlanC // Ok(()) 193*e05ab47bSHarlanC // } 194*e05ab47bSHarlanC // } 195