xref: /xiu/protocol/rtmp/src/messages/parser.rs (revision 2f8005f4)
1 use {
2     super::{
3         define::{msg_type_id, RtmpMessageData},
4         errors::{MessageError, MessageErrorValue},
5     },
6     crate::{
7         amf0::{amf0_markers, amf0_reader::Amf0Reader},
8         chunk::ChunkInfo,
9         protocol_control_messages::reader::ProtocolControlMessageReader,
10         user_control_messages::reader::EventMessagesReader,
11         // utils,
12     },
13     bytesio::bytes_reader::BytesReader,
14 };
15 
16 pub struct MessageParser {
17     chunk_info: ChunkInfo,
18 }
19 
20 impl MessageParser {
21     pub fn new(chunk_info: ChunkInfo) -> Self {
22         Self { chunk_info }
23     }
24     pub fn parse(self) -> Result<RtmpMessageData, MessageError> {
25         let mut reader = BytesReader::new(self.chunk_info.payload);
26 
27         match self.chunk_info.message_header.msg_type_id {
28             msg_type_id::COMMAND_AMF0 | msg_type_id::COMMAND_AMF3 => {
29                 if self.chunk_info.message_header.msg_type_id == msg_type_id::COMMAND_AMF3 {
30                     reader.read_u8()?;
31                 }
32                 let mut amf_reader = Amf0Reader::new(reader);
33 
34                 let command_name = amf_reader.read_with_type(amf0_markers::STRING)?;
35                 let transaction_id = amf_reader.read_with_type(amf0_markers::NUMBER)?;
36 
37                 // match command_name.clone() {
38                 //     Amf0ValueType::UTF8String(val) => {
39                 //         log::info!("command_name:{}", val);
40                 //     }
41                 //     _ => {}
42                 // }
43 
44                 //The third value can be an object or NULL object
45                 let command_obj_raw = amf_reader.read_with_type(amf0_markers::OBJECT);
46                 let command_obj = match command_obj_raw {
47                     Ok(val) => val,
48                     Err(_) => amf_reader.read_with_type(amf0_markers::NULL)?,
49                 };
50 
51                 let others = amf_reader.read_all()?;
52 
53                 return Ok(RtmpMessageData::Amf0Command {
54                     command_name,
55                     transaction_id,
56                     command_object: command_obj,
57                     others,
58                 });
59             }
60 
61             msg_type_id::AUDIO => {
62                 log::trace!(
63                     "receive audio msg , msg length is{}\n",
64                     self.chunk_info.message_header.msg_length
65                 );
66 
67                 return Ok(RtmpMessageData::AudioData {
68                     data: reader.extract_remaining_bytes(),
69                 });
70             }
71             msg_type_id::VIDEO => {
72                 log::trace!(
73                     "receive video msg , msg length is{}\n",
74                     self.chunk_info.message_header.msg_length
75                 );
76                 return Ok(RtmpMessageData::VideoData {
77                     data: reader.extract_remaining_bytes(),
78                 });
79             }
80             msg_type_id::USER_CONTROL_EVENT => {
81                 log::trace!(
82                     "receive user control event msg , msg length is{}\n",
83                     self.chunk_info.message_header.msg_length
84                 );
85                 let data = EventMessagesReader::new(reader).parse_event()?;
86                 return Ok(data);
87             }
88             msg_type_id::SET_CHUNK_SIZE => {
89                 let chunk_size = ProtocolControlMessageReader::new(reader).read_set_chunk_size()?;
90                 return Ok(RtmpMessageData::SetChunkSize { chunk_size });
91             }
92             msg_type_id::ABORT => {
93                 let chunk_stream_id =
94                     ProtocolControlMessageReader::new(reader).read_abort_message()?;
95                 return Ok(RtmpMessageData::AbortMessage { chunk_stream_id });
96             }
97             msg_type_id::ACKNOWLEDGEMENT => {
98                 let sequence_number =
99                     ProtocolControlMessageReader::new(reader).read_acknowledgement()?;
100                 return Ok(RtmpMessageData::Acknowledgement { sequence_number });
101             }
102             msg_type_id::WIN_ACKNOWLEDGEMENT_SIZE => {
103                 let size =
104                     ProtocolControlMessageReader::new(reader).read_window_acknowledgement_size()?;
105                 return Ok(RtmpMessageData::WindowAcknowledgementSize { size });
106             }
107             msg_type_id::SET_PEER_BANDWIDTH => {
108                 let properties =
109                     ProtocolControlMessageReader::new(reader).read_set_peer_bandwidth()?;
110                 return Ok(RtmpMessageData::SetPeerBandwidth { properties });
111             }
112             msg_type_id::DATA_AMF0 | msg_type_id::DATA_AMF3 => {
113                 //let values = Amf0Reader::new(reader).read_all()?;
114                 return Ok(RtmpMessageData::AmfData {
115                     raw_data: reader.extract_remaining_bytes(),
116                 });
117             }
118 
119             msg_type_id::SHARED_OBJ_AMF3 | msg_type_id::SHARED_OBJ_AMF0 => {}
120 
121             msg_type_id::AGGREGATE => {}
122 
123             _ => {
124                 log::error!(
125                     "the msg_type_id is not supported: {}",
126                     self.chunk_info.message_header.msg_type_id
127                 );
128                 return Err(MessageError {
129                     value: MessageErrorValue::UnknowMessageType,
130                 });
131             }
132         }
133         log::error!(
134             "the msg_type_id is not processed: {}",
135             self.chunk_info.message_header.msg_type_id
136         );
137         Err(MessageError {
138             value: MessageErrorValue::UnknowMessageType,
139         })
140     }
141 }
142 
143 #[cfg(test)]
144 mod tests {
145 
146     use super::MessageParser;
147     use crate::chunk::unpacketizer::ChunkUnpacketizer;
148     use crate::chunk::unpacketizer::UnpackResult;
149 
150     #[test]
151     fn test_message_parse() {
152         let mut unpacker = ChunkUnpacketizer::new();
153 
154         let data: [u8; 205] = [
155             2, 0, 0, 0, 0, 0, 4, 1, 0, 0, 0, 0, 0, 0, 16, 0, //set chunk size
156             //connect
157             3, //|format+csid|
158             0, 0, 0, //timestamp
159             0, 0, 177, //msg_length
160             20,  //msg_type_id 0x14
161             0, 0, 0, 0, //msg_stream_id
162             2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body
163             3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101,
164             2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115,
165             104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112,
166             97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119,
167             102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108,
168             104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99,
169             85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104,
170             111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9,
171         ];
172 
173         unpacker.extend_data(&data[..]);
174 
175         loop {
176             let result = unpacker.read_chunk();
177 
178             let rv = match result {
179                 Ok(val) => val,
180                 Err(_) => {
181                     print!("end-----------");
182                     return;
183                 }
184             };
185 
186             if let UnpackResult::ChunkInfo(chunk_info) = rv {
187                 let _ = chunk_info.message_header.msg_streamd_id;
188                 let _ = chunk_info.message_header.timestamp;
189 
190                 let message_parser = MessageParser::new(chunk_info);
191                 let _ = message_parser.parse();
192             }
193         }
194     }
195 }
196