1 use { 2 super::{ 3 bytes_errors::{BytesReadError, BytesReadErrorValue}, 4 bytesio::TNetIO, 5 }, 6 byteorder::{ByteOrder, ReadBytesExt}, 7 bytes::{BufMut, BytesMut}, 8 std::{io::Cursor, sync::Arc}, 9 tokio::sync::Mutex, 10 }; 11 12 pub struct BytesReader { 13 buffer: BytesMut, 14 } 15 impl BytesReader { 16 pub fn new(input: BytesMut) -> Self { 17 Self { buffer: input } 18 } 19 20 pub fn extend_from_slice(&mut self, extend: &[u8]) { 21 let remaining_mut = self.buffer.remaining_mut(); 22 let extend_length = extend.len(); 23 24 if extend_length > remaining_mut { 25 let additional = extend_length - remaining_mut; 26 self.buffer.reserve(additional); 27 } 28 29 self.buffer.extend_from_slice(extend) 30 } 31 32 pub fn read_bytes(&mut self, bytes_num: usize) -> Result<BytesMut, BytesReadError> { 33 if self.buffer.len() < bytes_num { 34 return Err(BytesReadError { 35 value: BytesReadErrorValue::NotEnoughBytes, 36 }); 37 } 38 Ok(self.buffer.split_to(bytes_num)) 39 } 40 41 pub fn advance_bytes(&mut self, bytes_num: usize) -> Result<BytesMut, BytesReadError> { 42 if self.buffer.len() < bytes_num { 43 return Err(BytesReadError { 44 value: BytesReadErrorValue::NotEnoughBytes, 45 }); 46 } 47 48 //here maybe optimised 49 Ok(self.buffer.clone().split_to(bytes_num)) 50 } 51 52 pub fn read_bytes_cursor( 53 &mut self, 54 bytes_num: usize, 55 ) -> Result<Cursor<BytesMut>, BytesReadError> { 56 let tmp_bytes = self.read_bytes(bytes_num)?; 57 let tmp_cursor = Cursor::new(tmp_bytes); 58 Ok(tmp_cursor) 59 } 60 61 pub fn advance_bytes_cursor( 62 &mut self, 63 bytes_num: usize, 64 ) -> Result<Cursor<BytesMut>, BytesReadError> { 65 let tmp_bytes = self.advance_bytes(bytes_num)?; 66 let tmp_cursor = Cursor::new(tmp_bytes); 67 Ok(tmp_cursor) 68 } 69 70 pub fn read_u8(&mut self) -> Result<u8, BytesReadError> { 71 let mut cursor = self.read_bytes_cursor(1)?; 72 73 Ok(cursor.read_u8()?) 74 } 75 76 pub fn advance_u8(&mut self) -> Result<u8, BytesReadError> { 77 let mut cursor = self.advance_bytes_cursor(1)?; 78 Ok(cursor.read_u8()?) 79 } 80 81 pub fn read_u16<T: ByteOrder>(&mut self) -> Result<u16, BytesReadError> { 82 let mut cursor = self.read_bytes_cursor(2)?; 83 let val = cursor.read_u16::<T>()?; 84 Ok(val) 85 } 86 87 pub fn read_u24<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> { 88 let mut cursor = self.read_bytes_cursor(3)?; 89 let val = cursor.read_u24::<T>()?; 90 Ok(val) 91 } 92 93 pub fn advance_u24<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> { 94 let mut cursor = self.advance_bytes_cursor(3)?; 95 Ok(cursor.read_u24::<T>()?) 96 } 97 98 pub fn read_u32<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> { 99 let mut cursor = self.read_bytes_cursor(4)?; 100 let val = cursor.read_u32::<T>()?; 101 102 Ok(val) 103 } 104 105 pub fn read_f64<T: ByteOrder>(&mut self) -> Result<f64, BytesReadError> { 106 let mut cursor = self.read_bytes_cursor(8)?; 107 let val = cursor.read_f64::<T>()?; 108 109 Ok(val) 110 } 111 112 pub fn read_u64<T: ByteOrder>(&mut self) -> Result<u64, BytesReadError> { 113 let mut cursor = self.read_bytes_cursor(8)?; 114 let val = cursor.read_u64::<T>()?; 115 116 Ok(val) 117 } 118 119 pub fn get(&self, index: usize) -> Result<u8, BytesReadError> { 120 if index >= self.len() { 121 return Err(BytesReadError { 122 value: BytesReadErrorValue::IndexOutofRange, 123 }); 124 } 125 126 Ok(*self.buffer.get(index).unwrap()) 127 } 128 129 pub fn len(&self) -> usize { 130 self.buffer.len() 131 } 132 133 pub fn is_empty(&self) -> bool { 134 self.len() == 0 135 } 136 137 pub fn extract_remaining_bytes(&mut self) -> BytesMut { 138 self.buffer.split_to(self.buffer.len()) 139 } 140 pub fn get_remaining_bytes(&self) -> BytesMut { 141 self.buffer.clone() 142 } 143 } 144 pub struct AsyncBytesReader<T1: TNetIO> { 145 pub bytes_reader: BytesReader, 146 pub io: Arc<Mutex<T1>>, 147 } 148 149 impl<T1> AsyncBytesReader<T1> 150 where 151 T1: TNetIO, 152 { 153 pub fn new(io: Arc<Mutex<T1>>) -> Self { 154 Self { 155 bytes_reader: BytesReader::new(BytesMut::default()), 156 io, 157 } 158 } 159 160 pub async fn read(&mut self) -> Result<(), BytesReadError> { 161 let data = self.io.lock().await.read().await?; 162 self.bytes_reader.extend_from_slice(&data[..]); 163 Ok(()) 164 } 165 166 async fn check(&mut self, bytes_num: usize) -> Result<(), BytesReadError> { 167 while self.bytes_reader.len() < bytes_num { 168 self.read().await?; 169 } 170 171 Ok(()) 172 } 173 174 pub async fn read_bytes(&mut self, bytes_num: usize) -> Result<BytesMut, BytesReadError> { 175 self.check(bytes_num).await?; 176 self.bytes_reader.read_bytes(bytes_num) 177 } 178 179 pub async fn advance_bytes(&mut self, bytes_num: usize) -> Result<BytesMut, BytesReadError> { 180 self.check(bytes_num).await?; 181 self.bytes_reader.advance_bytes(bytes_num) 182 } 183 184 pub async fn read_bytes_cursor( 185 &mut self, 186 bytes_num: usize, 187 ) -> Result<Cursor<BytesMut>, BytesReadError> { 188 self.check(bytes_num).await?; 189 self.bytes_reader.read_bytes_cursor(bytes_num) 190 } 191 192 pub async fn advance_bytes_cursor( 193 &mut self, 194 bytes_num: usize, 195 ) -> Result<Cursor<BytesMut>, BytesReadError> { 196 self.check(bytes_num).await?; 197 self.bytes_reader.advance_bytes_cursor(bytes_num) 198 } 199 200 pub async fn read_u8(&mut self) -> Result<u8, BytesReadError> { 201 self.check(1).await?; 202 self.bytes_reader.read_u8() 203 } 204 205 pub async fn advance_u8(&mut self) -> Result<u8, BytesReadError> { 206 self.check(1).await?; 207 self.bytes_reader.advance_u8() 208 } 209 210 pub async fn read_u16<T: ByteOrder>(&mut self) -> Result<u16, BytesReadError> { 211 self.check(2).await?; 212 self.bytes_reader.read_u16::<T>() 213 } 214 215 pub async fn read_u24<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> { 216 self.check(3).await?; 217 self.bytes_reader.read_u24::<T>() 218 } 219 220 pub async fn advance_u24<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> { 221 self.check(3).await?; 222 self.bytes_reader.advance_u24::<T>() 223 } 224 225 pub async fn read_u32<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> { 226 self.check(4).await?; 227 self.bytes_reader.read_u32::<T>() 228 } 229 230 pub async fn read_f64<T: ByteOrder>(&mut self) -> Result<f64, BytesReadError> { 231 self.check(8).await?; 232 self.bytes_reader.read_f64::<T>() 233 } 234 } 235 236 #[cfg(test)] 237 mod tests { 238 239 use super::BytesReader; 240 use bytes::BytesMut; 241 use std::cell::RefCell; 242 use std::rc::Rc; 243 244 #[test] 245 fn test_rc_refcell() { 246 let reader = Rc::new(RefCell::new(BytesReader::new(BytesMut::new()))); 247 let xs: [u8; 3] = [1, 2, 3]; 248 reader.borrow_mut().extend_from_slice(&xs[..]); 249 250 let mut rv = reader.borrow_mut().read_u8().unwrap(); 251 assert_eq!(rv, 1, "Incorrect value"); 252 253 rv = reader.borrow_mut().read_u8().unwrap(); 254 assert_eq!(rv, 2, "Incorrect value"); 255 256 rv = reader.borrow_mut().read_u8().unwrap(); 257 assert_eq!(rv, 3, "Incorrect value"); 258 } 259 260 struct RefStruct { 261 pub reader: Rc<RefCell<BytesReader>>, 262 } 263 264 impl RefStruct { 265 pub fn new(reader: Rc<RefCell<BytesReader>>) -> Self { 266 Self { reader } 267 } 268 269 // pub fn read_u8(&mut self) -> u8 { 270 // return self.reader.borrow_mut().read_u8().unwrap(); 271 // } 272 273 pub fn extend_from_slice(&mut self, data: &[u8]) { 274 self.reader.borrow_mut().extend_from_slice(data); 275 } 276 } 277 278 #[test] 279 fn test_struct_rc_refcell() { 280 let reader = Rc::new(RefCell::new(BytesReader::new(BytesMut::new()))); 281 282 let mut ref_struct = RefStruct::new(reader); 283 284 let xs: [u8; 3] = [1, 2, 3]; 285 ref_struct.extend_from_slice(&xs); 286 287 let mut reader = ref_struct.reader.borrow_mut(); 288 289 let mut rv = reader.read_u8().unwrap(); 290 assert_eq!(rv, 1, "Incorrect value"); 291 292 rv = reader.read_u8().unwrap(); 293 assert_eq!(rv, 2, "Incorrect value"); 294 295 rv = reader.read_u8().unwrap(); 296 assert_eq!(rv, 3, "Incorrect value"); 297 } 298 } 299