1 use { 2 super::{ 3 bytes_errors::{BytesReadError, BytesReadErrorValue}, 4 bytesio::TNetIO, 5 }, 6 byteorder::{ByteOrder, ReadBytesExt}, 7 bytes::{BufMut, BytesMut}, 8 std::{io::Cursor, sync::Arc}, 9 tokio::sync::Mutex, 10 }; 11 12 pub struct BytesReader { 13 buffer: BytesMut, 14 } 15 impl BytesReader { 16 pub fn new(input: BytesMut) -> Self { 17 Self { buffer: input } 18 } 19 20 pub fn extend_from_slice(&mut self, extend: &[u8]) { 21 let remaining_mut = self.buffer.remaining_mut(); 22 let extend_length = extend.len(); 23 24 if extend_length > remaining_mut { 25 let additional = extend_length - remaining_mut; 26 self.buffer.reserve(additional); 27 } 28 29 self.buffer.extend_from_slice(extend) 30 } 31 32 pub fn read_bytes(&mut self, bytes_num: usize) -> Result<BytesMut, BytesReadError> { 33 if self.buffer.len() < bytes_num { 34 return Err(BytesReadError { 35 value: BytesReadErrorValue::NotEnoughBytes, 36 }); 37 } 38 Ok(self.buffer.split_to(bytes_num)) 39 } 40 41 pub fn advance_bytes(&mut self, bytes_num: usize) -> Result<BytesMut, BytesReadError> { 42 if self.buffer.len() < bytes_num { 43 return Err(BytesReadError { 44 value: BytesReadErrorValue::NotEnoughBytes, 45 }); 46 } 47 48 //here maybe optimised 49 Ok(self.buffer.clone().split_to(bytes_num)) 50 } 51 52 pub fn read_bytes_cursor( 53 &mut self, 54 bytes_num: usize, 55 ) -> Result<Cursor<BytesMut>, BytesReadError> { 56 let tmp_bytes = self.read_bytes(bytes_num)?; 57 let tmp_cursor = Cursor::new(tmp_bytes); 58 Ok(tmp_cursor) 59 } 60 61 pub fn advance_bytes_cursor( 62 &mut self, 63 bytes_num: usize, 64 ) -> Result<Cursor<BytesMut>, BytesReadError> { 65 let tmp_bytes = self.advance_bytes(bytes_num)?; 66 let tmp_cursor = Cursor::new(tmp_bytes); 67 Ok(tmp_cursor) 68 } 69 70 pub fn read_u8(&mut self) -> Result<u8, BytesReadError> { 71 let mut cursor = self.read_bytes_cursor(1)?; 72 73 Ok(cursor.read_u8()?) 74 } 75 76 pub fn advance_u8(&mut self) -> Result<u8, BytesReadError> { 77 let mut cursor = self.advance_bytes_cursor(1)?; 78 Ok(cursor.read_u8()?) 79 } 80 81 pub fn read_u16<T: ByteOrder>(&mut self) -> Result<u16, BytesReadError> { 82 let mut cursor = self.read_bytes_cursor(2)?; 83 let val = cursor.read_u16::<T>()?; 84 Ok(val) 85 } 86 87 pub fn read_u24<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> { 88 let mut cursor = self.read_bytes_cursor(3)?; 89 let val = cursor.read_u24::<T>()?; 90 Ok(val) 91 } 92 93 pub fn advance_u24<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> { 94 let mut cursor = self.advance_bytes_cursor(3)?; 95 Ok(cursor.read_u24::<T>()?) 96 } 97 98 pub fn read_u32<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> { 99 let mut cursor = self.read_bytes_cursor(4)?; 100 let val = cursor.read_u32::<T>()?; 101 102 Ok(val) 103 } 104 105 pub fn read_u48<T: ByteOrder>(&mut self) -> Result<u64, BytesReadError> { 106 let mut cursor = self.read_bytes_cursor(6)?; 107 let val = cursor.read_u48::<T>()?; 108 109 Ok(val) 110 } 111 112 pub fn read_f64<T: ByteOrder>(&mut self) -> Result<f64, BytesReadError> { 113 let mut cursor = self.read_bytes_cursor(8)?; 114 let val = cursor.read_f64::<T>()?; 115 116 Ok(val) 117 } 118 119 pub fn read_u64<T: ByteOrder>(&mut self) -> Result<u64, BytesReadError> { 120 let mut cursor = self.read_bytes_cursor(8)?; 121 let val = cursor.read_u64::<T>()?; 122 123 Ok(val) 124 } 125 126 pub fn get(&self, index: usize) -> Result<u8, BytesReadError> { 127 if index >= self.len() { 128 return Err(BytesReadError { 129 value: BytesReadErrorValue::IndexOutofRange, 130 }); 131 } 132 133 Ok(*self.buffer.get(index).unwrap()) 134 } 135 136 pub fn len(&self) -> usize { 137 self.buffer.len() 138 } 139 140 pub fn is_empty(&self) -> bool { 141 self.len() == 0 142 } 143 144 pub fn extract_remaining_bytes(&mut self) -> BytesMut { 145 self.buffer.split_to(self.buffer.len()) 146 } 147 pub fn get_remaining_bytes(&self) -> BytesMut { 148 self.buffer.clone() 149 } 150 } 151 pub struct AsyncBytesReader<T1: TNetIO> { 152 pub bytes_reader: BytesReader, 153 pub io: Arc<Mutex<T1>>, 154 } 155 156 impl<T1> AsyncBytesReader<T1> 157 where 158 T1: TNetIO, 159 { 160 pub fn new(io: Arc<Mutex<T1>>) -> Self { 161 Self { 162 bytes_reader: BytesReader::new(BytesMut::default()), 163 io, 164 } 165 } 166 167 pub async fn read(&mut self) -> Result<(), BytesReadError> { 168 let data = self.io.lock().await.read().await?; 169 self.bytes_reader.extend_from_slice(&data[..]); 170 Ok(()) 171 } 172 173 async fn check(&mut self, bytes_num: usize) -> Result<(), BytesReadError> { 174 while self.bytes_reader.len() < bytes_num { 175 self.read().await?; 176 } 177 178 Ok(()) 179 } 180 181 pub async fn read_bytes(&mut self, bytes_num: usize) -> Result<BytesMut, BytesReadError> { 182 self.check(bytes_num).await?; 183 self.bytes_reader.read_bytes(bytes_num) 184 } 185 186 pub async fn advance_bytes(&mut self, bytes_num: usize) -> Result<BytesMut, BytesReadError> { 187 self.check(bytes_num).await?; 188 self.bytes_reader.advance_bytes(bytes_num) 189 } 190 191 pub async fn read_bytes_cursor( 192 &mut self, 193 bytes_num: usize, 194 ) -> Result<Cursor<BytesMut>, BytesReadError> { 195 self.check(bytes_num).await?; 196 self.bytes_reader.read_bytes_cursor(bytes_num) 197 } 198 199 pub async fn advance_bytes_cursor( 200 &mut self, 201 bytes_num: usize, 202 ) -> Result<Cursor<BytesMut>, BytesReadError> { 203 self.check(bytes_num).await?; 204 self.bytes_reader.advance_bytes_cursor(bytes_num) 205 } 206 207 pub async fn read_u8(&mut self) -> Result<u8, BytesReadError> { 208 self.check(1).await?; 209 self.bytes_reader.read_u8() 210 } 211 212 pub async fn advance_u8(&mut self) -> Result<u8, BytesReadError> { 213 self.check(1).await?; 214 self.bytes_reader.advance_u8() 215 } 216 217 pub async fn read_u16<T: ByteOrder>(&mut self) -> Result<u16, BytesReadError> { 218 self.check(2).await?; 219 self.bytes_reader.read_u16::<T>() 220 } 221 222 pub async fn read_u24<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> { 223 self.check(3).await?; 224 self.bytes_reader.read_u24::<T>() 225 } 226 227 pub async fn advance_u24<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> { 228 self.check(3).await?; 229 self.bytes_reader.advance_u24::<T>() 230 } 231 232 pub async fn read_u32<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> { 233 self.check(4).await?; 234 self.bytes_reader.read_u32::<T>() 235 } 236 237 pub async fn read_f64<T: ByteOrder>(&mut self) -> Result<f64, BytesReadError> { 238 self.check(8).await?; 239 self.bytes_reader.read_f64::<T>() 240 } 241 } 242 243 #[cfg(test)] 244 mod tests { 245 246 use super::BytesReader; 247 use bytes::BytesMut; 248 use std::cell::RefCell; 249 use std::rc::Rc; 250 251 #[test] 252 fn test_rc_refcell() { 253 let reader = Rc::new(RefCell::new(BytesReader::new(BytesMut::new()))); 254 let xs: [u8; 3] = [1, 2, 3]; 255 reader.borrow_mut().extend_from_slice(&xs[..]); 256 257 let mut rv = reader.borrow_mut().read_u8().unwrap(); 258 assert_eq!(rv, 1, "Incorrect value"); 259 260 rv = reader.borrow_mut().read_u8().unwrap(); 261 assert_eq!(rv, 2, "Incorrect value"); 262 263 rv = reader.borrow_mut().read_u8().unwrap(); 264 assert_eq!(rv, 3, "Incorrect value"); 265 } 266 267 struct RefStruct { 268 pub reader: Rc<RefCell<BytesReader>>, 269 } 270 271 impl RefStruct { 272 pub fn new(reader: Rc<RefCell<BytesReader>>) -> Self { 273 Self { reader } 274 } 275 276 // pub fn read_u8(&mut self) -> u8 { 277 // return self.reader.borrow_mut().read_u8().unwrap(); 278 // } 279 280 pub fn extend_from_slice(&mut self, data: &[u8]) { 281 self.reader.borrow_mut().extend_from_slice(data); 282 } 283 } 284 285 #[test] 286 fn test_struct_rc_refcell() { 287 let reader = Rc::new(RefCell::new(BytesReader::new(BytesMut::new()))); 288 289 let mut ref_struct = RefStruct::new(reader); 290 291 let xs: [u8; 3] = [1, 2, 3]; 292 ref_struct.extend_from_slice(&xs); 293 294 let mut reader = ref_struct.reader.borrow_mut(); 295 296 let mut rv = reader.read_u8().unwrap(); 297 assert_eq!(rv, 1, "Incorrect value"); 298 299 rv = reader.read_u8().unwrap(); 300 assert_eq!(rv, 2, "Incorrect value"); 301 302 rv = reader.read_u8().unwrap(); 303 assert_eq!(rv, 3, "Incorrect value"); 304 } 305 } 306