1 use { 2 super::{ 3 bytes_errors::{BytesWriteError, BytesWriteErrorValue}, 4 bytesio::TNetIO, 5 }, 6 byteorder::{ByteOrder, WriteBytesExt}, 7 bytes::BytesMut, 8 rand, 9 rand::Rng, 10 std::{io::Write, sync::Arc, time::Duration}, 11 tokio::{sync::Mutex, time::timeout}, 12 }; 13 14 pub struct BytesWriter { 15 pub bytes: Vec<u8>, 16 } 17 18 impl Default for BytesWriter { default() -> Self19 fn default() -> Self { 20 Self::new() 21 } 22 } 23 24 impl BytesWriter { new() -> Self25 pub fn new() -> Self { 26 Self { bytes: Vec::new() } 27 } 28 write_u8(&mut self, byte: u8) -> Result<(), BytesWriteError>29 pub fn write_u8(&mut self, byte: u8) -> Result<(), BytesWriteError> { 30 self.bytes.write_u8(byte)?; 31 Ok(()) 32 } 33 or_u8_at(&mut self, position: usize, byte: u8) -> Result<(), BytesWriteError>34 pub fn or_u8_at(&mut self, position: usize, byte: u8) -> Result<(), BytesWriteError> { 35 if position > self.bytes.len() { 36 return Err(BytesWriteError { 37 value: BytesWriteErrorValue::OutofIndex, 38 }); 39 } 40 self.bytes[position] |= byte; 41 42 Ok(()) 43 } 44 add_u8_at(&mut self, position: usize, byte: u8) -> Result<(), BytesWriteError>45 pub fn add_u8_at(&mut self, position: usize, byte: u8) -> Result<(), BytesWriteError> { 46 if position > self.bytes.len() { 47 return Err(BytesWriteError { 48 value: BytesWriteErrorValue::OutofIndex, 49 }); 50 } 51 self.bytes[position] += byte; 52 53 Ok(()) 54 } 55 write_u8_at(&mut self, position: usize, byte: u8) -> Result<(), BytesWriteError>56 pub fn write_u8_at(&mut self, position: usize, byte: u8) -> Result<(), BytesWriteError> { 57 if position > self.bytes.len() { 58 return Err(BytesWriteError { 59 value: BytesWriteErrorValue::OutofIndex, 60 }); 61 } 62 self.bytes[position] = byte; 63 64 Ok(()) 65 } 66 get(&mut self, position: usize) -> Option<&u8>67 pub fn get(&mut self, position: usize) -> Option<&u8> { 68 self.bytes.get(position) 69 } 70 write_u16<T: ByteOrder>(&mut self, bytes: u16) -> Result<(), BytesWriteError>71 pub fn write_u16<T: ByteOrder>(&mut self, bytes: u16) -> Result<(), BytesWriteError> { 72 self.bytes.write_u16::<T>(bytes)?; 73 Ok(()) 74 } 75 write_u24<T: ByteOrder>(&mut self, bytes: u32) -> Result<(), BytesWriteError>76 pub fn write_u24<T: ByteOrder>(&mut self, bytes: u32) -> Result<(), BytesWriteError> { 77 self.bytes.write_u24::<T>(bytes)?; 78 79 Ok(()) 80 } 81 write_u32<T: ByteOrder>(&mut self, bytes: u32) -> Result<(), BytesWriteError>82 pub fn write_u32<T: ByteOrder>(&mut self, bytes: u32) -> Result<(), BytesWriteError> { 83 self.bytes.write_u32::<T>(bytes)?; 84 Ok(()) 85 } 86 write_f64<T: ByteOrder>(&mut self, bytes: f64) -> Result<(), BytesWriteError>87 pub fn write_f64<T: ByteOrder>(&mut self, bytes: f64) -> Result<(), BytesWriteError> { 88 self.bytes.write_f64::<T>(bytes)?; 89 Ok(()) 90 } 91 write_u64<T: ByteOrder>(&mut self, bytes: u64) -> Result<(), BytesWriteError>92 pub fn write_u64<T: ByteOrder>(&mut self, bytes: u64) -> Result<(), BytesWriteError> { 93 self.bytes.write_u64::<T>(bytes)?; 94 Ok(()) 95 } 96 write(&mut self, buf: &[u8]) -> Result<(), BytesWriteError>97 pub fn write(&mut self, buf: &[u8]) -> Result<(), BytesWriteError> { 98 self.bytes.write_all(buf)?; 99 Ok(()) 100 } 101 prepend(&mut self, buf: &[u8]) -> Result<(), BytesWriteError>102 pub fn prepend(&mut self, buf: &[u8]) -> Result<(), BytesWriteError> { 103 let tmp_bytes = self.bytes.clone(); 104 self.bytes.clear(); 105 self.bytes.write_all(buf)?; 106 self.bytes.write_all(tmp_bytes.as_slice())?; 107 Ok(()) 108 } 109 append(&mut self, writer: &mut BytesWriter)110 pub fn append(&mut self, writer: &mut BytesWriter) { 111 self.bytes.append(&mut writer.bytes); 112 } 113 write_random_bytes(&mut self, length: u32) -> Result<(), BytesWriteError>114 pub fn write_random_bytes(&mut self, length: u32) -> Result<(), BytesWriteError> { 115 let mut rng = rand::thread_rng(); 116 for _ in 0..length { 117 self.bytes.write_u8(rng.gen())?; 118 } 119 Ok(()) 120 } extract_current_bytes(&mut self) -> BytesMut121 pub fn extract_current_bytes(&mut self) -> BytesMut { 122 let mut rv_data = BytesMut::new(); 123 rv_data.extend_from_slice(&self.bytes.clone()[..]); 124 self.bytes.clear(); 125 126 rv_data 127 } 128 clear(&mut self)129 pub fn clear(&mut self) { 130 self.bytes.clear(); 131 } 132 get_current_bytes(&self) -> BytesMut133 pub fn get_current_bytes(&self) -> BytesMut { 134 let mut rv_data = BytesMut::new(); 135 rv_data.extend_from_slice(&self.bytes[..]); 136 rv_data 137 } 138 pop_bytes(&mut self, size: usize)139 pub fn pop_bytes(&mut self, size: usize) { 140 for _ in 0..size { 141 self.bytes.pop(); 142 } 143 } 144 len(&self) -> usize145 pub fn len(&self) -> usize { 146 self.bytes.len() 147 } 148 is_empty(&self) -> bool149 pub fn is_empty(&self) -> bool { 150 self.len() == 0 151 } 152 } 153 154 pub struct AsyncBytesWriter { 155 pub bytes_writer: BytesWriter, 156 pub io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 157 } 158 159 impl AsyncBytesWriter { new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self160 pub fn new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self { 161 Self { 162 bytes_writer: BytesWriter::new(), 163 io, 164 } 165 } 166 write_u8(&mut self, byte: u8) -> Result<(), BytesWriteError>167 pub fn write_u8(&mut self, byte: u8) -> Result<(), BytesWriteError> { 168 self.bytes_writer.write_u8(byte) 169 } 170 write_u16<T: ByteOrder>(&mut self, bytes: u16) -> Result<(), BytesWriteError>171 pub fn write_u16<T: ByteOrder>(&mut self, bytes: u16) -> Result<(), BytesWriteError> { 172 self.bytes_writer.write_u16::<T>(bytes) 173 } 174 write_u24<T: ByteOrder>(&mut self, bytes: u32) -> Result<(), BytesWriteError>175 pub fn write_u24<T: ByteOrder>(&mut self, bytes: u32) -> Result<(), BytesWriteError> { 176 self.bytes_writer.write_u24::<T>(bytes) 177 } 178 write_u32<T: ByteOrder>(&mut self, bytes: u32) -> Result<(), BytesWriteError>179 pub fn write_u32<T: ByteOrder>(&mut self, bytes: u32) -> Result<(), BytesWriteError> { 180 self.bytes_writer.write_u32::<T>(bytes) 181 } 182 write_f64<T: ByteOrder>(&mut self, bytes: f64) -> Result<(), BytesWriteError>183 pub fn write_f64<T: ByteOrder>(&mut self, bytes: f64) -> Result<(), BytesWriteError> { 184 self.bytes_writer.write_f64::<T>(bytes) 185 } 186 write(&mut self, buf: &[u8]) -> Result<(), BytesWriteError>187 pub fn write(&mut self, buf: &[u8]) -> Result<(), BytesWriteError> { 188 self.bytes_writer.write(buf) 189 } 190 write_random_bytes(&mut self, length: u32) -> Result<(), BytesWriteError>191 pub fn write_random_bytes(&mut self, length: u32) -> Result<(), BytesWriteError> { 192 self.bytes_writer.write_random_bytes(length) 193 } 194 extract_current_bytes(&mut self) -> BytesMut195 pub fn extract_current_bytes(&mut self) -> BytesMut { 196 self.bytes_writer.extract_current_bytes() 197 } 198 flush(&mut self) -> Result<(), BytesWriteError>199 pub async fn flush(&mut self) -> Result<(), BytesWriteError> { 200 self.io 201 .lock() 202 .await 203 .write(self.bytes_writer.bytes.clone().into()) 204 .await?; 205 self.bytes_writer.bytes.clear(); 206 Ok(()) 207 } 208 flush_timeout(&mut self, duration: Duration) -> Result<(), BytesWriteError>209 pub async fn flush_timeout(&mut self, duration: Duration) -> Result<(), BytesWriteError> { 210 let message = timeout( 211 duration, 212 self.io 213 .lock() 214 .await 215 .write(self.bytes_writer.bytes.clone().into()), 216 ) 217 .await; 218 219 match message { 220 Ok(_) => { 221 self.bytes_writer.bytes.clear(); 222 } 223 Err(_) => { 224 return Err(BytesWriteError { 225 value: BytesWriteErrorValue::Timeout, 226 }) 227 } 228 } 229 230 Ok(()) 231 } 232 } 233 234 #[cfg(test)] 235 mod tests { 236 use std::io::Write; 237 238 #[test] test_write_vec()239 fn test_write_vec() { 240 let mut v: Vec<u8> = Vec::new(); 241 242 v.push(0x01); 243 assert_eq!(1, v.len()); 244 assert_eq!(0x01, v[0]); 245 246 v[0] = 0x02; 247 assert_eq!(0x02, v[0]); 248 249 const FLV_HEADER: [u8; 9] = [ 250 0x46, // 'F' 251 0x4c, //'L' 252 0x56, //'V' 253 0x01, //version 254 0x05, //00000101 audio tag and video tag 255 0x00, 0x00, 0x00, 0x09, //flv header size 256 ]; 257 258 let rv = v.write(&FLV_HEADER); 259 260 if let Ok(val) = rv { 261 print!("{val} "); 262 } 263 264 assert_eq!(10, v.len()); 265 } 266 267 #[test] test_bit_opertion()268 fn test_bit_opertion() { 269 let pts: i64 = 1627702096; 270 271 let val = ((pts << 1) & 0xFE) as u8; 272 273 println!("======={}=======", pts << 1); 274 println!("======={val}======="); 275 } 276 277 #[test] test_bit_opertion2()278 fn test_bit_opertion2() { 279 let flags = 0xC0; 280 let pts: i64 = 1627702096; 281 282 let b9 = ((flags >> 2) & 0x30)/* 0011/0010 */ | (((pts >> 30) & 0x07) << 1) as u8 /* PTS 30-32 */ | 0x01 /* marker_bit */; 283 println!("=======b9{b9}======="); 284 285 let b10 = (pts >> 22) as u8; /* PTS 22-29 */ 286 println!("=======b10{b10}======="); 287 288 let b11 = ((pts >> 14) & 0xFE) as u8 /* PTS 15-21 */ | 0x01; /* marker_bit */ 289 println!("=======b11{b11}======="); 290 291 let b12 = (pts >> 7) as u8; /* PTS 7-14 */ 292 println!("=======b12{b12}======="); 293 294 let b13 = ((pts << 1) & 0xFE) as u8 /* PTS 0-6 */ | 0x01; /* marker_bit */ 295 println!("=======b13{b13}======="); 296 } 297 298 #[test] test_bit_opertion3()299 fn test_bit_opertion3() { 300 //let flags = 0xC0; 301 let pts: i64 = 1627702096; 302 303 let b12 = ((pts & 0x7fff) << 1) | 1; /* PTS 7-14 */ 304 println!("=======b12{}=======", b12 >> 8_u8); 305 println!("=======b13{}=======", b12 as u8); 306 } 307 } 308