1 use { 2 super::errors::ControlMessagesError, crate::messages::define::msg_type_id, 3 byteorder::BigEndian, bytesio::bytes_writer::AsyncBytesWriter, 4 }; 5 6 pub struct ProtocolControlMessagesWriter { 7 writer: AsyncBytesWriter, 8 //amf0_writer: Amf0Writer, 9 } 10 11 impl ProtocolControlMessagesWriter { new(writer: AsyncBytesWriter) -> Self12 pub fn new(writer: AsyncBytesWriter) -> Self { 13 Self { writer } 14 } write_control_message_header( &mut self, msg_type_id: u8, len: u32, ) -> Result<(), ControlMessagesError>15 pub fn write_control_message_header( 16 &mut self, 17 msg_type_id: u8, 18 len: u32, 19 ) -> Result<(), ControlMessagesError> { 20 //0 1 2 3 4 5 6 7 21 //+-+-+-+-+-+-+-+-+ 22 //|fmt| cs id | 23 //+-+-+-+-+-+-+-+-+ 24 // 0x0 0x02 25 self.writer.write_u8(0x02)?; //fmt 0 and csid 2 //0x0 << 6 | 0x02 26 self.writer.write_u24::<BigEndian>(0)?; //timestamp 3 bytes and value 0 27 self.writer.write_u24::<BigEndian>(len)?; //msg length 28 self.writer.write_u8(msg_type_id)?; //msg type id 29 self.writer.write_u32::<BigEndian>(0)?; //msg stream ID 0 30 31 Ok(()) 32 } write_set_chunk_size( &mut self, chunk_size: u32, ) -> Result<(), ControlMessagesError>33 pub async fn write_set_chunk_size( 34 &mut self, 35 chunk_size: u32, 36 ) -> Result<(), ControlMessagesError> { 37 self.write_control_message_header(msg_type_id::SET_CHUNK_SIZE, 4)?; 38 self.writer 39 .write_u32::<BigEndian>(chunk_size & 0x7FFFFFFF)?; //first bit must be 0 40 41 self.writer.flush().await?; 42 Ok(()) 43 } 44 write_abort_message( &mut self, chunk_stream_id: u32, ) -> Result<(), ControlMessagesError>45 pub async fn write_abort_message( 46 &mut self, 47 chunk_stream_id: u32, 48 ) -> Result<(), ControlMessagesError> { 49 self.write_control_message_header(msg_type_id::ABORT, 4)?; 50 self.writer.write_u32::<BigEndian>(chunk_stream_id)?; 51 52 self.writer.flush().await?; 53 Ok(()) 54 } 55 write_acknowledgement( &mut self, sequence_number: u32, ) -> Result<(), ControlMessagesError>56 pub async fn write_acknowledgement( 57 &mut self, 58 sequence_number: u32, 59 ) -> Result<(), ControlMessagesError> { 60 self.write_control_message_header(msg_type_id::ACKNOWLEDGEMENT, 4)?; 61 self.writer.write_u32::<BigEndian>(sequence_number)?; 62 63 self.writer.flush().await?; 64 Ok(()) 65 } 66 write_window_acknowledgement_size( &mut self, window_size: u32, ) -> Result<(), ControlMessagesError>67 pub async fn write_window_acknowledgement_size( 68 &mut self, 69 window_size: u32, 70 ) -> Result<(), ControlMessagesError> { 71 self.write_control_message_header(msg_type_id::WIN_ACKNOWLEDGEMENT_SIZE, 4)?; 72 self.writer.write_u32::<BigEndian>(window_size)?; 73 74 self.writer.flush().await?; 75 Ok(()) 76 } 77 write_set_peer_bandwidth( &mut self, window_size: u32, limit_type: u8, ) -> Result<(), ControlMessagesError>78 pub async fn write_set_peer_bandwidth( 79 &mut self, 80 window_size: u32, 81 limit_type: u8, 82 ) -> Result<(), ControlMessagesError> { 83 self.write_control_message_header(msg_type_id::SET_PEER_BANDWIDTH, 5)?; 84 self.writer.write_u32::<BigEndian>(window_size)?; 85 self.writer.write_u8(limit_type)?; 86 87 self.writer.flush().await?; 88 89 Ok(()) 90 } 91 } 92