1 use { 2 super::{ 3 define::{msg_type_id, RtmpMessageData}, 4 errors::{MessageError, MessageErrorValue}, 5 }, 6 crate::{ 7 amf0::{amf0_markers, amf0_reader::Amf0Reader}, 8 chunk::ChunkInfo, 9 protocol_control_messages::reader::ProtocolControlMessageReader, 10 user_control_messages::reader::EventMessagesReader, 11 // utils, 12 }, 13 bytesio::bytes_reader::BytesReader, 14 }; 15 16 pub struct MessageParser { 17 chunk_info: ChunkInfo, 18 } 19 20 impl MessageParser { 21 pub fn new(chunk_info: ChunkInfo) -> Self { 22 Self { 23 chunk_info: chunk_info, 24 } 25 } 26 pub fn parse(&mut self) -> Result<RtmpMessageData, MessageError> { 27 let mut reader = BytesReader::new(self.chunk_info.payload.clone()); 28 29 match self.chunk_info.message_header.msg_type_id { 30 msg_type_id::COMMAND_AMF0 | msg_type_id::COMMAND_AMF3 => { 31 if self.chunk_info.message_header.msg_type_id == msg_type_id::COMMAND_AMF3 { 32 reader.read_u8()?; 33 } 34 let mut amf_reader = Amf0Reader::new(reader); 35 36 let command_name = amf_reader.read_with_type(amf0_markers::STRING)?; 37 let transaction_id = amf_reader.read_with_type(amf0_markers::NUMBER)?; 38 39 //The third value can be an object or NULL object 40 let command_obj_raw = amf_reader.read_with_type(amf0_markers::OBJECT); 41 let command_obj = match command_obj_raw { 42 Ok(val) => val, 43 Err(_) => amf_reader.read_with_type(amf0_markers::NULL)?, 44 }; 45 46 let others = amf_reader.read_all()?; 47 48 return Ok(RtmpMessageData::Amf0Command { 49 command_name: command_name, 50 transaction_id: transaction_id, 51 command_object: command_obj, 52 others, 53 }); 54 } 55 56 msg_type_id::AUDIO => { 57 log::trace!( 58 "receive audio msg , msg length is{}\n", 59 self.chunk_info.message_header.msg_length 60 ); 61 62 return Ok(RtmpMessageData::AudioData { 63 data: self.chunk_info.payload.clone(), 64 }); 65 } 66 msg_type_id::VIDEO => { 67 log::trace!( 68 "receive video msg , msg length is{}\n", 69 self.chunk_info.message_header.msg_length 70 ); 71 return Ok(RtmpMessageData::VideoData { 72 data: self.chunk_info.payload.clone(), 73 }); 74 } 75 msg_type_id::USER_CONTROL_EVENT => { 76 log::trace!( 77 "receive user control event msg , msg length is{}\n", 78 self.chunk_info.message_header.msg_length 79 ); 80 let data = EventMessagesReader::new(reader).parse_event()?; 81 return Ok(data); 82 } 83 msg_type_id::SET_CHUNK_SIZE => { 84 let chunk_size = ProtocolControlMessageReader::new(reader).read_set_chunk_size()?; 85 return Ok(RtmpMessageData::SetChunkSize { 86 chunk_size: chunk_size, 87 }); 88 } 89 msg_type_id::ABORT => { 90 let chunk_stream_id = 91 ProtocolControlMessageReader::new(reader).read_abort_message()?; 92 return Ok(RtmpMessageData::AbortMessage { 93 chunk_stream_id: chunk_stream_id, 94 }); 95 } 96 msg_type_id::ACKNOWLEDGEMENT => { 97 let sequence_number = 98 ProtocolControlMessageReader::new(reader).read_acknowledgement()?; 99 return Ok(RtmpMessageData::Acknowledgement { 100 sequence_number: sequence_number, 101 }); 102 } 103 msg_type_id::WIN_ACKNOWLEDGEMENT_SIZE => { 104 let size = 105 ProtocolControlMessageReader::new(reader).read_window_acknowledgement_size()?; 106 return Ok(RtmpMessageData::WindowAcknowledgementSize { size: size }); 107 } 108 msg_type_id::SET_PEER_BANDWIDTH => { 109 let properties = 110 ProtocolControlMessageReader::new(reader).read_set_peer_bandwidth()?; 111 return Ok(RtmpMessageData::SetPeerBandwidth { 112 properties: properties, 113 }); 114 } 115 msg_type_id::DATA_AMF0 | msg_type_id::DATA_AMF3 => { 116 //let values = Amf0Reader::new(reader).read_all()?; 117 return Ok(RtmpMessageData::AmfData { 118 raw_data: self.chunk_info.payload.clone(), 119 }); 120 } 121 122 msg_type_id::SHARED_OBJ_AMF3 | msg_type_id::SHARED_OBJ_AMF0 => {} 123 124 msg_type_id::AGGREGATE => {} 125 126 _ => { 127 return Err(MessageError { 128 value: MessageErrorValue::UnknowMessageType, 129 }); 130 } 131 } 132 return Err(MessageError { 133 value: MessageErrorValue::UnknowMessageType, 134 }); 135 } 136 } 137 138 #[cfg(test)] 139 mod tests { 140 141 use super::MessageParser; 142 use crate::chunk::unpacketizer::ChunkUnpacketizer; 143 use crate::chunk::unpacketizer::UnpackResult; 144 145 #[test] 146 fn test_message_parse() { 147 let mut unpacker = ChunkUnpacketizer::new(); 148 149 let data: [u8; 205] = [ 150 2, 0, 0, 0, 0, 0, 4, 1, 0, 0, 0, 0, 0, 0, 16, 0, //set chunk size 151 //connect 152 3, //|format+csid| 153 0, 0, 0, //timestamp 154 0, 0, 177, //msg_length 155 20, //msg_type_id 0x14 156 0, 0, 0, 0, //msg_stream_id 157 2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body 158 3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101, 159 2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115, 160 104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112, 161 97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119, 162 102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 163 104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99, 164 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104, 165 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9, 166 ]; 167 168 unpacker.extend_data(&data[..]); 169 170 loop { 171 let result = unpacker.read_chunk(); 172 173 let rv = match result { 174 Ok(val) => val, 175 Err(_) => { 176 print!("end-----------"); 177 return; 178 } 179 }; 180 181 match rv { 182 UnpackResult::ChunkInfo(chunk_info) => { 183 let _ = chunk_info.message_header.msg_streamd_id; 184 let _ = chunk_info.message_header.timestamp; 185 186 let mut message_parser = MessageParser::new(chunk_info); 187 let _ = message_parser.parse(); 188 } 189 _ => {} 190 } 191 } 192 } 193 194 use uuid::Uuid; 195 196 #[test] 197 fn test_uuid() { 198 let my_uuid = Uuid::new_v4(); 199 println!("{}", my_uuid); 200 } 201 } 202