1 use {
2     super::{define, errors::EventMessagesError},
3     crate::messages::define::msg_type_id,
4     byteorder::BigEndian,
5     bytesio::bytes_writer::AsyncBytesWriter,
6 };
7 
8 pub struct EventMessagesWriter {
9     writer: AsyncBytesWriter,
10     // amf0_writer: Amf0Writer,
11 }
12 
13 impl EventMessagesWriter {
new(writer: AsyncBytesWriter) -> Self14     pub fn new(writer: AsyncBytesWriter) -> Self {
15         Self { writer }
16     }
write_control_message_header(&mut self, len: u32) -> Result<(), EventMessagesError>17     fn write_control_message_header(&mut self, len: u32) -> Result<(), EventMessagesError> {
18         //0 1 2 3 4 5 6 7
19         //+-+-+-+-+-+-+-+-+
20         //|fmt|  cs id  |
21         //+-+-+-+-+-+-+-+-+
22         // 0x0     0x02
23 
24         self.writer.write_u8(0x02)?; //fmt 0 and csid 2 //0x0 << 6 | 0x02
25         self.writer.write_u24::<BigEndian>(0)?; //timestamp 3 bytes and value 0
26         self.writer.write_u24::<BigEndian>(len)?; //msg length
27         self.writer.write_u8(msg_type_id::USER_CONTROL_EVENT)?; //msg type id
28         self.writer.write_u32::<BigEndian>(0)?; //msg stream ID 0
29 
30         Ok(())
31     }
32 
write_stream_begin(&mut self, stream_id: u32) -> Result<(), EventMessagesError>33     pub async fn write_stream_begin(&mut self, stream_id: u32) -> Result<(), EventMessagesError> {
34         self.write_control_message_header(6)?;
35         self.writer
36             .write_u16::<BigEndian>(define::RTMP_EVENT_STREAM_BEGIN)?;
37         self.writer.write_u32::<BigEndian>(stream_id)?;
38 
39         self.writer.flush().await?;
40         Ok(())
41     }
42 
write_stream_eof(&mut self, stream_id: u32) -> Result<(), EventMessagesError>43     pub async fn write_stream_eof(&mut self, stream_id: u32) -> Result<(), EventMessagesError> {
44         self.write_control_message_header(6)?;
45         self.writer
46             .write_u16::<BigEndian>(define::RTMP_EVENT_STREAM_EOF)?;
47         self.writer.write_u32::<BigEndian>(stream_id)?;
48 
49         self.writer.flush().await?;
50 
51         Ok(())
52     }
53 
write_stream_dry(&mut self, stream_id: u32) -> Result<(), EventMessagesError>54     pub async fn write_stream_dry(&mut self, stream_id: u32) -> Result<(), EventMessagesError> {
55         self.write_control_message_header(6)?;
56         self.writer
57             .write_u16::<BigEndian>(define::RTMP_EVENT_STREAM_DRY)?;
58         self.writer.write_u32::<BigEndian>(stream_id)?;
59 
60         self.writer.flush().await?;
61 
62         Ok(())
63     }
64     //this function may contain bugs.
write_set_buffer_length( &mut self, stream_id: u32, ms: u32, ) -> Result<(), EventMessagesError>65     pub async fn write_set_buffer_length(
66         &mut self,
67         stream_id: u32,
68         ms: u32,
69     ) -> Result<(), EventMessagesError> {
70         self.write_control_message_header(10)?;
71         self.writer
72             .write_u16::<BigEndian>(define::RTMP_EVENT_SET_BUFFER_LENGTH)?;
73         self.writer.write_u32::<BigEndian>(stream_id)?;
74         self.writer.write_u32::<BigEndian>(ms)?;
75 
76         self.writer.flush().await?;
77 
78         Ok(())
79     }
80 
write_stream_is_record( &mut self, stream_id: u32, ) -> Result<(), EventMessagesError>81     pub async fn write_stream_is_record(
82         &mut self,
83         stream_id: u32,
84     ) -> Result<(), EventMessagesError> {
85         self.write_control_message_header(6)?;
86         self.writer
87             .write_u16::<BigEndian>(define::RTMP_EVENT_STREAM_IS_RECORDED)?;
88         self.writer.write_u32::<BigEndian>(stream_id)?;
89 
90         self.writer.flush().await?;
91 
92         Ok(())
93     }
94 
write_ping_request(&mut self, timestamp: u32) -> Result<(), EventMessagesError>95     pub async fn write_ping_request(&mut self, timestamp: u32) -> Result<(), EventMessagesError> {
96         self.write_control_message_header(6)?;
97         self.writer
98             .write_u16::<BigEndian>(define::RTMP_EVENT_PING)?;
99         self.writer.write_u32::<BigEndian>(timestamp)?;
100 
101         self.writer.flush().await?;
102 
103         Ok(())
104     }
105 
write_ping_response(&mut self, timestamp: u32) -> Result<(), EventMessagesError>106     pub async fn write_ping_response(&mut self, timestamp: u32) -> Result<(), EventMessagesError> {
107         self.write_control_message_header(6)?;
108         self.writer
109             .write_u16::<BigEndian>(define::RTMP_EVENT_PONG)?;
110         self.writer.write_u32::<BigEndian>(timestamp)?;
111 
112         self.writer.flush().await?;
113 
114         Ok(())
115     }
116 }
117