xref: /xiu/protocol/rtmp/src/netstream/writer.rs (revision b36cf5da)
1 use {
2     super::errors::NetStreamError,
3     crate::{
4         amf0::{amf0_writer::Amf0Writer, define::Amf0ValueType},
5         chunk::{define as chunk_define, packetizer::ChunkPacketizer, ChunkInfo},
6         messages::define as messages_define,
7     },
8     bytesio::bytesio::TNetIO,
9     indexmap::IndexMap,
10     std::sync::Arc,
11     tokio::sync::Mutex,
12 };
13 
14 pub struct NetStreamWriter {
15     amf0_writer: Amf0Writer,
16     packetizer: ChunkPacketizer,
17 }
18 
19 impl NetStreamWriter {
new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self20     pub fn new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self {
21         Self {
22             amf0_writer: Amf0Writer::new(),
23             packetizer: ChunkPacketizer::new(io),
24         }
25     }
write_chunk(&mut self, msg_stream_id: u32) -> Result<(), NetStreamError>26     async fn write_chunk(&mut self, msg_stream_id: u32) -> Result<(), NetStreamError> {
27         let data = self.amf0_writer.extract_current_bytes();
28 
29         let mut chunk_info = ChunkInfo::new(
30             chunk_define::csid_type::COMMAND_AMF0_AMF3,
31             chunk_define::chunk_type::TYPE_0,
32             0,
33             data.len() as u32,
34             messages_define::msg_type_id::COMMAND_AMF0,
35             msg_stream_id,
36             data,
37         );
38 
39         self.packetizer.write_chunk(&mut chunk_info).await?;
40         Ok(())
41     }
write_play( &mut self, transaction_id: &f64, stream_name: &String, start: &f64, duration: &f64, reset: &bool, ) -> Result<(), NetStreamError>42     pub async fn write_play(
43         &mut self,
44         transaction_id: &f64,
45         stream_name: &String,
46         start: &f64,
47         duration: &f64,
48         reset: &bool,
49     ) -> Result<(), NetStreamError> {
50         self.amf0_writer.write_string(&String::from("play"))?;
51         self.amf0_writer.write_number(transaction_id)?;
52         self.amf0_writer.write_null()?;
53         self.amf0_writer.write_string(stream_name)?;
54         self.amf0_writer.write_number(start)?;
55         self.amf0_writer.write_number(duration)?;
56         self.amf0_writer.write_bool(reset)?;
57 
58         self.write_chunk(0).await
59     }
write_delete_stream( &mut self, transaction_id: &f64, stream_id: &f64, ) -> Result<(), NetStreamError>60     pub async fn write_delete_stream(
61         &mut self,
62         transaction_id: &f64,
63         stream_id: &f64,
64     ) -> Result<(), NetStreamError> {
65         self.amf0_writer
66             .write_string(&String::from("deleteStream"))?;
67         self.amf0_writer.write_number(transaction_id)?;
68         self.amf0_writer.write_null()?;
69         self.amf0_writer.write_number(stream_id)?;
70 
71         self.write_chunk(0).await
72     }
73 
write_close_stream( &mut self, transaction_id: &f64, stream_id: &f64, ) -> Result<(), NetStreamError>74     pub async fn write_close_stream(
75         &mut self,
76         transaction_id: &f64,
77         stream_id: &f64,
78     ) -> Result<(), NetStreamError> {
79         self.amf0_writer
80             .write_string(&String::from("closeStream"))?;
81         self.amf0_writer.write_number(transaction_id)?;
82         self.amf0_writer.write_null()?;
83         self.amf0_writer.write_number(stream_id)?;
84 
85         self.write_chunk(0).await
86     }
87 
write_release_stream( &mut self, transaction_id: &f64, stream_name: &String, ) -> Result<(), NetStreamError>88     pub async fn write_release_stream(
89         &mut self,
90         transaction_id: &f64,
91         stream_name: &String,
92     ) -> Result<(), NetStreamError> {
93         self.amf0_writer
94             .write_string(&String::from("releaseStream"))?;
95         self.amf0_writer.write_number(transaction_id)?;
96         self.amf0_writer.write_null()?;
97         self.amf0_writer.write_string(stream_name)?;
98 
99         self.write_chunk(0).await
100     }
101 
write_fcpublish( &mut self, transaction_id: &f64, stream_name: &String, ) -> Result<(), NetStreamError>102     pub async fn write_fcpublish(
103         &mut self,
104         transaction_id: &f64,
105         stream_name: &String,
106     ) -> Result<(), NetStreamError> {
107         self.amf0_writer.write_string(&String::from("FCPublish"))?;
108         self.amf0_writer.write_number(transaction_id)?;
109         self.amf0_writer.write_null()?;
110         self.amf0_writer.write_string(stream_name)?;
111 
112         self.write_chunk(0).await
113     }
114 
115     #[allow(dead_code)]
write_receive_audio( &mut self, transaction_id: &f64, enable: &bool, ) -> Result<(), NetStreamError>116     async fn write_receive_audio(
117         &mut self,
118         transaction_id: &f64,
119         enable: &bool,
120     ) -> Result<(), NetStreamError> {
121         self.amf0_writer
122             .write_string(&String::from("receiveAudio"))?;
123         self.amf0_writer.write_number(transaction_id)?;
124         self.amf0_writer.write_null()?;
125         self.amf0_writer.write_bool(enable)?;
126 
127         self.write_chunk(0).await
128     }
129     #[allow(dead_code)]
write_receive_video( &mut self, transaction_id: &f64, enable: &bool, ) -> Result<(), NetStreamError>130     async fn write_receive_video(
131         &mut self,
132         transaction_id: &f64,
133         enable: &bool,
134     ) -> Result<(), NetStreamError> {
135         self.amf0_writer
136             .write_string(&String::from("receiveVideo"))?;
137         self.amf0_writer.write_number(transaction_id)?;
138         self.amf0_writer.write_null()?;
139         self.amf0_writer.write_bool(enable)?;
140 
141         self.write_chunk(0).await
142     }
write_publish( &mut self, transaction_id: &f64, stream_name: &String, stream_type: &String, ) -> Result<(), NetStreamError>143     pub async fn write_publish(
144         &mut self,
145         transaction_id: &f64,
146         stream_name: &String,
147         stream_type: &String,
148     ) -> Result<(), NetStreamError> {
149         self.amf0_writer.write_string(&String::from("publish"))?;
150         self.amf0_writer.write_number(transaction_id)?;
151         self.amf0_writer.write_null()?;
152         self.amf0_writer.write_string(stream_name)?;
153         self.amf0_writer.write_string(stream_type)?;
154 
155         self.write_chunk(0).await
156     }
157     #[allow(dead_code)]
write_seek(&mut self, transaction_id: &f64, ms: &f64) -> Result<(), NetStreamError>158     async fn write_seek(&mut self, transaction_id: &f64, ms: &f64) -> Result<(), NetStreamError> {
159         self.amf0_writer.write_string(&String::from("seek"))?;
160         self.amf0_writer.write_number(transaction_id)?;
161         self.amf0_writer.write_null()?;
162         self.amf0_writer.write_number(ms)?;
163 
164         self.write_chunk(0).await
165     }
166     #[allow(dead_code)]
write_pause( &mut self, transaction_id: &f64, pause: &bool, ms: &f64, ) -> Result<(), NetStreamError>167     async fn write_pause(
168         &mut self,
169         transaction_id: &f64,
170         pause: &bool,
171         ms: &f64,
172     ) -> Result<(), NetStreamError> {
173         self.amf0_writer.write_string(&String::from("pause"))?;
174         self.amf0_writer.write_number(transaction_id)?;
175         self.amf0_writer.write_null()?;
176         self.amf0_writer.write_bool(pause)?;
177         self.amf0_writer.write_number(ms)?;
178 
179         self.write_chunk(0).await
180     }
181 
182     #[allow(dead_code)]
write_on_bw_done( &mut self, transaction_id: &f64, bandwidth: &f64, ) -> Result<(), NetStreamError>183     async fn write_on_bw_done(
184         &mut self,
185         transaction_id: &f64,
186         bandwidth: &f64,
187     ) -> Result<(), NetStreamError> {
188         self.amf0_writer.write_string(&String::from("onBWDone"))?;
189         self.amf0_writer.write_number(transaction_id)?;
190         self.amf0_writer.write_null()?;
191         self.amf0_writer.write_number(bandwidth)?;
192 
193         self.write_chunk(0).await
194     }
195 
write_on_status( &mut self, transaction_id: &f64, level: &str, code: &str, description: &str, ) -> Result<(), NetStreamError>196     pub async fn write_on_status(
197         &mut self,
198         transaction_id: &f64,
199         level: &str,
200         code: &str,
201         description: &str,
202     ) -> Result<(), NetStreamError> {
203         self.amf0_writer.write_string(&String::from("onStatus"))?;
204         self.amf0_writer.write_number(transaction_id)?;
205         self.amf0_writer.write_null()?;
206 
207         let mut properties_map = IndexMap::new();
208 
209         properties_map.insert(
210             String::from("level"),
211             Amf0ValueType::UTF8String(level.to_owned()),
212         );
213         properties_map.insert(
214             String::from("code"),
215             Amf0ValueType::UTF8String(code.to_owned()),
216         );
217         properties_map.insert(
218             String::from("description"),
219             Amf0ValueType::UTF8String(description.to_owned()),
220         );
221 
222         self.amf0_writer.write_object(&properties_map)?;
223 
224         self.write_chunk(1).await
225     }
226 }
227