xref: /xiu/protocol/rtmp/src/messages/parser.rs (revision 69de9bbd)
1 use {
2     super::{
3         define::{msg_type_id, RtmpMessageData},
4         errors::MessageError,
5     },
6     crate::{
7         amf0::{amf0_markers, amf0_reader::Amf0Reader},
8         chunk::ChunkInfo,
9         protocol_control_messages::reader::ProtocolControlMessageReader,
10         user_control_messages::reader::EventMessagesReader,
11         // utils,
12     },
13     bytesio::bytes_reader::BytesReader,
14 };
15 
/// Decodes the payload of one `ChunkInfo` into an `RtmpMessageData` value.
///
/// The parser owns the chunk it was built from; `parse` consumes the parser
/// (and with it the chunk payload).
pub struct MessageParser {
    // The chunk whose message header selects the decoder and whose payload is decoded.
    chunk_info: ChunkInfo,
}
19 
20 impl MessageParser {
new(chunk_info: ChunkInfo) -> Self21     pub fn new(chunk_info: ChunkInfo) -> Self {
22         Self { chunk_info }
23     }
parse(self) -> Result<Option<RtmpMessageData>, MessageError>24     pub fn parse(self) -> Result<Option<RtmpMessageData>, MessageError> {
25         let mut reader = BytesReader::new(self.chunk_info.payload);
26 
27         match self.chunk_info.message_header.msg_type_id {
28             msg_type_id::COMMAND_AMF0 | msg_type_id::COMMAND_AMF3 => {
29                 if self.chunk_info.message_header.msg_type_id == msg_type_id::COMMAND_AMF3 {
30                     reader.read_u8()?;
31                 }
32                 let mut amf_reader = Amf0Reader::new(reader);
33 
34                 let command_name = amf_reader.read_with_type(amf0_markers::STRING)?;
35                 let transaction_id = amf_reader.read_with_type(amf0_markers::NUMBER)?;
36 
37                 // match command_name.clone() {
38                 //     Amf0ValueType::UTF8String(val) => {
39                 //         log::info!("command_name:{}", val);
40                 //     }
41                 //     _ => {}
42                 // }
43 
44                 //The third value can be an object or NULL object
45                 let command_obj_raw = amf_reader.read_with_type(amf0_markers::OBJECT);
46                 let command_obj = match command_obj_raw {
47                     Ok(val) => val,
48                     Err(_) => amf_reader.read_with_type(amf0_markers::NULL)?,
49                 };
50 
51                 let others = amf_reader.read_all()?;
52 
53                 return Ok(Some(RtmpMessageData::Amf0Command {
54                     command_name,
55                     transaction_id,
56                     command_object: command_obj,
57                     others,
58                 }));
59             }
60 
61             msg_type_id::AUDIO => {
62                 log::trace!(
63                     "receive audio msg , msg length is{}\n",
64                     self.chunk_info.message_header.msg_length
65                 );
66 
67                 return Ok(Some(RtmpMessageData::AudioData {
68                     data: reader.extract_remaining_bytes(),
69                 }));
70             }
71             msg_type_id::VIDEO => {
72                 log::trace!(
73                     "receive video msg , msg length is{}\n",
74                     self.chunk_info.message_header.msg_length
75                 );
76                 return Ok(Some(RtmpMessageData::VideoData {
77                     data: reader.extract_remaining_bytes(),
78                 }));
79             }
80             msg_type_id::USER_CONTROL_EVENT => {
81                 log::trace!(
82                     "receive user control event msg , msg length is{}\n",
83                     self.chunk_info.message_header.msg_length
84                 );
85                 let data = EventMessagesReader::new(reader).parse_event()?;
86                 return Ok(Some(data));
87             }
88             msg_type_id::SET_CHUNK_SIZE => {
89                 let chunk_size = ProtocolControlMessageReader::new(reader).read_set_chunk_size()?;
90                 return Ok(Some(RtmpMessageData::SetChunkSize { chunk_size }));
91             }
92             msg_type_id::ABORT => {
93                 let chunk_stream_id =
94                     ProtocolControlMessageReader::new(reader).read_abort_message()?;
95                 return Ok(Some(RtmpMessageData::AbortMessage { chunk_stream_id }));
96             }
97             msg_type_id::ACKNOWLEDGEMENT => {
98                 let sequence_number =
99                     ProtocolControlMessageReader::new(reader).read_acknowledgement()?;
100                 return Ok(Some(RtmpMessageData::Acknowledgement { sequence_number }));
101             }
102             msg_type_id::WIN_ACKNOWLEDGEMENT_SIZE => {
103                 let size =
104                     ProtocolControlMessageReader::new(reader).read_window_acknowledgement_size()?;
105                 return Ok(Some(RtmpMessageData::WindowAcknowledgementSize { size }));
106             }
107             msg_type_id::SET_PEER_BANDWIDTH => {
108                 let properties =
109                     ProtocolControlMessageReader::new(reader).read_set_peer_bandwidth()?;
110                 return Ok(Some(RtmpMessageData::SetPeerBandwidth { properties }));
111             }
112             msg_type_id::DATA_AMF0 | msg_type_id::DATA_AMF3 => {
113                 //let values = Amf0Reader::new(reader).read_all()?;
114                 return Ok(Some(RtmpMessageData::AmfData {
115                     raw_data: reader.extract_remaining_bytes(),
116                 }));
117             }
118 
119             msg_type_id::SHARED_OBJ_AMF3 | msg_type_id::SHARED_OBJ_AMF0 => {}
120 
121             msg_type_id::AGGREGATE => {}
122 
123             _ => {}
124         }
125         log::warn!(
126             "the msg_type_id is not processed: {}",
127             self.chunk_info.message_header.msg_type_id
128         );
129         Ok(None)
130     }
131 }
132 
#[cfg(test)]
mod tests {

    use super::MessageParser;
    use crate::chunk::unpacketizer::{ChunkUnpacketizer, UnpackResult};

    /// Smoke test: feeds a captured byte stream (a set-chunk-size message
    /// followed by a `connect` command) through the unpacketizer and hands
    /// every complete chunk to `MessageParser::parse`.
    ///
    /// Parse results are discarded, so this only checks that the pipeline
    /// runs to the first `read_chunk` error without panicking.
    #[test]
    fn test_message_parse() {
        let data: [u8; 205] = [
            2, 0, 0, 0, 0, 0, 4, 1, 0, 0, 0, 0, 0, 0, 16, 0, //set chunk size
            //connect
            3, //|format+csid|
            0, 0, 0, //timestamp
            0, 0, 177, //msg_length
            20,  //msg_type_id 0x14
            0, 0, 0, 0, //msg_stream_id
            2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body
            3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101,
            2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115,
            104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112,
            97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119,
            102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108,
            104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99,
            85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104,
            111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9,
        ];

        let mut unpacker = ChunkUnpacketizer::new();
        unpacker.extend_data(&data[..]);

        // Keep pulling chunks until the unpacketizer reports an error.
        while let Ok(unpacked) = unpacker.read_chunk() {
            if let UnpackResult::ChunkInfo(chunk_info) = unpacked {
                let _ = chunk_info.message_header.msg_streamd_id;
                let _ = chunk_info.message_header.timestamp;

                let _ = MessageParser::new(chunk_info).parse();
            }
        }

        print!("end-----------");
    }
}
186