1 use std::ops::Index; 2 3 use bytes::BytesMut; 4 // use super::errors::ServerError; 5 use hyper::service::{make_service_fn, service_fn}; 6 use hyper::{header, Body, Method, Request, Response, Server, StatusCode}; 7 type Error = Box<dyn std::error::Error + Send + Sync + 'static>; 8 use super::define::HttpResponseDataConsumer; 9 use super::define::HttpResponseDataProducer; 10 use super::httpflv::HttpFlv; 11 use futures_util::{stream, StreamExt}; 12 use networkio::bytes_writer::BytesWriter; 13 use std::io; 14 use tokio::sync::mpsc; 15 use tokio_util::codec::{BytesCodec, FramedRead}; 16 17 use tokio_stream::wrappers::UnboundedReceiverStream; 18 19 use futures::{task::SpawnExt, SinkExt, Stream}; // 0.3.1, features = ["thread-pool"] 20 21 use { 22 crate::rtmp::channels::define::{ 23 ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent, ChannelEventProducer, 24 }, 25 networkio::networkio::NetworkIO, 26 std::{sync::Arc, time::Duration}, 27 // tokio::{ 28 // sync::{mpsc, oneshot, Mutex}, 29 // time::sleep, 30 // }, 31 }; 32 33 //pub static mut event_producer : ChannelEventProducer ;// 34 35 type GenericError = Box<dyn std::error::Error + Send + Sync>; 36 37 type Result<T> = std::result::Result<T, GenericError>; 38 39 static INDEX: &[u8] = b"<a href=\"test.html\">test.html</a>"; 40 static INTERNAL_SERVER_ERROR: &[u8] = b"Internal Server Error"; 41 static NOTFOUND: &[u8] = b"Not Found"; 42 static OK: &[u8] = b"OK"; 43 static POST_DATA: &str = r#"{"original": "data"}"#; 44 static URL: &str = "http://127.0.0.1:1337/json_api"; 45 46 async fn api_get_response() -> Result<Response<Body>> { 47 let data = vec!["foo", "bar"]; 48 let res = match serde_json::to_string(&data) { 49 Ok(json) => Response::builder() 50 .header(header::CONTENT_TYPE, "application/json") 51 .body(Body::from(json)) 52 .unwrap(), 53 Err(_) => Response::builder() 54 .status(StatusCode::INTERNAL_SERVER_ERROR) 55 .body(INTERNAL_SERVER_ERROR.into()) 56 .unwrap(), 57 }; 58 Ok(res) 59 } 60 61 fn stream(rv: HttpResponseDataConsumer) -> impl Stream<Item = io::Result<BytesMut>> { 62 rv 63 } 64 65 async fn handle_connection( 66 req: Request<Body>, 67 event_producer: ChannelEventProducer, // event_producer: ChannelEventProducer 68 ) -> Result<Response<Body>> { 69 let path = req.uri().path(); 70 71 match path.find(".flv") { 72 Some(index) if index > 0 => { 73 println!("{}: {}", index, path); 74 let (left, _) = path.split_at(index); 75 println!("11{}: {}", index, left); 76 let rv: Vec<_> = left.split("/").collect(); 77 for s in rv.clone() { 78 println!("22{}: {}", index, s); 79 } 80 81 let app_name = String::from(rv[0]); 82 let stream_name = String::from(rv[1]); 83 84 let (http_response_data_producer, http_response_data_consumer) = 85 mpsc::unbounded_channel(); 86 87 let mut flv_hanlder = HttpFlv::new( 88 app_name, 89 stream_name, 90 event_producer, 91 http_response_data_producer, 92 ); 93 94 flv_hanlder.run(); 95 96 // Ok(Response::builder() 97 // .status(StatusCode::OK) 98 // .body(OK.into()) 99 // .unwrap()) 100 101 let stream = UnboundedReceiverStream::new(http_response_data_consumer); 102 103 let resp = Response::new(Body::wrap_stream(stream)); 104 105 Ok(resp) 106 } 107 108 _ => Ok(Response::builder() 109 .status(StatusCode::NOT_FOUND) 110 .body(NOTFOUND.into()) 111 .unwrap()), 112 } 113 } 114 115 pub async fn run(event_producer: ChannelEventProducer) -> Result<()> { 116 let addr = "0.0.0.0:13370".parse().unwrap(); 117 118 let new_service = make_service_fn(move |_| { 119 let flv_copy = event_producer.clone(); 120 async { 121 Ok::<_, GenericError>(service_fn(move |req| { 122 handle_connection(req, flv_copy.clone()) 123 })) 124 } 125 }); 126 127 // let shared_router = Arc::new(router); 128 // let new_service = make_service_fn(move |_| { 129 // let app_state = AppState { 130 // state_thing: some_state.clone(), 131 // }; 132 133 // let router_capture = shared_router.clone(); 134 // async { 135 // Ok::<_, Error>(service_fn(move |req| { 136 // route(router_capture.clone(), req, app_state.clone()) 137 // })) 138 // } 139 // }); 140 141 let server = Server::bind(&addr).serve(new_service); 142 println!("Listening on http://{}", addr); 143 server.await?; 144 145 // let addr = "0.0.0.0:8080".parse().expect("address creation works"); 146 // let server = Server::bind(&addr).serve(new_service); 147 // println!("Listening on http://{}", addr); 148 // let _ = server.await; 149 150 Ok(()) 151 } 152 153 // pub struct HttpFlvServer {} 154 155 // impl HttpFlvServer { 156 // async fn handle_connection(& mut self, req: Request<Body>) -> Result<Response<Body>> { 157 // let path = req.uri().path(); 158 159 // match path.find(".flv") { 160 // Some(index) if index > 0 => { 161 // println!("{}: {}", index, path); 162 // let (left, _) = path.split_at(index); 163 // println!("11{}: {}", index, left); 164 // let mut rv = left.split("/"); 165 // for s in rv { 166 // println!("22{}: {}", index, s); 167 // } 168 // Ok(Response::builder() 169 // .status(StatusCode::OK) 170 // .body(OK.into()) 171 // .unwrap()) 172 // } 173 174 // _ => Ok(Response::builder() 175 // .status(StatusCode::NOT_FOUND) 176 // .body(NOTFOUND.into()) 177 // .unwrap()), 178 // } 179 // } 180 181 // pub async fn run(&'static mut self) -> Result<()> { 182 // let addr = "0.0.0.0:13370".parse().unwrap(); 183 184 // let new_service = make_service_fn(move |_| async { 185 // Ok::<_, GenericError>(service_fn(move |req| self.handle_connection(req))) 186 // }); 187 188 // let server = Server::bind(&addr).serve(new_service); 189 // println!("Listening on http://{}", addr); 190 // server.await?; 191 192 // Ok(()) 193 // } 194 // } 195