1 use std::ops::Index; 2 3 use bytes::BytesMut; 4 // use super::errors::ServerError; 5 use hyper::service::{make_service_fn, service_fn}; 6 use hyper::{header, Body, Method, Request, Response, Server, StatusCode}; 7 type Error = Box<dyn std::error::Error + Send + Sync + 'static>; 8 use super::define::HttpResponseDataConsumer; 9 use super::define::HttpResponseDataProducer; 10 use super::httpflv::HttpFlv; 11 use futures_util::{stream, StreamExt}; 12 use networkio::bytes_writer::BytesWriter; 13 use std::io; 14 15 use futures::{channel::mpsc::unbounded, task::SpawnExt, SinkExt, Stream}; // 0.3.1, features = ["thread-pool"] 16 17 use { 18 crate::rtmp::channels::define::{ 19 ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent, ChannelEventProducer, 20 }, 21 networkio::networkio::NetworkIO, 22 std::{sync::Arc, time::Duration}, 23 }; 24 25 type GenericError = Box<dyn std::error::Error + Send + Sync>; 26 type Result<T> = std::result::Result<T, GenericError>; 27 static NOTFOUND: &[u8] = b"Not Found"; 28 29 async fn handle_connection( 30 req: Request<Body>, 31 event_producer: ChannelEventProducer, // event_producer: ChannelEventProducer 32 ) -> Result<Response<Body>> { 33 let path = req.uri().path(); 34 35 match path.find(".flv") { 36 Some(index) if index > 0 => { 37 println!("{}: {}", index, path); 38 let (left, _) = path.split_at(index); 39 println!("11{}: {}", index, left); 40 let rv: Vec<_> = left.split("/").collect(); 41 for s in rv.clone() { 42 println!("22{}: {}", index, s); 43 } 44 45 let app_name = String::from(rv[1]); 46 let stream_name = String::from(rv[2]); 47 48 let (http_response_data_producer, http_response_data_consumer) = unbounded(); 49 50 let mut flv_hanlder = HttpFlv::new( 51 app_name, 52 stream_name, 53 event_producer, 54 http_response_data_producer, 55 ); 56 57 tokio::spawn(async move { 58 if let Err(err) = flv_hanlder.run().await { 59 print!("pull client error {}\n", err); 60 } 61 }); 62 63 let mut resp = Response::new(Body::wrap_stream(http_response_data_consumer)); 64 65 resp.headers_mut() 66 .insert("Access-Control-Allow-Origin", "*".parse().unwrap()); 67 68 Ok(resp) 69 } 70 71 _ => Ok(Response::builder() 72 .status(StatusCode::NOT_FOUND) 73 .body(NOTFOUND.into()) 74 .unwrap()), 75 } 76 } 77 78 pub async fn run(event_producer: ChannelEventProducer, port: u32) -> Result<()> { 79 env_logger::try_init(); 80 81 let listen_address = format!("0.0.0.0:{}", port); 82 let sock_addr = listen_address.parse().unwrap(); 83 84 let new_service = make_service_fn(move |_| { 85 let flv_copy = event_producer.clone(); 86 async { 87 Ok::<_, GenericError>(service_fn(move |req| { 88 handle_connection(req, flv_copy.clone()) 89 })) 90 } 91 }); 92 93 let server = Server::bind(&sock_addr).serve(new_service); 94 println!("Listening on http://{}", sock_addr); 95 log::info!("Listening on http://{}", sock_addr); 96 97 server.await?; 98 99 Ok(()) 100 } 101