xref: /xiu/protocol/rtmp/src/amf0/amf0_reader.rs (revision b36cf5da)
1 use {
2     super::{amf0_markers, errors::Amf0ReadErrorValue, Amf0ReadError, Amf0ValueType},
3     byteorder::BigEndian,
4     // bytes::BytesMut,
5     bytesio::bytes_reader::BytesReader,
6     indexmap::IndexMap,
7 };
8 
9 pub struct Amf0Reader {
10     reader: BytesReader,
11 }
12 
13 impl Amf0Reader {
new(reader: BytesReader) -> Self14     pub fn new(reader: BytesReader) -> Self {
15         Self { reader }
16     }
read_all(&mut self) -> Result<Vec<Amf0ValueType>, Amf0ReadError>17     pub fn read_all(&mut self) -> Result<Vec<Amf0ValueType>, Amf0ReadError> {
18         let mut results = vec![];
19 
20         loop {
21             let result = self.read_any()?;
22             match result {
23                 Amf0ValueType::END => {
24                     break;
25                 }
26                 _ => {
27                     results.push(result);
28                 }
29             }
30         }
31         Ok(results)
32     }
read_any(&mut self) -> Result<Amf0ValueType, Amf0ReadError>33     pub fn read_any(&mut self) -> Result<Amf0ValueType, Amf0ReadError> {
34         if self.reader.is_empty() {
35             return Ok(Amf0ValueType::END);
36         }
37         let markers = self.reader.read_u8()?;
38 
39         if markers == amf0_markers::OBJECT_END {
40             return Ok(Amf0ValueType::END);
41         }
42 
43         match markers {
44             amf0_markers::NUMBER => self.read_number(),
45             amf0_markers::BOOLEAN => self.read_bool(),
46             amf0_markers::STRING => self.read_string(),
47             amf0_markers::OBJECT => self.read_object(),
48             amf0_markers::NULL => self.read_null(),
49             amf0_markers::ECMA_ARRAY => self.read_ecma_array(),
50             amf0_markers::LONG_STRING => self.read_long_string(),
51             _ => Err(Amf0ReadError {
52                 value: Amf0ReadErrorValue::UnknownMarker { marker: markers },
53             }),
54         }
55     }
read_with_type(&mut self, specified_marker: u8) -> Result<Amf0ValueType, Amf0ReadError>56     pub fn read_with_type(&mut self, specified_marker: u8) -> Result<Amf0ValueType, Amf0ReadError> {
57         let marker = self.reader.advance_u8()?;
58 
59         if marker != specified_marker {
60             return Err(Amf0ReadError {
61                 value: Amf0ReadErrorValue::WrongType,
62             });
63         }
64 
65         self.read_any()
66     }
67 
read_number(&mut self) -> Result<Amf0ValueType, Amf0ReadError>68     pub fn read_number(&mut self) -> Result<Amf0ValueType, Amf0ReadError> {
69         let number = self.reader.read_f64::<BigEndian>()?;
70         let value = Amf0ValueType::Number(number);
71         Ok(value)
72     }
73 
read_bool(&mut self) -> Result<Amf0ValueType, Amf0ReadError>74     pub fn read_bool(&mut self) -> Result<Amf0ValueType, Amf0ReadError> {
75         let value = self.reader.read_u8()?;
76 
77         match value {
78             1 => Ok(Amf0ValueType::Boolean(true)),
79             _ => Ok(Amf0ValueType::Boolean(false)),
80         }
81     }
82 
read_raw_string(&mut self) -> Result<String, Amf0ReadError>83     pub fn read_raw_string(&mut self) -> Result<String, Amf0ReadError> {
84         let l = self.reader.read_u16::<BigEndian>()?;
85 
86         let bytes = self.reader.read_bytes(l as usize)?;
87         let val = String::from_utf8(bytes.to_vec())?;
88 
89         Ok(val)
90     }
91 
read_string(&mut self) -> Result<Amf0ValueType, Amf0ReadError>92     pub fn read_string(&mut self) -> Result<Amf0ValueType, Amf0ReadError> {
93         let raw_string = self.read_raw_string()?;
94         Ok(Amf0ValueType::UTF8String(raw_string))
95     }
96 
read_null(&mut self) -> Result<Amf0ValueType, Amf0ReadError>97     pub fn read_null(&mut self) -> Result<Amf0ValueType, Amf0ReadError> {
98         Ok(Amf0ValueType::Null)
99     }
100 
is_read_object_eof(&mut self) -> Result<bool, Amf0ReadError>101     pub fn is_read_object_eof(&mut self) -> Result<bool, Amf0ReadError> {
102         let marker = self.reader.advance_u24::<BigEndian>()?;
103         if marker == amf0_markers::OBJECT_END as u32 {
104             self.reader.read_u24::<BigEndian>()?;
105             return Ok(true);
106         }
107         Ok(false)
108     }
109 
read_object(&mut self) -> Result<Amf0ValueType, Amf0ReadError>110     pub fn read_object(&mut self) -> Result<Amf0ValueType, Amf0ReadError> {
111         let mut properties = IndexMap::new();
112 
113         loop {
114             let is_eof = self.is_read_object_eof()?;
115 
116             if is_eof {
117                 break;
118             }
119 
120             let key = self.read_raw_string()?;
121             let val = self.read_any()?;
122 
123             properties.insert(key, val);
124         }
125 
126         Ok(Amf0ValueType::Object(properties))
127     }
128 
read_ecma_array(&mut self) -> Result<Amf0ValueType, Amf0ReadError>129     pub fn read_ecma_array(&mut self) -> Result<Amf0ValueType, Amf0ReadError> {
130         let len = self.reader.read_u32::<BigEndian>()?;
131 
132         let mut properties = IndexMap::new();
133 
134         //here we do not use length to traverse the map, because in some
135         //other media server, the length is 0 which is not correct.
136         while !self.is_read_object_eof()? {
137             let key = self.read_raw_string()?;
138             let val = self.read_any()?;
139             properties.insert(key, val);
140         }
141 
142         if len != properties.len() as u32 {
143             log::warn!("the ecma array length is not correct!");
144         }
145 
146         Ok(Amf0ValueType::Object(properties))
147     }
148 
read_long_string(&mut self) -> Result<Amf0ValueType, Amf0ReadError>149     pub fn read_long_string(&mut self) -> Result<Amf0ValueType, Amf0ReadError> {
150         let l = self.reader.read_u32::<BigEndian>()?;
151 
152         let buff = self.reader.read_bytes(l as usize)?;
153 
154         let val = String::from_utf8(buff.to_vec())?;
155         Ok(Amf0ValueType::LongUTF8String(val))
156     }
157 
158     // pub fn get_remaining_bytes(&mut self) -> BytesMut {
159     //     return self.reader.get_remaining_bytes();
160     // }
161 }
162 
163 #[cfg(test)]
164 mod tests {
165 
166     #[test]
test_byte_order()167     fn test_byte_order() {
168         use byteorder::{BigEndian, ByteOrder};
169 
170         let phi = 1.6180339887;
171         let mut buf = [0; 8];
172         BigEndian::write_f64(&mut buf, phi);
173         assert_eq!(phi, BigEndian::read_f64(&buf));
174         println!("tsetstt")
175     }
176 
177     use super::amf0_markers;
178     use super::Amf0Reader;
179     use super::Amf0ValueType;
180 
181     use bytes::BytesMut;
182     use bytesio::bytes_reader::BytesReader;
183 
184     use indexmap::IndexMap;
185 
186     #[test]
test_amf_reader()187     fn test_amf_reader() {
188         let data: [u8; 177] = [
189             2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body
190             3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101,
191             2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115,
192             104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112,
193             97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119,
194             102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108,
195             104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99,
196             85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104,
197             111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9,
198         ];
199 
200         let mut bytes_reader = BytesReader::new(BytesMut::new());
201         bytes_reader.extend_from_slice(&data);
202         let mut amf_reader = Amf0Reader::new(bytes_reader);
203 
204         let command_name = amf_reader.read_with_type(amf0_markers::STRING).unwrap();
205         assert_eq!(
206             command_name,
207             Amf0ValueType::UTF8String(String::from("connect"))
208         );
209 
210         let transaction_id = amf_reader.read_with_type(amf0_markers::NUMBER).unwrap();
211         assert_eq!(transaction_id, Amf0ValueType::Number(1.0));
212 
213         let command_obj_raw = amf_reader.read_with_type(amf0_markers::OBJECT).unwrap();
214         let mut properties = IndexMap::new();
215         properties.insert(
216             String::from("app"),
217             Amf0ValueType::UTF8String(String::from("harlan")),
218         );
219         properties.insert(
220             String::from("type"),
221             Amf0ValueType::UTF8String(String::from("nonprivate")),
222         );
223         properties.insert(
224             String::from("flashVer"),
225             Amf0ValueType::UTF8String(String::from("FMLE/3.0 (compatible; FMSc/1.0)")),
226         );
227         properties.insert(
228             String::from("swfUrl"),
229             Amf0ValueType::UTF8String(String::from("rtmp://localhost:1935/harlan")),
230         );
231         properties.insert(
232             String::from("tcUrl"),
233             Amf0ValueType::UTF8String(String::from("rtmp://localhost:1935/harlan")),
234         );
235         assert_eq!(command_obj_raw, Amf0ValueType::Object(properties));
236 
237         let _ = amf_reader.read_all();
238 
239         print!("test")
240     }
241 
242     // fn uint32_to_int24(num: u32) -> i32 {
243     //     // 截取低24位
244     //     let mut result = num & 0xFFFFFF;
245 
246     //     let mut result2: i32 = result as i32;
247 
248     //     // 判断最高位是否为1
249     //     if (result & 0x800000) == 0x800000 {
250     //         // 获取补码表示
251     //         result = (result ^ 0xFFFFFF) + 1;
252 
253     //         result2 = result as i32 * -1;
254     //     }
255 
256     //     result2
257     // }
258 
bytes_to_i24(bytes: [u8; 3]) -> i32259     fn bytes_to_i24(bytes: [u8; 3]) -> i32 {
260         let sign_extend_mask = 0xff_ff << 23;
261         let value = ((bytes[0] as i32) << 16) | ((bytes[1] as i32) << 8) | (bytes[2] as i32);
262 
263         if value & (1 << 23) != 0 {
264             // Sign extend the value
265             value | sign_extend_mask
266         } else {
267             value
268         }
269     }
270     #[test]
test_number()271     fn test_number() {
272         let data: [u8; 3] = [0xFF, 0xFF, 0xF0];
273 
274         let mut bytes_reader = BytesReader::new(BytesMut::new());
275 
276         bytes_reader.extend_from_slice(&data);
277 
278         let mut t: u32 = 0;
279 
280         for _ in 0..3 {
281             let time = bytes_reader.read_u8().unwrap();
282             //print!("==time0=={}\n", time);
283             //print!("==time1=={}\n", self.tag.composition_time);
284             t = (t << 8) + time as u32;
285         }
286 
287         println!("number: {}", bytes_to_i24(data));
288     }
289 
290     #[test]
test_player_connect_reader()291     fn test_player_connect_reader() {
292         // chunk header
293         // 0000   03 00 00 00 00 00 aa 14 00 00 00 00
294         //amf0 data
295         //                                            02 00 07 63  ...............c
296         // 0010   6f 6e 6e 65 63 74 00 3f f0 00 00 00 00 00 00 03  onnect.?........
297         // 0020   00 03 61 70 70 02 00 04 6c 69 76 65 00 05 74 63  ..app...live..tc
298         // 0030   55 72 6c 02 00 1a 72 74 6d 70 3a 2f 2f 6c 6f 63  Url...rtmp://loc
299         // 0040   61 6c 68 6f 73 74 3a 31 39 33 35 2f 6c 69 76 65  alhost:1935/live
300         // 0050   00 04 66 70 61 64 01 00 00 0c 63 61 70 61 62 69  ..fpad....capabi
301         // 0060   6c 69 74 69 65 73 00 40 2e 00 00 00 00 00 00 00  lities.@........
302         // 0070   0b 61 75 64 69 6f 43 6f 64 65 63 73 00 40 a8 ee  .audioCodecs.@..
303         // 0080   00 00 00 00 00 00 0b 76 69 64 65 6f 43 6f 64 65  .......videoCode 118 105
304         // 0090   63 73 00 40 6f 80 00 00 00 00 00 00 0d 76 69 64  [email protected]
305         // 00a0   65 6f 46 75 6e 63 74 69 6f 6e 00 3f f0 00 00 00  eoFunction.?....
306         // 0b00   00 00 00 00 00 09                                ......
307 
308         let data: [u8; 170] = [
309             0x02, 0x00, 0x07, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x00, 0x3f, 0xf0, 0x00,
310             0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x03, 0x61, 0x70, 0x70, 0x02, 0x00, 0x04,
311             0x6c, 0x69, 0x76, 0x65, 0x00, 0x05, 0x74, 0x63, 0x55, 0x72, 0x6c, 0x02, 0x00, 0x1a,
312             0x72, 0x74, 0x6d, 0x70, 0x3a, 0x2f, 0x2f, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x68, 0x6f,
313             0x73, 0x74, 0x3a, 0x31, 0x39, 0x33, 0x35, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x00, 0x04,
314             0x66, 0x70, 0x61, 0x64, 0x01, 0x00, 0x00, 0x0c, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69,
315             0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x00, 0x40, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00,
316             0x00, 0x00, 0x0b, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x43, 0x6f, 0x64, 0x65, 0x63, 0x73,
317             0x00, 0x40, 0xa8, 0xee, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0b, 0x76, 0x69, 0x64,
318             0x65, 0x6f, 0x43, 0x6f, 0x64, 0x65, 0x63, 0x73, 0x00, 0x40, 0x6f, 0x80, 0x00, 0x00,
319             0x00, 0x00, 0x00, 0x00, 0x0d, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x46, 0x75, 0x6e, 0x63,
320             0x74, 0x69, 0x6f, 0x6e, 0x00, 0x3f, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
321             0x00, 0x09,
322         ];
323 
324         //76 69 64 65 6f 43 6f 64 65 63 73
325         // 118 105 100 101  111 67 111 100 101 99 115
326 
327         let mut bytes_reader = BytesReader::new(BytesMut::new());
328         bytes_reader.extend_from_slice(&data);
329         let mut amf_reader = Amf0Reader::new(bytes_reader);
330 
331         let command_name = amf_reader.read_with_type(amf0_markers::STRING).unwrap();
332 
333         assert_eq!(
334             command_name,
335             Amf0ValueType::UTF8String(String::from("connect"))
336         );
337 
338         let transaction_id = amf_reader.read_with_type(amf0_markers::NUMBER).unwrap();
339         assert_eq!(transaction_id, Amf0ValueType::Number(1.0));
340 
341         let command_obj_raw = amf_reader.read_with_type(amf0_markers::OBJECT);
342 
343         if let Err(err) = &command_obj_raw {
344             println!("adfa{err}");
345         }
346 
347         let mut properties = IndexMap::new();
348 
349         properties.insert(String::from("audioCodecs"), Amf0ValueType::Number(3191.0));
350         properties.insert(String::from("videoCodecs"), Amf0ValueType::Number(252.0));
351         properties.insert(String::from("videoFunction"), Amf0ValueType::Number(1.0));
352         properties.insert(
353             String::from("tcUrl"),
354             Amf0ValueType::UTF8String(String::from("rtmp://localhost:1935/live")),
355         );
356 
357         properties.insert(
358             String::from("app"),
359             Amf0ValueType::UTF8String(String::from("live")),
360         );
361 
362         properties.insert(String::from("fpad"), Amf0ValueType::Boolean(false));
363         properties.insert(String::from("capabilities"), Amf0ValueType::Number(15.0));
364 
365         // let result = amf_writer.write_any(&Amf0ValueType::Object(properties));
366 
367         // print::printu8(amf_writer.get_current_bytes());
368 
369         assert_eq!(command_obj_raw.unwrap(), Amf0ValueType::Object(properties));
370     }
371 }
372