xref: /xiu/protocol/rtmp/src/messages/parser.rs (revision 69de9bbd)
1e2687b83SHarlanC use {
2e2687b83SHarlanC     super::{
3e2687b83SHarlanC         define::{msg_type_id, RtmpMessageData},
4*69de9bbdSHarlanC         errors::MessageError,
5e2687b83SHarlanC     },
6e2687b83SHarlanC     crate::{
7f9029ceaSHarlanC         amf0::{amf0_markers, amf0_reader::Amf0Reader},
8e2687b83SHarlanC         chunk::ChunkInfo,
9e2687b83SHarlanC         protocol_control_messages::reader::ProtocolControlMessageReader,
10e2687b83SHarlanC         user_control_messages::reader::EventMessagesReader,
11b1840569SHarlanC         // utils,
12e2687b83SHarlanC     },
1388325f54SHarlanC     bytesio::bytes_reader::BytesReader,
14e2687b83SHarlanC };
15ff1f1192SHarlanC 
16ff1f1192SHarlanC pub struct MessageParser {
17ff1f1192SHarlanC     chunk_info: ChunkInfo,
18ff1f1192SHarlanC }
19ff1f1192SHarlanC 
20ff1f1192SHarlanC impl MessageParser {
new(chunk_info: ChunkInfo) -> Self21a3d19cccSHarlanC     pub fn new(chunk_info: ChunkInfo) -> Self {
220ca99c20SHarlan         Self { chunk_info }
23ff1f1192SHarlanC     }
parse(self) -> Result<Option<RtmpMessageData>, MessageError>24*69de9bbdSHarlanC     pub fn parse(self) -> Result<Option<RtmpMessageData>, MessageError> {
25cbcc815dSHarlanC         let mut reader = BytesReader::new(self.chunk_info.payload);
26ff1f1192SHarlanC 
27ff1f1192SHarlanC         match self.chunk_info.message_header.msg_type_id {
28ff1f1192SHarlanC             msg_type_id::COMMAND_AMF0 | msg_type_id::COMMAND_AMF3 => {
293312fa17SHarlanC                 if self.chunk_info.message_header.msg_type_id == msg_type_id::COMMAND_AMF3 {
30ff1f1192SHarlanC                     reader.read_u8()?;
31ff1f1192SHarlanC                 }
32ff1f1192SHarlanC                 let mut amf_reader = Amf0Reader::new(reader);
33ff1f1192SHarlanC 
34ff1f1192SHarlanC                 let command_name = amf_reader.read_with_type(amf0_markers::STRING)?;
35ff1f1192SHarlanC                 let transaction_id = amf_reader.read_with_type(amf0_markers::NUMBER)?;
36ff1f1192SHarlanC 
371d352d98SHarlanC                 // match command_name.clone() {
381d352d98SHarlanC                 //     Amf0ValueType::UTF8String(val) => {
391d352d98SHarlanC                 //         log::info!("command_name:{}", val);
401d352d98SHarlanC                 //     }
411d352d98SHarlanC                 //     _ => {}
421d352d98SHarlanC                 // }
431d352d98SHarlanC 
44ff1f1192SHarlanC                 //The third value can be an object or NULL object
45ff1f1192SHarlanC                 let command_obj_raw = amf_reader.read_with_type(amf0_markers::OBJECT);
46ff1f1192SHarlanC                 let command_obj = match command_obj_raw {
47ff1f1192SHarlanC                     Ok(val) => val,
48ff1f1192SHarlanC                     Err(_) => amf_reader.read_with_type(amf0_markers::NULL)?,
49ff1f1192SHarlanC                 };
50ff1f1192SHarlanC 
51ff1f1192SHarlanC                 let others = amf_reader.read_all()?;
52ff1f1192SHarlanC 
53*69de9bbdSHarlanC                 return Ok(Some(RtmpMessageData::Amf0Command {
5485c0af6aSLuca Barbato                     command_name,
5585c0af6aSLuca Barbato                     transaction_id,
56ff1f1192SHarlanC                     command_object: command_obj,
57ff1f1192SHarlanC                     others,
58*69de9bbdSHarlanC                 }));
59ff1f1192SHarlanC             }
60fe91dfa7SHarlanC 
613312fa17SHarlanC             msg_type_id::AUDIO => {
625de1eabbSHarlanC                 log::trace!(
635de1eabbSHarlanC                     "receive audio msg , msg length is{}\n",
64fe91dfa7SHarlanC                     self.chunk_info.message_header.msg_length
65fe91dfa7SHarlanC                 );
66fe91dfa7SHarlanC 
67*69de9bbdSHarlanC                 return Ok(Some(RtmpMessageData::AudioData {
68cbcc815dSHarlanC                     data: reader.extract_remaining_bytes(),
69*69de9bbdSHarlanC                 }));
703312fa17SHarlanC             }
713312fa17SHarlanC             msg_type_id::VIDEO => {
725de1eabbSHarlanC                 log::trace!(
735de1eabbSHarlanC                     "receive video msg , msg length is{}\n",
74fe91dfa7SHarlanC                     self.chunk_info.message_header.msg_length
75fe91dfa7SHarlanC                 );
76*69de9bbdSHarlanC                 return Ok(Some(RtmpMessageData::VideoData {
77cbcc815dSHarlanC                     data: reader.extract_remaining_bytes(),
78*69de9bbdSHarlanC                 }));
793312fa17SHarlanC             }
8097f0b5afSHarlanC             msg_type_id::USER_CONTROL_EVENT => {
815de1eabbSHarlanC                 log::trace!(
825de1eabbSHarlanC                     "receive user control event msg , msg length is{}\n",
83cc18a6e9SHarlanC                     self.chunk_info.message_header.msg_length
84cc18a6e9SHarlanC                 );
8597f0b5afSHarlanC                 let data = EventMessagesReader::new(reader).parse_event()?;
86*69de9bbdSHarlanC                 return Ok(Some(data));
8797f0b5afSHarlanC             }
88ff1f1192SHarlanC             msg_type_id::SET_CHUNK_SIZE => {
89ff1f1192SHarlanC                 let chunk_size = ProtocolControlMessageReader::new(reader).read_set_chunk_size()?;
90*69de9bbdSHarlanC                 return Ok(Some(RtmpMessageData::SetChunkSize { chunk_size }));
91ff1f1192SHarlanC             }
92ff1f1192SHarlanC             msg_type_id::ABORT => {
93ff1f1192SHarlanC                 let chunk_stream_id =
94ff1f1192SHarlanC                     ProtocolControlMessageReader::new(reader).read_abort_message()?;
95*69de9bbdSHarlanC                 return Ok(Some(RtmpMessageData::AbortMessage { chunk_stream_id }));
96ff1f1192SHarlanC             }
97ff1f1192SHarlanC             msg_type_id::ACKNOWLEDGEMENT => {
98ff1f1192SHarlanC                 let sequence_number =
99ff1f1192SHarlanC                     ProtocolControlMessageReader::new(reader).read_acknowledgement()?;
100*69de9bbdSHarlanC                 return Ok(Some(RtmpMessageData::Acknowledgement { sequence_number }));
101ff1f1192SHarlanC             }
102ff1f1192SHarlanC             msg_type_id::WIN_ACKNOWLEDGEMENT_SIZE => {
103ff1f1192SHarlanC                 let size =
104ff1f1192SHarlanC                     ProtocolControlMessageReader::new(reader).read_window_acknowledgement_size()?;
105*69de9bbdSHarlanC                 return Ok(Some(RtmpMessageData::WindowAcknowledgementSize { size }));
106ff1f1192SHarlanC             }
107ff1f1192SHarlanC             msg_type_id::SET_PEER_BANDWIDTH => {
108ff1f1192SHarlanC                 let properties =
109ff1f1192SHarlanC                     ProtocolControlMessageReader::new(reader).read_set_peer_bandwidth()?;
110*69de9bbdSHarlanC                 return Ok(Some(RtmpMessageData::SetPeerBandwidth { properties }));
111ff1f1192SHarlanC             }
112fe91dfa7SHarlanC             msg_type_id::DATA_AMF0 | msg_type_id::DATA_AMF3 => {
11361bf3e1bSHarlanC                 //let values = Amf0Reader::new(reader).read_all()?;
114*69de9bbdSHarlanC                 return Ok(Some(RtmpMessageData::AmfData {
115cbcc815dSHarlanC                     raw_data: reader.extract_remaining_bytes(),
116*69de9bbdSHarlanC                 }));
117fe91dfa7SHarlanC             }
118ff1f1192SHarlanC 
119ff1f1192SHarlanC             msg_type_id::SHARED_OBJ_AMF3 | msg_type_id::SHARED_OBJ_AMF0 => {}
120ff1f1192SHarlanC 
121ff1f1192SHarlanC             msg_type_id::AGGREGATE => {}
122ff1f1192SHarlanC 
123*69de9bbdSHarlanC             _ => {}
124ff1f1192SHarlanC         }
125*69de9bbdSHarlanC         log::warn!(
126c8d4d932SHarlan             "the msg_type_id is not processed: {}",
127c8d4d932SHarlan             self.chunk_info.message_header.msg_type_id
128c8d4d932SHarlan         );
129*69de9bbdSHarlanC         Ok(None)
130ff1f1192SHarlanC     }
131ff1f1192SHarlanC }
1324745db95SHarlanC 
#[cfg(test)]
mod tests {

    use super::MessageParser;
    use crate::chunk::unpacketizer::{ChunkUnpacketizer, UnpackResult};

    /// Pushes a fixed byte sequence through the chunk unpacketizer and runs
    /// the message parser on every chunk it yields.
    ///
    /// This is a smoke test: parse results are discarded and nothing is
    /// asserted, so it only checks that the pipeline runs without panicking.
    #[test]
    fn test_message_parse() {
        let mut unpacker = ChunkUnpacketizer::new();

        let data: [u8; 205] = [
            2, 0, 0, 0, 0, 0, 4, 1, 0, 0, 0, 0, 0, 0, 16, 0, // set chunk size
            // connect
            3, // |format+csid|
            0, 0, 0, // timestamp
            0, 0, 177, // msg_length
            20,  // msg_type_id 0x14
            0, 0, 0, 0, // msg_stream_id
            2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, // body
            3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101,
            2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115,
            104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112,
            97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119,
            102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108,
            104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99,
            85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104,
            111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9,
        ];

        unpacker.extend_data(&data[..]);

        // Keep pulling chunks until the unpacketizer reports an error, which
        // is what ends the run.
        while let Ok(unpacked) = unpacker.read_chunk() {
            if let UnpackResult::ChunkInfo(chunk_info) = unpacked {
                let _ = chunk_info.message_header.msg_streamd_id;
                let _ = chunk_info.message_header.timestamp;

                let _ = MessageParser::new(chunk_info).parse();
            }
        }

        print!("end-----------");
    }
}
186