1 use { 2 super::{ 3 define::{msg_type_id, RtmpMessageData}, 4 errors::MessageError, 5 }, 6 crate::{ 7 amf0::{amf0_markers, amf0_reader::Amf0Reader}, 8 chunk::ChunkInfo, 9 protocol_control_messages::reader::ProtocolControlMessageReader, 10 user_control_messages::reader::EventMessagesReader, 11 // utils, 12 }, 13 bytesio::bytes_reader::BytesReader, 14 }; 15 16 pub struct MessageParser { 17 chunk_info: ChunkInfo, 18 } 19 20 impl MessageParser { new(chunk_info: ChunkInfo) -> Self21 pub fn new(chunk_info: ChunkInfo) -> Self { 22 Self { chunk_info } 23 } parse(self) -> Result<Option<RtmpMessageData>, MessageError>24 pub fn parse(self) -> Result<Option<RtmpMessageData>, MessageError> { 25 let mut reader = BytesReader::new(self.chunk_info.payload); 26 27 match self.chunk_info.message_header.msg_type_id { 28 msg_type_id::COMMAND_AMF0 | msg_type_id::COMMAND_AMF3 => { 29 if self.chunk_info.message_header.msg_type_id == msg_type_id::COMMAND_AMF3 { 30 reader.read_u8()?; 31 } 32 let mut amf_reader = Amf0Reader::new(reader); 33 34 let command_name = amf_reader.read_with_type(amf0_markers::STRING)?; 35 let transaction_id = amf_reader.read_with_type(amf0_markers::NUMBER)?; 36 37 // match command_name.clone() { 38 // Amf0ValueType::UTF8String(val) => { 39 // log::info!("command_name:{}", val); 40 // } 41 // _ => {} 42 // } 43 44 //The third value can be an object or NULL object 45 let command_obj_raw = amf_reader.read_with_type(amf0_markers::OBJECT); 46 let command_obj = match command_obj_raw { 47 Ok(val) => val, 48 Err(_) => amf_reader.read_with_type(amf0_markers::NULL)?, 49 }; 50 51 let others = amf_reader.read_all()?; 52 53 return Ok(Some(RtmpMessageData::Amf0Command { 54 command_name, 55 transaction_id, 56 command_object: command_obj, 57 others, 58 })); 59 } 60 61 msg_type_id::AUDIO => { 62 log::trace!( 63 "receive audio msg , msg length is{}\n", 64 self.chunk_info.message_header.msg_length 65 ); 66 67 return Ok(Some(RtmpMessageData::AudioData { 68 data: reader.extract_remaining_bytes(), 69 })); 70 } 71 msg_type_id::VIDEO => { 72 log::trace!( 73 "receive video msg , msg length is{}\n", 74 self.chunk_info.message_header.msg_length 75 ); 76 return Ok(Some(RtmpMessageData::VideoData { 77 data: reader.extract_remaining_bytes(), 78 })); 79 } 80 msg_type_id::USER_CONTROL_EVENT => { 81 log::trace!( 82 "receive user control event msg , msg length is{}\n", 83 self.chunk_info.message_header.msg_length 84 ); 85 let data = EventMessagesReader::new(reader).parse_event()?; 86 return Ok(Some(data)); 87 } 88 msg_type_id::SET_CHUNK_SIZE => { 89 let chunk_size = ProtocolControlMessageReader::new(reader).read_set_chunk_size()?; 90 return Ok(Some(RtmpMessageData::SetChunkSize { chunk_size })); 91 } 92 msg_type_id::ABORT => { 93 let chunk_stream_id = 94 ProtocolControlMessageReader::new(reader).read_abort_message()?; 95 return Ok(Some(RtmpMessageData::AbortMessage { chunk_stream_id })); 96 } 97 msg_type_id::ACKNOWLEDGEMENT => { 98 let sequence_number = 99 ProtocolControlMessageReader::new(reader).read_acknowledgement()?; 100 return Ok(Some(RtmpMessageData::Acknowledgement { sequence_number })); 101 } 102 msg_type_id::WIN_ACKNOWLEDGEMENT_SIZE => { 103 let size = 104 ProtocolControlMessageReader::new(reader).read_window_acknowledgement_size()?; 105 return Ok(Some(RtmpMessageData::WindowAcknowledgementSize { size })); 106 } 107 msg_type_id::SET_PEER_BANDWIDTH => { 108 let properties = 109 ProtocolControlMessageReader::new(reader).read_set_peer_bandwidth()?; 110 return Ok(Some(RtmpMessageData::SetPeerBandwidth { properties })); 111 } 112 msg_type_id::DATA_AMF0 | msg_type_id::DATA_AMF3 => { 113 //let values = Amf0Reader::new(reader).read_all()?; 114 return Ok(Some(RtmpMessageData::AmfData { 115 raw_data: reader.extract_remaining_bytes(), 116 })); 117 } 118 119 msg_type_id::SHARED_OBJ_AMF3 | msg_type_id::SHARED_OBJ_AMF0 => {} 120 121 msg_type_id::AGGREGATE => {} 122 123 _ => {} 124 } 125 log::warn!( 126 "the msg_type_id is not processed: {}", 127 self.chunk_info.message_header.msg_type_id 128 ); 129 Ok(None) 130 } 131 } 132 133 #[cfg(test)] 134 mod tests { 135 136 use super::MessageParser; 137 use crate::chunk::unpacketizer::ChunkUnpacketizer; 138 use crate::chunk::unpacketizer::UnpackResult; 139 140 #[test] test_message_parse()141 fn test_message_parse() { 142 let mut unpacker = ChunkUnpacketizer::new(); 143 144 let data: [u8; 205] = [ 145 2, 0, 0, 0, 0, 0, 4, 1, 0, 0, 0, 0, 0, 0, 16, 0, //set chunk size 146 //connect 147 3, //|format+csid| 148 0, 0, 0, //timestamp 149 0, 0, 177, //msg_length 150 20, //msg_type_id 0x14 151 0, 0, 0, 0, //msg_stream_id 152 2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body 153 3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101, 154 2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115, 155 104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112, 156 97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119, 157 102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 158 104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99, 159 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104, 160 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9, 161 ]; 162 163 unpacker.extend_data(&data[..]); 164 165 loop { 166 let result = unpacker.read_chunk(); 167 168 let rv = match result { 169 Ok(val) => val, 170 Err(_) => { 171 print!("end-----------"); 172 return; 173 } 174 }; 175 176 if let UnpackResult::ChunkInfo(chunk_info) = rv { 177 let _ = chunk_info.message_header.msg_streamd_id; 178 let _ = chunk_info.message_header.timestamp; 179 180 let message_parser = MessageParser::new(chunk_info); 181 let _ = message_parser.parse(); 182 } 183 } 184 } 185 } 186