1 use { 2 super::{define, errors::EventMessagesError}, 3 crate::messages::define::msg_type_id, 4 byteorder::BigEndian, 5 bytesio::bytes_writer::AsyncBytesWriter, 6 }; 7 8 pub struct EventMessagesWriter { 9 writer: AsyncBytesWriter, 10 // amf0_writer: Amf0Writer, 11 } 12 13 impl EventMessagesWriter { new(writer: AsyncBytesWriter) -> Self14 pub fn new(writer: AsyncBytesWriter) -> Self { 15 Self { writer } 16 } write_control_message_header(&mut self, len: u32) -> Result<(), EventMessagesError>17 fn write_control_message_header(&mut self, len: u32) -> Result<(), EventMessagesError> { 18 //0 1 2 3 4 5 6 7 19 //+-+-+-+-+-+-+-+-+ 20 //|fmt| cs id | 21 //+-+-+-+-+-+-+-+-+ 22 // 0x0 0x02 23 24 self.writer.write_u8(0x02)?; //fmt 0 and csid 2 //0x0 << 6 | 0x02 25 self.writer.write_u24::<BigEndian>(0)?; //timestamp 3 bytes and value 0 26 self.writer.write_u24::<BigEndian>(len)?; //msg length 27 self.writer.write_u8(msg_type_id::USER_CONTROL_EVENT)?; //msg type id 28 self.writer.write_u32::<BigEndian>(0)?; //msg stream ID 0 29 30 Ok(()) 31 } 32 write_stream_begin(&mut self, stream_id: u32) -> Result<(), EventMessagesError>33 pub async fn write_stream_begin(&mut self, stream_id: u32) -> Result<(), EventMessagesError> { 34 self.write_control_message_header(6)?; 35 self.writer 36 .write_u16::<BigEndian>(define::RTMP_EVENT_STREAM_BEGIN)?; 37 self.writer.write_u32::<BigEndian>(stream_id)?; 38 39 self.writer.flush().await?; 40 Ok(()) 41 } 42 write_stream_eof(&mut self, stream_id: u32) -> Result<(), EventMessagesError>43 pub async fn write_stream_eof(&mut self, stream_id: u32) -> Result<(), EventMessagesError> { 44 self.write_control_message_header(6)?; 45 self.writer 46 .write_u16::<BigEndian>(define::RTMP_EVENT_STREAM_EOF)?; 47 self.writer.write_u32::<BigEndian>(stream_id)?; 48 49 self.writer.flush().await?; 50 51 Ok(()) 52 } 53 write_stream_dry(&mut self, stream_id: u32) -> Result<(), EventMessagesError>54 pub async fn write_stream_dry(&mut self, stream_id: u32) -> Result<(), EventMessagesError> { 55 self.write_control_message_header(6)?; 56 self.writer 57 .write_u16::<BigEndian>(define::RTMP_EVENT_STREAM_DRY)?; 58 self.writer.write_u32::<BigEndian>(stream_id)?; 59 60 self.writer.flush().await?; 61 62 Ok(()) 63 } 64 //this function may contain bugs. write_set_buffer_length( &mut self, stream_id: u32, ms: u32, ) -> Result<(), EventMessagesError>65 pub async fn write_set_buffer_length( 66 &mut self, 67 stream_id: u32, 68 ms: u32, 69 ) -> Result<(), EventMessagesError> { 70 self.write_control_message_header(10)?; 71 self.writer 72 .write_u16::<BigEndian>(define::RTMP_EVENT_SET_BUFFER_LENGTH)?; 73 self.writer.write_u32::<BigEndian>(stream_id)?; 74 self.writer.write_u32::<BigEndian>(ms)?; 75 76 self.writer.flush().await?; 77 78 Ok(()) 79 } 80 write_stream_is_record( &mut self, stream_id: u32, ) -> Result<(), EventMessagesError>81 pub async fn write_stream_is_record( 82 &mut self, 83 stream_id: u32, 84 ) -> Result<(), EventMessagesError> { 85 self.write_control_message_header(6)?; 86 self.writer 87 .write_u16::<BigEndian>(define::RTMP_EVENT_STREAM_IS_RECORDED)?; 88 self.writer.write_u32::<BigEndian>(stream_id)?; 89 90 self.writer.flush().await?; 91 92 Ok(()) 93 } 94 write_ping_request(&mut self, timestamp: u32) -> Result<(), EventMessagesError>95 pub async fn write_ping_request(&mut self, timestamp: u32) -> Result<(), EventMessagesError> { 96 self.write_control_message_header(6)?; 97 self.writer 98 .write_u16::<BigEndian>(define::RTMP_EVENT_PING)?; 99 self.writer.write_u32::<BigEndian>(timestamp)?; 100 101 self.writer.flush().await?; 102 103 Ok(()) 104 } 105 write_ping_response(&mut self, timestamp: u32) -> Result<(), EventMessagesError>106 pub async fn write_ping_response(&mut self, timestamp: u32) -> Result<(), EventMessagesError> { 107 self.write_control_message_header(6)?; 108 self.writer 109 .write_u16::<BigEndian>(define::RTMP_EVENT_PONG)?; 110 self.writer.write_u32::<BigEndian>(timestamp)?; 111 112 self.writer.flush().await?; 113 114 Ok(()) 115 } 116 } 117