1 //! I/O utility for bridging between `tokio::io` and `hyper::rt`. 2 3 use hyper::rt::{Read, ReadBufCursor, Write}; 4 use std::io::Error; 5 use std::pin::Pin; 6 use std::task::{Context, Poll}; 7 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; 8 9 /// A type that wraps any type implementing [`tokio::io::AsyncRead`] and [`tokio::io::AsyncWrite`] 10 /// and itself implements [`hyper::rt::Read`] and [`hyper::rt::Write`]. 11 #[derive(Debug)] 12 pub struct TokioIo<T> { 13 inner: T, 14 } 15 16 impl<T> TokioIo<T> { 17 /// Create a new `TokioIo` wrapping the given inner type. new(inner: T) -> TokioIo<T>18 pub fn new(inner: T) -> TokioIo<T> { 19 TokioIo { inner } 20 } 21 } 22 23 impl<T: AsyncRead + Unpin> Read for TokioIo<T> { poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, mut buf: ReadBufCursor<'_>, ) -> Poll<Result<(), Error>>24 fn poll_read( 25 mut self: Pin<&mut Self>, 26 cx: &mut Context<'_>, 27 mut buf: ReadBufCursor<'_>, 28 ) -> Poll<Result<(), Error>> { 29 unsafe { 30 let mut dst = ReadBuf::uninit(buf.as_mut()); 31 let res = Pin::new(&mut self.inner).poll_read(cx, &mut dst); 32 let amt = dst.filled().len(); 33 buf.advance(amt); 34 res 35 } 36 } 37 } 38 39 impl<T: AsyncWrite + Unpin> Write for TokioIo<T> { poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize, Error>>40 fn poll_write( 41 mut self: Pin<&mut Self>, 42 cx: &mut Context<'_>, 43 buf: &[u8], 44 ) -> Poll<Result<usize, Error>> { 45 Pin::new(&mut self.inner).poll_write(cx, buf) 46 } 47 poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>>48 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> { 49 Pin::new(&mut self.inner).poll_flush(cx) 50 } 51 poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>>52 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> { 53 Pin::new(&mut self.inner).poll_shutdown(cx) 54 } 55 } 56