1e2687b83SHarlanC use { 2e2687b83SHarlanC super::{ 3e2687b83SHarlanC define::{msg_type_id, RtmpMessageData}, 4*69de9bbdSHarlanC errors::MessageError, 5e2687b83SHarlanC }, 6e2687b83SHarlanC crate::{ 7f9029ceaSHarlanC amf0::{amf0_markers, amf0_reader::Amf0Reader}, 8e2687b83SHarlanC chunk::ChunkInfo, 9e2687b83SHarlanC protocol_control_messages::reader::ProtocolControlMessageReader, 10e2687b83SHarlanC user_control_messages::reader::EventMessagesReader, 11b1840569SHarlanC // utils, 12e2687b83SHarlanC }, 1388325f54SHarlanC bytesio::bytes_reader::BytesReader, 14e2687b83SHarlanC }; 15ff1f1192SHarlanC 16ff1f1192SHarlanC pub struct MessageParser { 17ff1f1192SHarlanC chunk_info: ChunkInfo, 18ff1f1192SHarlanC } 19ff1f1192SHarlanC 20ff1f1192SHarlanC impl MessageParser { new(chunk_info: ChunkInfo) -> Self21a3d19cccSHarlanC pub fn new(chunk_info: ChunkInfo) -> Self { 220ca99c20SHarlan Self { chunk_info } 23ff1f1192SHarlanC } parse(self) -> Result<Option<RtmpMessageData>, MessageError>24*69de9bbdSHarlanC pub fn parse(self) -> Result<Option<RtmpMessageData>, MessageError> { 25cbcc815dSHarlanC let mut reader = BytesReader::new(self.chunk_info.payload); 26ff1f1192SHarlanC 27ff1f1192SHarlanC match self.chunk_info.message_header.msg_type_id { 28ff1f1192SHarlanC msg_type_id::COMMAND_AMF0 | msg_type_id::COMMAND_AMF3 => { 293312fa17SHarlanC if self.chunk_info.message_header.msg_type_id == msg_type_id::COMMAND_AMF3 { 30ff1f1192SHarlanC reader.read_u8()?; 31ff1f1192SHarlanC } 32ff1f1192SHarlanC let mut amf_reader = Amf0Reader::new(reader); 33ff1f1192SHarlanC 34ff1f1192SHarlanC let command_name = amf_reader.read_with_type(amf0_markers::STRING)?; 35ff1f1192SHarlanC let transaction_id = amf_reader.read_with_type(amf0_markers::NUMBER)?; 36ff1f1192SHarlanC 371d352d98SHarlanC // match command_name.clone() { 381d352d98SHarlanC // Amf0ValueType::UTF8String(val) => { 391d352d98SHarlanC // log::info!("command_name:{}", val); 401d352d98SHarlanC // } 411d352d98SHarlanC // _ => {} 421d352d98SHarlanC // } 431d352d98SHarlanC 44ff1f1192SHarlanC //The third value can be an object or NULL object 45ff1f1192SHarlanC let command_obj_raw = amf_reader.read_with_type(amf0_markers::OBJECT); 46ff1f1192SHarlanC let command_obj = match command_obj_raw { 47ff1f1192SHarlanC Ok(val) => val, 48ff1f1192SHarlanC Err(_) => amf_reader.read_with_type(amf0_markers::NULL)?, 49ff1f1192SHarlanC }; 50ff1f1192SHarlanC 51ff1f1192SHarlanC let others = amf_reader.read_all()?; 52ff1f1192SHarlanC 53*69de9bbdSHarlanC return Ok(Some(RtmpMessageData::Amf0Command { 5485c0af6aSLuca Barbato command_name, 5585c0af6aSLuca Barbato transaction_id, 56ff1f1192SHarlanC command_object: command_obj, 57ff1f1192SHarlanC others, 58*69de9bbdSHarlanC })); 59ff1f1192SHarlanC } 60fe91dfa7SHarlanC 613312fa17SHarlanC msg_type_id::AUDIO => { 625de1eabbSHarlanC log::trace!( 635de1eabbSHarlanC "receive audio msg , msg length is{}\n", 64fe91dfa7SHarlanC self.chunk_info.message_header.msg_length 65fe91dfa7SHarlanC ); 66fe91dfa7SHarlanC 67*69de9bbdSHarlanC return Ok(Some(RtmpMessageData::AudioData { 68cbcc815dSHarlanC data: reader.extract_remaining_bytes(), 69*69de9bbdSHarlanC })); 703312fa17SHarlanC } 713312fa17SHarlanC msg_type_id::VIDEO => { 725de1eabbSHarlanC log::trace!( 735de1eabbSHarlanC "receive video msg , msg length is{}\n", 74fe91dfa7SHarlanC self.chunk_info.message_header.msg_length 75fe91dfa7SHarlanC ); 76*69de9bbdSHarlanC return Ok(Some(RtmpMessageData::VideoData { 77cbcc815dSHarlanC data: reader.extract_remaining_bytes(), 78*69de9bbdSHarlanC })); 793312fa17SHarlanC } 8097f0b5afSHarlanC msg_type_id::USER_CONTROL_EVENT => { 815de1eabbSHarlanC log::trace!( 825de1eabbSHarlanC "receive user control event msg , msg length is{}\n", 83cc18a6e9SHarlanC self.chunk_info.message_header.msg_length 84cc18a6e9SHarlanC ); 8597f0b5afSHarlanC let data = EventMessagesReader::new(reader).parse_event()?; 86*69de9bbdSHarlanC return Ok(Some(data)); 8797f0b5afSHarlanC } 88ff1f1192SHarlanC msg_type_id::SET_CHUNK_SIZE => { 89ff1f1192SHarlanC let chunk_size = ProtocolControlMessageReader::new(reader).read_set_chunk_size()?; 90*69de9bbdSHarlanC return Ok(Some(RtmpMessageData::SetChunkSize { chunk_size })); 91ff1f1192SHarlanC } 92ff1f1192SHarlanC msg_type_id::ABORT => { 93ff1f1192SHarlanC let chunk_stream_id = 94ff1f1192SHarlanC ProtocolControlMessageReader::new(reader).read_abort_message()?; 95*69de9bbdSHarlanC return Ok(Some(RtmpMessageData::AbortMessage { chunk_stream_id })); 96ff1f1192SHarlanC } 97ff1f1192SHarlanC msg_type_id::ACKNOWLEDGEMENT => { 98ff1f1192SHarlanC let sequence_number = 99ff1f1192SHarlanC ProtocolControlMessageReader::new(reader).read_acknowledgement()?; 100*69de9bbdSHarlanC return Ok(Some(RtmpMessageData::Acknowledgement { sequence_number })); 101ff1f1192SHarlanC } 102ff1f1192SHarlanC msg_type_id::WIN_ACKNOWLEDGEMENT_SIZE => { 103ff1f1192SHarlanC let size = 104ff1f1192SHarlanC ProtocolControlMessageReader::new(reader).read_window_acknowledgement_size()?; 105*69de9bbdSHarlanC return Ok(Some(RtmpMessageData::WindowAcknowledgementSize { size })); 106ff1f1192SHarlanC } 107ff1f1192SHarlanC msg_type_id::SET_PEER_BANDWIDTH => { 108ff1f1192SHarlanC let properties = 109ff1f1192SHarlanC ProtocolControlMessageReader::new(reader).read_set_peer_bandwidth()?; 110*69de9bbdSHarlanC return Ok(Some(RtmpMessageData::SetPeerBandwidth { properties })); 111ff1f1192SHarlanC } 112fe91dfa7SHarlanC msg_type_id::DATA_AMF0 | msg_type_id::DATA_AMF3 => { 11361bf3e1bSHarlanC //let values = Amf0Reader::new(reader).read_all()?; 114*69de9bbdSHarlanC return Ok(Some(RtmpMessageData::AmfData { 115cbcc815dSHarlanC raw_data: reader.extract_remaining_bytes(), 116*69de9bbdSHarlanC })); 117fe91dfa7SHarlanC } 118ff1f1192SHarlanC 119ff1f1192SHarlanC msg_type_id::SHARED_OBJ_AMF3 | msg_type_id::SHARED_OBJ_AMF0 => {} 120ff1f1192SHarlanC 121ff1f1192SHarlanC msg_type_id::AGGREGATE => {} 122ff1f1192SHarlanC 123*69de9bbdSHarlanC _ => {} 124ff1f1192SHarlanC } 125*69de9bbdSHarlanC log::warn!( 126c8d4d932SHarlan "the msg_type_id is not processed: {}", 127c8d4d932SHarlan self.chunk_info.message_header.msg_type_id 128c8d4d932SHarlan ); 129*69de9bbdSHarlanC Ok(None) 130ff1f1192SHarlanC } 131ff1f1192SHarlanC } 1324745db95SHarlanC 1334745db95SHarlanC #[cfg(test)] 1344745db95SHarlanC mod tests { 1354745db95SHarlanC 1364745db95SHarlanC use super::MessageParser; 1374745db95SHarlanC use crate::chunk::unpacketizer::ChunkUnpacketizer; 1384745db95SHarlanC use crate::chunk::unpacketizer::UnpackResult; 1394745db95SHarlanC 1404745db95SHarlanC #[test] test_message_parse()1414745db95SHarlanC fn test_message_parse() { 1424745db95SHarlanC let mut unpacker = ChunkUnpacketizer::new(); 1434745db95SHarlanC 1444745db95SHarlanC let data: [u8; 205] = [ 1454745db95SHarlanC 2, 0, 0, 0, 0, 0, 4, 1, 0, 0, 0, 0, 0, 0, 16, 0, //set chunk size 1464745db95SHarlanC //connect 1474745db95SHarlanC 3, //|format+csid| 1484745db95SHarlanC 0, 0, 0, //timestamp 1494745db95SHarlanC 0, 0, 177, //msg_length 1504745db95SHarlanC 20, //msg_type_id 0x14 1514745db95SHarlanC 0, 0, 0, 0, //msg_stream_id 1524745db95SHarlanC 2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body 1534745db95SHarlanC 3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101, 1544745db95SHarlanC 2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115, 1554745db95SHarlanC 104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112, 1564745db95SHarlanC 97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119, 1574745db95SHarlanC 102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 1584745db95SHarlanC 104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99, 1594745db95SHarlanC 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104, 1604745db95SHarlanC 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9, 1614745db95SHarlanC ]; 1624745db95SHarlanC 1634745db95SHarlanC unpacker.extend_data(&data[..]); 1644745db95SHarlanC 1654745db95SHarlanC loop { 1664745db95SHarlanC let result = unpacker.read_chunk(); 1674745db95SHarlanC 1684745db95SHarlanC let rv = match result { 1694745db95SHarlanC Ok(val) => val, 1704745db95SHarlanC Err(_) => { 1714745db95SHarlanC print!("end-----------"); 1724745db95SHarlanC return; 1734745db95SHarlanC } 1744745db95SHarlanC }; 1754745db95SHarlanC 1760ca99c20SHarlan if let UnpackResult::ChunkInfo(chunk_info) = rv { 177f9029ceaSHarlanC let _ = chunk_info.message_header.msg_streamd_id; 178f9029ceaSHarlanC let _ = chunk_info.message_header.timestamp; 1794745db95SHarlanC 18085c0af6aSLuca Barbato let message_parser = MessageParser::new(chunk_info); 181f9029ceaSHarlanC let _ = message_parser.parse(); 1824745db95SHarlanC } 1834745db95SHarlanC } 1844745db95SHarlanC } 1854745db95SHarlanC } 186