1 use { 2 super::{amf0_markers, errors::Amf0ReadErrorValue, Amf0ReadError, Amf0ValueType}, 3 byteorder::BigEndian, 4 // bytes::BytesMut, 5 bytesio::bytes_reader::BytesReader, 6 indexmap::IndexMap, 7 }; 8 9 pub struct Amf0Reader { 10 reader: BytesReader, 11 } 12 13 impl Amf0Reader { new(reader: BytesReader) -> Self14 pub fn new(reader: BytesReader) -> Self { 15 Self { reader } 16 } read_all(&mut self) -> Result<Vec<Amf0ValueType>, Amf0ReadError>17 pub fn read_all(&mut self) -> Result<Vec<Amf0ValueType>, Amf0ReadError> { 18 let mut results = vec![]; 19 20 loop { 21 let result = self.read_any()?; 22 match result { 23 Amf0ValueType::END => { 24 break; 25 } 26 _ => { 27 results.push(result); 28 } 29 } 30 } 31 Ok(results) 32 } read_any(&mut self) -> Result<Amf0ValueType, Amf0ReadError>33 pub fn read_any(&mut self) -> Result<Amf0ValueType, Amf0ReadError> { 34 if self.reader.is_empty() { 35 return Ok(Amf0ValueType::END); 36 } 37 let markers = self.reader.read_u8()?; 38 39 if markers == amf0_markers::OBJECT_END { 40 return Ok(Amf0ValueType::END); 41 } 42 43 match markers { 44 amf0_markers::NUMBER => self.read_number(), 45 amf0_markers::BOOLEAN => self.read_bool(), 46 amf0_markers::STRING => self.read_string(), 47 amf0_markers::OBJECT => self.read_object(), 48 amf0_markers::NULL => self.read_null(), 49 amf0_markers::ECMA_ARRAY => self.read_ecma_array(), 50 amf0_markers::LONG_STRING => self.read_long_string(), 51 _ => Err(Amf0ReadError { 52 value: Amf0ReadErrorValue::UnknownMarker { marker: markers }, 53 }), 54 } 55 } read_with_type(&mut self, specified_marker: u8) -> Result<Amf0ValueType, Amf0ReadError>56 pub fn read_with_type(&mut self, specified_marker: u8) -> Result<Amf0ValueType, Amf0ReadError> { 57 let marker = self.reader.advance_u8()?; 58 59 if marker != specified_marker { 60 return Err(Amf0ReadError { 61 value: Amf0ReadErrorValue::WrongType, 62 }); 63 } 64 65 self.read_any() 66 } 67 read_number(&mut self) -> Result<Amf0ValueType, Amf0ReadError>68 pub fn read_number(&mut self) -> Result<Amf0ValueType, Amf0ReadError> { 69 let number = self.reader.read_f64::<BigEndian>()?; 70 let value = Amf0ValueType::Number(number); 71 Ok(value) 72 } 73 read_bool(&mut self) -> Result<Amf0ValueType, Amf0ReadError>74 pub fn read_bool(&mut self) -> Result<Amf0ValueType, Amf0ReadError> { 75 let value = self.reader.read_u8()?; 76 77 match value { 78 1 => Ok(Amf0ValueType::Boolean(true)), 79 _ => Ok(Amf0ValueType::Boolean(false)), 80 } 81 } 82 read_raw_string(&mut self) -> Result<String, Amf0ReadError>83 pub fn read_raw_string(&mut self) -> Result<String, Amf0ReadError> { 84 let l = self.reader.read_u16::<BigEndian>()?; 85 86 let bytes = self.reader.read_bytes(l as usize)?; 87 let val = String::from_utf8(bytes.to_vec())?; 88 89 Ok(val) 90 } 91 read_string(&mut self) -> Result<Amf0ValueType, Amf0ReadError>92 pub fn read_string(&mut self) -> Result<Amf0ValueType, Amf0ReadError> { 93 let raw_string = self.read_raw_string()?; 94 Ok(Amf0ValueType::UTF8String(raw_string)) 95 } 96 read_null(&mut self) -> Result<Amf0ValueType, Amf0ReadError>97 pub fn read_null(&mut self) -> Result<Amf0ValueType, Amf0ReadError> { 98 Ok(Amf0ValueType::Null) 99 } 100 is_read_object_eof(&mut self) -> Result<bool, Amf0ReadError>101 pub fn is_read_object_eof(&mut self) -> Result<bool, Amf0ReadError> { 102 let marker = self.reader.advance_u24::<BigEndian>()?; 103 if marker == amf0_markers::OBJECT_END as u32 { 104 self.reader.read_u24::<BigEndian>()?; 105 return Ok(true); 106 } 107 Ok(false) 108 } 109 read_object(&mut self) -> Result<Amf0ValueType, Amf0ReadError>110 pub fn read_object(&mut self) -> Result<Amf0ValueType, Amf0ReadError> { 111 let mut properties = IndexMap::new(); 112 113 loop { 114 let is_eof = self.is_read_object_eof()?; 115 116 if is_eof { 117 break; 118 } 119 120 let key = self.read_raw_string()?; 121 let val = self.read_any()?; 122 123 properties.insert(key, val); 124 } 125 126 Ok(Amf0ValueType::Object(properties)) 127 } 128 read_ecma_array(&mut self) -> Result<Amf0ValueType, Amf0ReadError>129 pub fn read_ecma_array(&mut self) -> Result<Amf0ValueType, Amf0ReadError> { 130 let len = self.reader.read_u32::<BigEndian>()?; 131 132 let mut properties = IndexMap::new(); 133 134 //here we do not use length to traverse the map, because in some 135 //other media server, the length is 0 which is not correct. 136 while !self.is_read_object_eof()? { 137 let key = self.read_raw_string()?; 138 let val = self.read_any()?; 139 properties.insert(key, val); 140 } 141 142 if len != properties.len() as u32 { 143 log::warn!("the ecma array length is not correct!"); 144 } 145 146 Ok(Amf0ValueType::Object(properties)) 147 } 148 read_long_string(&mut self) -> Result<Amf0ValueType, Amf0ReadError>149 pub fn read_long_string(&mut self) -> Result<Amf0ValueType, Amf0ReadError> { 150 let l = self.reader.read_u32::<BigEndian>()?; 151 152 let buff = self.reader.read_bytes(l as usize)?; 153 154 let val = String::from_utf8(buff.to_vec())?; 155 Ok(Amf0ValueType::LongUTF8String(val)) 156 } 157 158 // pub fn get_remaining_bytes(&mut self) -> BytesMut { 159 // return self.reader.get_remaining_bytes(); 160 // } 161 } 162 163 #[cfg(test)] 164 mod tests { 165 166 #[test] test_byte_order()167 fn test_byte_order() { 168 use byteorder::{BigEndian, ByteOrder}; 169 170 let phi = 1.6180339887; 171 let mut buf = [0; 8]; 172 BigEndian::write_f64(&mut buf, phi); 173 assert_eq!(phi, BigEndian::read_f64(&buf)); 174 println!("tsetstt") 175 } 176 177 use super::amf0_markers; 178 use super::Amf0Reader; 179 use super::Amf0ValueType; 180 181 use bytes::BytesMut; 182 use bytesio::bytes_reader::BytesReader; 183 184 use indexmap::IndexMap; 185 186 #[test] test_amf_reader()187 fn test_amf_reader() { 188 let data: [u8; 177] = [ 189 2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body 190 3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101, 191 2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115, 192 104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112, 193 97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119, 194 102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 195 104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99, 196 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104, 197 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9, 198 ]; 199 200 let mut bytes_reader = BytesReader::new(BytesMut::new()); 201 bytes_reader.extend_from_slice(&data); 202 let mut amf_reader = Amf0Reader::new(bytes_reader); 203 204 let command_name = amf_reader.read_with_type(amf0_markers::STRING).unwrap(); 205 assert_eq!( 206 command_name, 207 Amf0ValueType::UTF8String(String::from("connect")) 208 ); 209 210 let transaction_id = amf_reader.read_with_type(amf0_markers::NUMBER).unwrap(); 211 assert_eq!(transaction_id, Amf0ValueType::Number(1.0)); 212 213 let command_obj_raw = amf_reader.read_with_type(amf0_markers::OBJECT).unwrap(); 214 let mut properties = IndexMap::new(); 215 properties.insert( 216 String::from("app"), 217 Amf0ValueType::UTF8String(String::from("harlan")), 218 ); 219 properties.insert( 220 String::from("type"), 221 Amf0ValueType::UTF8String(String::from("nonprivate")), 222 ); 223 properties.insert( 224 String::from("flashVer"), 225 Amf0ValueType::UTF8String(String::from("FMLE/3.0 (compatible; FMSc/1.0)")), 226 ); 227 properties.insert( 228 String::from("swfUrl"), 229 Amf0ValueType::UTF8String(String::from("rtmp://localhost:1935/harlan")), 230 ); 231 properties.insert( 232 String::from("tcUrl"), 233 Amf0ValueType::UTF8String(String::from("rtmp://localhost:1935/harlan")), 234 ); 235 assert_eq!(command_obj_raw, Amf0ValueType::Object(properties)); 236 237 let _ = amf_reader.read_all(); 238 239 print!("test") 240 } 241 242 // fn uint32_to_int24(num: u32) -> i32 { 243 // // 截取低24位 244 // let mut result = num & 0xFFFFFF; 245 246 // let mut result2: i32 = result as i32; 247 248 // // 判断最高位是否为1 249 // if (result & 0x800000) == 0x800000 { 250 // // 获取补码表示 251 // result = (result ^ 0xFFFFFF) + 1; 252 253 // result2 = result as i32 * -1; 254 // } 255 256 // result2 257 // } 258 bytes_to_i24(bytes: [u8; 3]) -> i32259 fn bytes_to_i24(bytes: [u8; 3]) -> i32 { 260 let sign_extend_mask = 0xff_ff << 23; 261 let value = ((bytes[0] as i32) << 16) | ((bytes[1] as i32) << 8) | (bytes[2] as i32); 262 263 if value & (1 << 23) != 0 { 264 // Sign extend the value 265 value | sign_extend_mask 266 } else { 267 value 268 } 269 } 270 #[test] test_number()271 fn test_number() { 272 let data: [u8; 3] = [0xFF, 0xFF, 0xF0]; 273 274 let mut bytes_reader = BytesReader::new(BytesMut::new()); 275 276 bytes_reader.extend_from_slice(&data); 277 278 let mut t: u32 = 0; 279 280 for _ in 0..3 { 281 let time = bytes_reader.read_u8().unwrap(); 282 //print!("==time0=={}\n", time); 283 //print!("==time1=={}\n", self.tag.composition_time); 284 t = (t << 8) + time as u32; 285 } 286 287 println!("number: {}", bytes_to_i24(data)); 288 } 289 290 #[test] test_player_connect_reader()291 fn test_player_connect_reader() { 292 // chunk header 293 // 0000 03 00 00 00 00 00 aa 14 00 00 00 00 294 //amf0 data 295 // 02 00 07 63 ...............c 296 // 0010 6f 6e 6e 65 63 74 00 3f f0 00 00 00 00 00 00 03 onnect.?........ 297 // 0020 00 03 61 70 70 02 00 04 6c 69 76 65 00 05 74 63 ..app...live..tc 298 // 0030 55 72 6c 02 00 1a 72 74 6d 70 3a 2f 2f 6c 6f 63 Url...rtmp://loc 299 // 0040 61 6c 68 6f 73 74 3a 31 39 33 35 2f 6c 69 76 65 alhost:1935/live 300 // 0050 00 04 66 70 61 64 01 00 00 0c 63 61 70 61 62 69 ..fpad....capabi 301 // 0060 6c 69 74 69 65 73 00 40 2e 00 00 00 00 00 00 00 lities.@........ 302 // 0070 0b 61 75 64 69 6f 43 6f 64 65 63 73 00 40 a8 ee .audioCodecs.@.. 303 // 0080 00 00 00 00 00 00 0b 76 69 64 65 6f 43 6f 64 65 .......videoCode 118 105 304 // 0090 63 73 00 40 6f 80 00 00 00 00 00 00 0d 76 69 64 [email protected] 305 // 00a0 65 6f 46 75 6e 63 74 69 6f 6e 00 3f f0 00 00 00 eoFunction.?.... 306 // 0b00 00 00 00 00 00 09 ...... 307 308 let data: [u8; 170] = [ 309 0x02, 0x00, 0x07, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x00, 0x3f, 0xf0, 0x00, 310 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x03, 0x61, 0x70, 0x70, 0x02, 0x00, 0x04, 311 0x6c, 0x69, 0x76, 0x65, 0x00, 0x05, 0x74, 0x63, 0x55, 0x72, 0x6c, 0x02, 0x00, 0x1a, 312 0x72, 0x74, 0x6d, 0x70, 0x3a, 0x2f, 0x2f, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x68, 0x6f, 313 0x73, 0x74, 0x3a, 0x31, 0x39, 0x33, 0x35, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x00, 0x04, 314 0x66, 0x70, 0x61, 0x64, 0x01, 0x00, 0x00, 0x0c, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 315 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x00, 0x40, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 316 0x00, 0x00, 0x0b, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x43, 0x6f, 0x64, 0x65, 0x63, 0x73, 317 0x00, 0x40, 0xa8, 0xee, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0b, 0x76, 0x69, 0x64, 318 0x65, 0x6f, 0x43, 0x6f, 0x64, 0x65, 0x63, 0x73, 0x00, 0x40, 0x6f, 0x80, 0x00, 0x00, 319 0x00, 0x00, 0x00, 0x00, 0x0d, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x46, 0x75, 0x6e, 0x63, 320 0x74, 0x69, 0x6f, 0x6e, 0x00, 0x3f, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 321 0x00, 0x09, 322 ]; 323 324 //76 69 64 65 6f 43 6f 64 65 63 73 325 // 118 105 100 101 111 67 111 100 101 99 115 326 327 let mut bytes_reader = BytesReader::new(BytesMut::new()); 328 bytes_reader.extend_from_slice(&data); 329 let mut amf_reader = Amf0Reader::new(bytes_reader); 330 331 let command_name = amf_reader.read_with_type(amf0_markers::STRING).unwrap(); 332 333 assert_eq!( 334 command_name, 335 Amf0ValueType::UTF8String(String::from("connect")) 336 ); 337 338 let transaction_id = amf_reader.read_with_type(amf0_markers::NUMBER).unwrap(); 339 assert_eq!(transaction_id, Amf0ValueType::Number(1.0)); 340 341 let command_obj_raw = amf_reader.read_with_type(amf0_markers::OBJECT); 342 343 if let Err(err) = &command_obj_raw { 344 println!("adfa{err}"); 345 } 346 347 let mut properties = IndexMap::new(); 348 349 properties.insert(String::from("audioCodecs"), Amf0ValueType::Number(3191.0)); 350 properties.insert(String::from("videoCodecs"), Amf0ValueType::Number(252.0)); 351 properties.insert(String::from("videoFunction"), Amf0ValueType::Number(1.0)); 352 properties.insert( 353 String::from("tcUrl"), 354 Amf0ValueType::UTF8String(String::from("rtmp://localhost:1935/live")), 355 ); 356 357 properties.insert( 358 String::from("app"), 359 Amf0ValueType::UTF8String(String::from("live")), 360 ); 361 362 properties.insert(String::from("fpad"), Amf0ValueType::Boolean(false)); 363 properties.insert(String::from("capabilities"), Amf0ValueType::Number(15.0)); 364 365 // let result = amf_writer.write_any(&Amf0ValueType::Object(properties)); 366 367 // print::printu8(amf_writer.get_current_bytes()); 368 369 assert_eq!(command_obj_raw.unwrap(), Amf0ValueType::Object(properties)); 370 } 371 } 372