xref: /xiu/library/bytesio/src/bytes_reader.rs (revision b36cf5da)
1 use {
2     super::{
3         bytes_errors::{BytesReadError, BytesReadErrorValue},
4         bytesio::TNetIO,
5     },
6     byteorder::{ByteOrder, ReadBytesExt},
7     bytes::{BufMut, BytesMut},
8     std::{io::Cursor, sync::Arc},
9     tokio::sync::Mutex,
10 };
11 
12 pub struct BytesReader {
13     buffer: BytesMut,
14 }
15 impl BytesReader {
16     pub fn new(input: BytesMut) -> Self {
17         Self { buffer: input }
18     }
19 
20     pub fn extend_from_slice(&mut self, extend: &[u8]) {
21         let remaining_mut = self.buffer.remaining_mut();
22         let extend_length = extend.len();
23 
24         if extend_length > remaining_mut {
25             let additional = extend_length - remaining_mut;
26             self.buffer.reserve(additional);
27         }
28 
29         self.buffer.extend_from_slice(extend)
30     }
31 
32     pub fn read_bytes(&mut self, bytes_num: usize) -> Result<BytesMut, BytesReadError> {
33         if self.buffer.len() < bytes_num {
34             return Err(BytesReadError {
35                 value: BytesReadErrorValue::NotEnoughBytes,
36             });
37         }
38         Ok(self.buffer.split_to(bytes_num))
39     }
40 
41     pub fn advance_bytes(&mut self, bytes_num: usize) -> Result<BytesMut, BytesReadError> {
42         if self.buffer.len() < bytes_num {
43             return Err(BytesReadError {
44                 value: BytesReadErrorValue::NotEnoughBytes,
45             });
46         }
47 
48         //here maybe optimised
49         Ok(self.buffer.clone().split_to(bytes_num))
50     }
51 
52     pub fn read_bytes_cursor(
53         &mut self,
54         bytes_num: usize,
55     ) -> Result<Cursor<BytesMut>, BytesReadError> {
56         let tmp_bytes = self.read_bytes(bytes_num)?;
57         let tmp_cursor = Cursor::new(tmp_bytes);
58         Ok(tmp_cursor)
59     }
60 
61     pub fn advance_bytes_cursor(
62         &mut self,
63         bytes_num: usize,
64     ) -> Result<Cursor<BytesMut>, BytesReadError> {
65         let tmp_bytes = self.advance_bytes(bytes_num)?;
66         let tmp_cursor = Cursor::new(tmp_bytes);
67         Ok(tmp_cursor)
68     }
69 
70     pub fn read_u8(&mut self) -> Result<u8, BytesReadError> {
71         let mut cursor = self.read_bytes_cursor(1)?;
72 
73         Ok(cursor.read_u8()?)
74     }
75 
76     pub fn advance_u8(&mut self) -> Result<u8, BytesReadError> {
77         let mut cursor = self.advance_bytes_cursor(1)?;
78         Ok(cursor.read_u8()?)
79     }
80 
81     pub fn read_u16<T: ByteOrder>(&mut self) -> Result<u16, BytesReadError> {
82         let mut cursor = self.read_bytes_cursor(2)?;
83         let val = cursor.read_u16::<T>()?;
84         Ok(val)
85     }
86 
87     pub fn read_u24<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> {
88         let mut cursor = self.read_bytes_cursor(3)?;
89         let val = cursor.read_u24::<T>()?;
90         Ok(val)
91     }
92 
93     pub fn advance_u24<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> {
94         let mut cursor = self.advance_bytes_cursor(3)?;
95         Ok(cursor.read_u24::<T>()?)
96     }
97 
98     pub fn read_u32<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> {
99         let mut cursor = self.read_bytes_cursor(4)?;
100         let val = cursor.read_u32::<T>()?;
101 
102         Ok(val)
103     }
104 
105     pub fn read_f64<T: ByteOrder>(&mut self) -> Result<f64, BytesReadError> {
106         let mut cursor = self.read_bytes_cursor(8)?;
107         let val = cursor.read_f64::<T>()?;
108 
109         Ok(val)
110     }
111 
112     pub fn read_u64<T: ByteOrder>(&mut self) -> Result<u64, BytesReadError> {
113         let mut cursor = self.read_bytes_cursor(8)?;
114         let val = cursor.read_u64::<T>()?;
115 
116         Ok(val)
117     }
118 
119     pub fn get(&self, index: usize) -> Result<u8, BytesReadError> {
120         if index >= self.len() {
121             return Err(BytesReadError {
122                 value: BytesReadErrorValue::IndexOutofRange,
123             });
124         }
125 
126         Ok(*self.buffer.get(index).unwrap())
127     }
128 
129     pub fn len(&self) -> usize {
130         self.buffer.len()
131     }
132 
133     pub fn is_empty(&self) -> bool {
134         self.len() == 0
135     }
136 
137     pub fn extract_remaining_bytes(&mut self) -> BytesMut {
138         self.buffer.split_to(self.buffer.len())
139     }
140     pub fn get_remaining_bytes(&self) -> BytesMut {
141         self.buffer.clone()
142     }
143 }
144 pub struct AsyncBytesReader<T1: TNetIO> {
145     pub bytes_reader: BytesReader,
146     pub io: Arc<Mutex<T1>>,
147 }
148 
149 impl<T1> AsyncBytesReader<T1>
150 where
151     T1: TNetIO,
152 {
153     pub fn new(io: Arc<Mutex<T1>>) -> Self {
154         Self {
155             bytes_reader: BytesReader::new(BytesMut::default()),
156             io,
157         }
158     }
159 
160     pub async fn read(&mut self) -> Result<(), BytesReadError> {
161         let data = self.io.lock().await.read().await?;
162         self.bytes_reader.extend_from_slice(&data[..]);
163         Ok(())
164     }
165 
166     async fn check(&mut self, bytes_num: usize) -> Result<(), BytesReadError> {
167         while self.bytes_reader.len() < bytes_num {
168             self.read().await?;
169         }
170 
171         Ok(())
172     }
173 
174     pub async fn read_bytes(&mut self, bytes_num: usize) -> Result<BytesMut, BytesReadError> {
175         self.check(bytes_num).await?;
176         self.bytes_reader.read_bytes(bytes_num)
177     }
178 
179     pub async fn advance_bytes(&mut self, bytes_num: usize) -> Result<BytesMut, BytesReadError> {
180         self.check(bytes_num).await?;
181         self.bytes_reader.advance_bytes(bytes_num)
182     }
183 
184     pub async fn read_bytes_cursor(
185         &mut self,
186         bytes_num: usize,
187     ) -> Result<Cursor<BytesMut>, BytesReadError> {
188         self.check(bytes_num).await?;
189         self.bytes_reader.read_bytes_cursor(bytes_num)
190     }
191 
192     pub async fn advance_bytes_cursor(
193         &mut self,
194         bytes_num: usize,
195     ) -> Result<Cursor<BytesMut>, BytesReadError> {
196         self.check(bytes_num).await?;
197         self.bytes_reader.advance_bytes_cursor(bytes_num)
198     }
199 
200     pub async fn read_u8(&mut self) -> Result<u8, BytesReadError> {
201         self.check(1).await?;
202         self.bytes_reader.read_u8()
203     }
204 
205     pub async fn advance_u8(&mut self) -> Result<u8, BytesReadError> {
206         self.check(1).await?;
207         self.bytes_reader.advance_u8()
208     }
209 
210     pub async fn read_u16<T: ByteOrder>(&mut self) -> Result<u16, BytesReadError> {
211         self.check(2).await?;
212         self.bytes_reader.read_u16::<T>()
213     }
214 
215     pub async fn read_u24<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> {
216         self.check(3).await?;
217         self.bytes_reader.read_u24::<T>()
218     }
219 
220     pub async fn advance_u24<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> {
221         self.check(3).await?;
222         self.bytes_reader.advance_u24::<T>()
223     }
224 
225     pub async fn read_u32<T: ByteOrder>(&mut self) -> Result<u32, BytesReadError> {
226         self.check(4).await?;
227         self.bytes_reader.read_u32::<T>()
228     }
229 
230     pub async fn read_f64<T: ByteOrder>(&mut self) -> Result<f64, BytesReadError> {
231         self.check(8).await?;
232         self.bytes_reader.read_f64::<T>()
233     }
234 }
235 
236 #[cfg(test)]
237 mod tests {
238 
239     use super::BytesReader;
240     use bytes::BytesMut;
241     use std::cell::RefCell;
242     use std::rc::Rc;
243 
244     #[test]
245     fn test_rc_refcell() {
246         let reader = Rc::new(RefCell::new(BytesReader::new(BytesMut::new())));
247         let xs: [u8; 3] = [1, 2, 3];
248         reader.borrow_mut().extend_from_slice(&xs[..]);
249 
250         let mut rv = reader.borrow_mut().read_u8().unwrap();
251         assert_eq!(rv, 1, "Incorrect value");
252 
253         rv = reader.borrow_mut().read_u8().unwrap();
254         assert_eq!(rv, 2, "Incorrect value");
255 
256         rv = reader.borrow_mut().read_u8().unwrap();
257         assert_eq!(rv, 3, "Incorrect value");
258     }
259 
260     struct RefStruct {
261         pub reader: Rc<RefCell<BytesReader>>,
262     }
263 
264     impl RefStruct {
265         pub fn new(reader: Rc<RefCell<BytesReader>>) -> Self {
266             Self { reader }
267         }
268 
269         // pub fn read_u8(&mut self) -> u8 {
270         //     return self.reader.borrow_mut().read_u8().unwrap();
271         // }
272 
273         pub fn extend_from_slice(&mut self, data: &[u8]) {
274             self.reader.borrow_mut().extend_from_slice(data);
275         }
276     }
277 
278     #[test]
279     fn test_struct_rc_refcell() {
280         let reader = Rc::new(RefCell::new(BytesReader::new(BytesMut::new())));
281 
282         let mut ref_struct = RefStruct::new(reader);
283 
284         let xs: [u8; 3] = [1, 2, 3];
285         ref_struct.extend_from_slice(&xs);
286 
287         let mut reader = ref_struct.reader.borrow_mut();
288 
289         let mut rv = reader.read_u8().unwrap();
290         assert_eq!(rv, 1, "Incorrect value");
291 
292         rv = reader.read_u8().unwrap();
293         assert_eq!(rv, 2, "Incorrect value");
294 
295         rv = reader.read_u8().unwrap();
296         assert_eq!(rv, 3, "Incorrect value");
297     }
298 }
299