xref: /xiu/protocol/rtmp/src/messages/parser.rs (revision 88d91efd)
1 use {
2     super::{
3         define::{msg_type_id, RtmpMessageData},
4         errors::{MessageError, MessageErrorValue},
5     },
6     crate::{
7         amf0::{amf0_markers, amf0_reader::Amf0Reader},
8         chunk::ChunkInfo,
9         protocol_control_messages::reader::ProtocolControlMessageReader,
10         user_control_messages::reader::EventMessagesReader,
11         // utils,
12     },
13     bytesio::bytes_reader::BytesReader,
14 };
15 
16 pub struct MessageParser {
17     chunk_info: ChunkInfo,
18 }
19 
20 impl MessageParser {
21     pub fn new(chunk_info: ChunkInfo) -> Self {
22         Self {
23             chunk_info: chunk_info,
24         }
25     }
26     pub fn parse(&mut self) -> Result<RtmpMessageData, MessageError> {
27         let mut reader = BytesReader::new(self.chunk_info.payload.clone());
28 
29         match self.chunk_info.message_header.msg_type_id {
30             msg_type_id::COMMAND_AMF0 | msg_type_id::COMMAND_AMF3 => {
31                 if self.chunk_info.message_header.msg_type_id == msg_type_id::COMMAND_AMF3 {
32                     reader.read_u8()?;
33                 }
34                 let mut amf_reader = Amf0Reader::new(reader);
35 
36                 let command_name = amf_reader.read_with_type(amf0_markers::STRING)?;
37                 let transaction_id = amf_reader.read_with_type(amf0_markers::NUMBER)?;
38 
39                 //The third value can be an object or NULL object
40                 let command_obj_raw = amf_reader.read_with_type(amf0_markers::OBJECT);
41                 let command_obj = match command_obj_raw {
42                     Ok(val) => val,
43                     Err(_) => amf_reader.read_with_type(amf0_markers::NULL)?,
44                 };
45 
46                 let others = amf_reader.read_all()?;
47 
48                 return Ok(RtmpMessageData::Amf0Command {
49                     command_name: command_name,
50                     transaction_id: transaction_id,
51                     command_object: command_obj,
52                     others,
53                 });
54             }
55 
56             msg_type_id::AUDIO => {
57                 log::trace!(
58                     "receive audio msg , msg length is{}\n",
59                     self.chunk_info.message_header.msg_length
60                 );
61 
62                 return Ok(RtmpMessageData::AudioData {
63                     data: self.chunk_info.payload.clone(),
64                 });
65             }
66             msg_type_id::VIDEO => {
67                 log::trace!(
68                     "receive video msg , msg length is{}\n",
69                     self.chunk_info.message_header.msg_length
70                 );
71                 return Ok(RtmpMessageData::VideoData {
72                     data: self.chunk_info.payload.clone(),
73                 });
74             }
75             msg_type_id::USER_CONTROL_EVENT => {
76                 log::trace!(
77                     "receive user control event msg , msg length is{}\n",
78                     self.chunk_info.message_header.msg_length
79                 );
80                 let data = EventMessagesReader::new(reader).parse_event()?;
81                 return Ok(data);
82             }
83             msg_type_id::SET_CHUNK_SIZE => {
84                 let chunk_size = ProtocolControlMessageReader::new(reader).read_set_chunk_size()?;
85                 return Ok(RtmpMessageData::SetChunkSize {
86                     chunk_size: chunk_size,
87                 });
88             }
89             msg_type_id::ABORT => {
90                 let chunk_stream_id =
91                     ProtocolControlMessageReader::new(reader).read_abort_message()?;
92                 return Ok(RtmpMessageData::AbortMessage {
93                     chunk_stream_id: chunk_stream_id,
94                 });
95             }
96             msg_type_id::ACKNOWLEDGEMENT => {
97                 let sequence_number =
98                     ProtocolControlMessageReader::new(reader).read_acknowledgement()?;
99                 return Ok(RtmpMessageData::Acknowledgement {
100                     sequence_number: sequence_number,
101                 });
102             }
103             msg_type_id::WIN_ACKNOWLEDGEMENT_SIZE => {
104                 let size =
105                     ProtocolControlMessageReader::new(reader).read_window_acknowledgement_size()?;
106                 return Ok(RtmpMessageData::WindowAcknowledgementSize { size: size });
107             }
108             msg_type_id::SET_PEER_BANDWIDTH => {
109                 let properties =
110                     ProtocolControlMessageReader::new(reader).read_set_peer_bandwidth()?;
111                 return Ok(RtmpMessageData::SetPeerBandwidth {
112                     properties: properties,
113                 });
114             }
115             msg_type_id::DATA_AMF0 | msg_type_id::DATA_AMF3 => {
116                 //let values = Amf0Reader::new(reader).read_all()?;
117                 return Ok(RtmpMessageData::AmfData {
118                     raw_data: self.chunk_info.payload.clone(),
119                 });
120             }
121 
122             msg_type_id::SHARED_OBJ_AMF3 | msg_type_id::SHARED_OBJ_AMF0 => {}
123 
124             msg_type_id::AGGREGATE => {}
125 
126             _ => {
127                 return Err(MessageError {
128                     value: MessageErrorValue::UnknowMessageType,
129                 });
130             }
131         }
132         return Err(MessageError {
133             value: MessageErrorValue::UnknowMessageType,
134         });
135     }
136 }
137 
138 #[cfg(test)]
139 mod tests {
140 
141     use super::MessageParser;
142     use crate::chunk::unpacketizer::ChunkUnpacketizer;
143     use crate::chunk::unpacketizer::UnpackResult;
144 
145     #[test]
146     fn test_message_parse() {
147         let mut unpacker = ChunkUnpacketizer::new();
148 
149         let data: [u8; 205] = [
150             2, 0, 0, 0, 0, 0, 4, 1, 0, 0, 0, 0, 0, 0, 16, 0, //set chunk size
151             //connect
152             3, //|format+csid|
153             0, 0, 0, //timestamp
154             0, 0, 177, //msg_length
155             20,  //msg_type_id 0x14
156             0, 0, 0, 0, //msg_stream_id
157             2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body
158             3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101,
159             2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115,
160             104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112,
161             97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119,
162             102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108,
163             104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99,
164             85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104,
165             111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9,
166         ];
167 
168         unpacker.extend_data(&data[..]);
169 
170         loop {
171             let result = unpacker.read_chunk();
172 
173             let rv = match result {
174                 Ok(val) => val,
175                 Err(_) => {
176                     print!("end-----------");
177                     return;
178                 }
179             };
180 
181             match rv {
182                 UnpackResult::ChunkInfo(chunk_info) => {
183                     let _ = chunk_info.message_header.msg_streamd_id;
184                     let _ = chunk_info.message_header.timestamp;
185 
186                     let mut message_parser = MessageParser::new(chunk_info);
187                     let _ = message_parser.parse();
188                 }
189                 _ => {}
190             }
191         }
192     }
193 
194     use uuid::Uuid;
195 
196     #[test]
197     fn test_uuid() {
198         let my_uuid = Uuid::new_v4();
199         println!("{}", my_uuid);
200     }
201 }
202