1 use super::define::msg_type_id; 2 use super::define::RtmpMessageData; 3 use super::errors::MessageError; 4 use super::errors::MessageErrorValue; 5 use crate::chunk::ChunkInfo; 6 use netio::bytes_reader::BytesReader; 7 8 use crate::amf0::amf0_markers; 9 use crate::amf0::amf0_reader::Amf0Reader; 10 11 use crate::protocol_control_messages::reader::ProtocolControlMessageReader; 12 13 pub struct MessageParser { 14 chunk_info: ChunkInfo, 15 } 16 17 impl MessageParser { 18 pub fn new(chunk_info: ChunkInfo) -> Self { 19 Self { 20 chunk_info: chunk_info, 21 } 22 } 23 pub fn parse(&mut self) -> Result<RtmpMessageData, MessageError> { 24 let mut reader = BytesReader::new(self.chunk_info.payload.clone()); 25 26 match self.chunk_info.message_header.msg_type_id { 27 msg_type_id::COMMAND_AMF0 | msg_type_id::COMMAND_AMF3 => { 28 if self.chunk_info.message_header.msg_type_id == msg_type_id::COMMAND_AMF3 { 29 reader.read_u8()?; 30 } 31 let mut amf_reader = Amf0Reader::new(reader); 32 33 let command_name = amf_reader.read_with_type(amf0_markers::STRING)?; 34 let transaction_id = amf_reader.read_with_type(amf0_markers::NUMBER)?; 35 36 //The third value can be an object or NULL object 37 let command_obj_raw = amf_reader.read_with_type(amf0_markers::OBJECT); 38 let command_obj = match command_obj_raw { 39 Ok(val) => val, 40 Err(_) => amf_reader.read_with_type(amf0_markers::NULL)?, 41 }; 42 43 let others = amf_reader.read_all()?; 44 45 return Ok(RtmpMessageData::Amf0Command { 46 command_name: command_name, 47 transaction_id: transaction_id, 48 command_object: command_obj, 49 others, 50 }); 51 } 52 // msg_types::COMMAND_AMF3 => {} 53 msg_type_id::AUDIO => { 54 return Ok(RtmpMessageData::AudioData { 55 data: self.chunk_info.payload.clone(), 56 }) 57 } 58 msg_type_id::VIDEO => { 59 return Ok(RtmpMessageData::VideoData { 60 data: self.chunk_info.payload.clone(), 61 }) 62 } 63 64 msg_type_id::USER_CONTROL_EVENT => {} 65 66 msg_type_id::SET_CHUNK_SIZE => { 67 let chunk_size = ProtocolControlMessageReader::new(reader).read_set_chunk_size()?; 68 return Ok(RtmpMessageData::SetChunkSize { 69 chunk_size: chunk_size, 70 }); 71 } 72 msg_type_id::ABORT => { 73 let chunk_stream_id = 74 ProtocolControlMessageReader::new(reader).read_abort_message()?; 75 return Ok(RtmpMessageData::AbortMessage { 76 chunk_stream_id: chunk_stream_id, 77 }); 78 } 79 msg_type_id::ACKNOWLEDGEMENT => { 80 let sequence_number = 81 ProtocolControlMessageReader::new(reader).read_acknowledgement()?; 82 return Ok(RtmpMessageData::Acknowledgement { 83 sequence_number: sequence_number, 84 }); 85 } 86 msg_type_id::WIN_ACKNOWLEDGEMENT_SIZE => { 87 let size = 88 ProtocolControlMessageReader::new(reader).read_window_acknowledgement_size()?; 89 return Ok(RtmpMessageData::WindowAcknowledgementSize { size: size }); 90 } 91 msg_type_id::SET_PEER_BANDWIDTH => { 92 let properties = 93 ProtocolControlMessageReader::new(reader).read_set_peer_bandwidth()?; 94 return Ok(RtmpMessageData::SetPeerBandwidth { 95 properties: properties, 96 }); 97 } 98 99 msg_type_id::DATA_AMF0 | msg_type_id::DATA_AMF3 => {} 100 101 msg_type_id::SHARED_OBJ_AMF3 | msg_type_id::SHARED_OBJ_AMF0 => {} 102 103 msg_type_id::AGGREGATE => {} 104 105 _ => { 106 return Err(MessageError { 107 value: MessageErrorValue::UnknowMessageType, 108 }); 109 } 110 } 111 return Err(MessageError { 112 value: MessageErrorValue::UnknowMessageType, 113 }); 114 } 115 } 116