1 use { 2 super::{ 3 define::{msg_type_id, RtmpMessageData}, 4 errors::{MessageError, MessageErrorValue}, 5 }, 6 crate::{ 7 amf0::{amf0_markers, amf0_reader::Amf0Reader}, 8 chunk::ChunkInfo, 9 protocol_control_messages::reader::ProtocolControlMessageReader, 10 user_control_messages::reader::EventMessagesReader, 11 // utils, 12 }, 13 bytesio::bytes_reader::BytesReader, 14 }; 15 16 pub struct MessageParser { 17 chunk_info: ChunkInfo, 18 } 19 20 impl MessageParser { 21 pub fn new(chunk_info: ChunkInfo) -> Self { 22 Self { chunk_info } 23 } 24 pub fn parse(self) -> Result<RtmpMessageData, MessageError> { 25 let mut reader = BytesReader::new(self.chunk_info.payload); 26 27 match self.chunk_info.message_header.msg_type_id { 28 msg_type_id::COMMAND_AMF0 | msg_type_id::COMMAND_AMF3 => { 29 if self.chunk_info.message_header.msg_type_id == msg_type_id::COMMAND_AMF3 { 30 reader.read_u8()?; 31 } 32 let mut amf_reader = Amf0Reader::new(reader); 33 34 let command_name = amf_reader.read_with_type(amf0_markers::STRING)?; 35 let transaction_id = amf_reader.read_with_type(amf0_markers::NUMBER)?; 36 37 // match command_name.clone() { 38 // Amf0ValueType::UTF8String(val) => { 39 // log::info!("command_name:{}", val); 40 // } 41 // _ => {} 42 // } 43 44 //The third value can be an object or NULL object 45 let command_obj_raw = amf_reader.read_with_type(amf0_markers::OBJECT); 46 let command_obj = match command_obj_raw { 47 Ok(val) => val, 48 Err(_) => amf_reader.read_with_type(amf0_markers::NULL)?, 49 }; 50 51 let others = amf_reader.read_all()?; 52 53 return Ok(RtmpMessageData::Amf0Command { 54 command_name, 55 transaction_id, 56 command_object: command_obj, 57 others, 58 }); 59 } 60 61 msg_type_id::AUDIO => { 62 log::trace!( 63 "receive audio msg , msg length is{}\n", 64 self.chunk_info.message_header.msg_length 65 ); 66 67 return Ok(RtmpMessageData::AudioData { 68 data: reader.extract_remaining_bytes(), 69 }); 70 } 71 msg_type_id::VIDEO => { 72 log::trace!( 73 "receive video msg , msg length is{}\n", 74 self.chunk_info.message_header.msg_length 75 ); 76 return Ok(RtmpMessageData::VideoData { 77 data: reader.extract_remaining_bytes(), 78 }); 79 } 80 msg_type_id::USER_CONTROL_EVENT => { 81 log::trace!( 82 "receive user control event msg , msg length is{}\n", 83 self.chunk_info.message_header.msg_length 84 ); 85 let data = EventMessagesReader::new(reader).parse_event()?; 86 return Ok(data); 87 } 88 msg_type_id::SET_CHUNK_SIZE => { 89 let chunk_size = ProtocolControlMessageReader::new(reader).read_set_chunk_size()?; 90 return Ok(RtmpMessageData::SetChunkSize { chunk_size }); 91 } 92 msg_type_id::ABORT => { 93 let chunk_stream_id = 94 ProtocolControlMessageReader::new(reader).read_abort_message()?; 95 return Ok(RtmpMessageData::AbortMessage { chunk_stream_id }); 96 } 97 msg_type_id::ACKNOWLEDGEMENT => { 98 let sequence_number = 99 ProtocolControlMessageReader::new(reader).read_acknowledgement()?; 100 return Ok(RtmpMessageData::Acknowledgement { sequence_number }); 101 } 102 msg_type_id::WIN_ACKNOWLEDGEMENT_SIZE => { 103 let size = 104 ProtocolControlMessageReader::new(reader).read_window_acknowledgement_size()?; 105 return Ok(RtmpMessageData::WindowAcknowledgementSize { size }); 106 } 107 msg_type_id::SET_PEER_BANDWIDTH => { 108 let properties = 109 ProtocolControlMessageReader::new(reader).read_set_peer_bandwidth()?; 110 return Ok(RtmpMessageData::SetPeerBandwidth { properties }); 111 } 112 msg_type_id::DATA_AMF0 | msg_type_id::DATA_AMF3 => { 113 //let values = Amf0Reader::new(reader).read_all()?; 114 return Ok(RtmpMessageData::AmfData { 115 raw_data: reader.extract_remaining_bytes(), 116 }); 117 } 118 119 msg_type_id::SHARED_OBJ_AMF3 | msg_type_id::SHARED_OBJ_AMF0 => {} 120 121 msg_type_id::AGGREGATE => {} 122 123 _ => { 124 log::error!( 125 "the msg_type_id is not supported: {}", 126 self.chunk_info.message_header.msg_type_id 127 ); 128 return Err(MessageError { 129 value: MessageErrorValue::UnknowMessageType, 130 }); 131 } 132 } 133 log::error!( 134 "the msg_type_id is not processed: {}", 135 self.chunk_info.message_header.msg_type_id 136 ); 137 Err(MessageError { 138 value: MessageErrorValue::UnknowMessageType, 139 }) 140 } 141 } 142 143 #[cfg(test)] 144 mod tests { 145 146 use super::MessageParser; 147 use crate::chunk::unpacketizer::ChunkUnpacketizer; 148 use crate::chunk::unpacketizer::UnpackResult; 149 150 #[test] 151 fn test_message_parse() { 152 let mut unpacker = ChunkUnpacketizer::new(); 153 154 let data: [u8; 205] = [ 155 2, 0, 0, 0, 0, 0, 4, 1, 0, 0, 0, 0, 0, 0, 16, 0, //set chunk size 156 //connect 157 3, //|format+csid| 158 0, 0, 0, //timestamp 159 0, 0, 177, //msg_length 160 20, //msg_type_id 0x14 161 0, 0, 0, 0, //msg_stream_id 162 2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body 163 3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101, 164 2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115, 165 104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112, 166 97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119, 167 102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 168 104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99, 169 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104, 170 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9, 171 ]; 172 173 unpacker.extend_data(&data[..]); 174 175 loop { 176 let result = unpacker.read_chunk(); 177 178 let rv = match result { 179 Ok(val) => val, 180 Err(_) => { 181 print!("end-----------"); 182 return; 183 } 184 }; 185 186 if let UnpackResult::ChunkInfo(chunk_info) = rv { 187 let _ = chunk_info.message_header.msg_streamd_id; 188 let _ = chunk_info.message_header.timestamp; 189 190 let message_parser = MessageParser::new(chunk_info); 191 let _ = message_parser.parse(); 192 } 193 } 194 } 195 } 196