xref: /xiu/protocol/rtmp/src/netconnection/writer.rs (revision b36cf5da)
1 use {
2     super::errors::NetConnectionError,
3     crate::{
4         amf0::{amf0_writer::Amf0Writer, define::Amf0ValueType},
5         chunk::{define as chunk_define, packetizer::ChunkPacketizer, ChunkInfo},
6         messages::define as messages_define,
7     },
8     bytesio::bytesio::TNetIO,
9     indexmap::IndexMap,
10     std::sync::Arc,
11     tokio::sync::Mutex,
12 };
/// Optional properties of the command object sent with an RTMP `connect` command.
///
/// Every field is an `Option`; `NetConnection::write_connect` skips fields that are `None`.
#[derive(Clone, Default, Debug)]
pub struct ConnectProperties {
    /// Server application name, e.g. `testapp`.
    pub app: Option<String>,
    /// Flash Player version, e.g. `FMSc/1.0`.
    pub flash_ver: Option<String>,
    /// URL of the source SWF file, e.g. `file://C:/FlvPlayer.swf`.
    pub swf_url: Option<String>,
    /// URL of the server, e.g. `rtmp://host:1935/testapp/instance1`.
    pub tc_url: Option<String>,
    /// True if a proxy is being used.
    pub fpad: Option<bool>,
    /// Double; `new` sets 15.
    pub capabilities: Option<f64>,
    /// Double; `new` sets 4071.
    pub audio_codecs: Option<f64>,
    /// Double; `new` sets 252.
    pub video_codecs: Option<f64>,
    /// Double; `new` sets 1.
    pub video_function: Option<f64>,
    /// Sent under the key `objectEncoding`; `new` sets 0.
    pub object_encoding: Option<f64>,
    /// URL of the embedding web page, e.g. `http://host/sample.html`.
    pub page_url: Option<String>,
    /// Sent under the key `type`; `new` sets `nonprivate`.
    pub pub_type: Option<String>,
}

impl ConnectProperties {
    /// Builds a fully populated property set for the application `app_name`.
    ///
    /// All URL fields start as empty strings and the numeric fields take the
    /// fixed defaults listed on the struct fields.
    pub fn new(app_name: String) -> Self {
        Self {
            app: Some(app_name),
            flash_ver: Some("LNX 9,0,124,2".to_string()),
            swf_url: Some("".to_string()),
            tc_url: Some("".to_string()),
            fpad: Some(false),
            capabilities: Some(15_f64),
            audio_codecs: Some(4071_f64),
            video_codecs: Some(252_f64),
            video_function: Some(1_f64),
            object_encoding: Some(0_f64),
            page_url: Some("".to_string()),
            pub_type: Some("nonprivate".to_string()),
        }
    }

    /// Builds a property set with every field unset.
    ///
    /// This is exactly the derived `Default`; delegating to it means a newly
    /// added field cannot be forgotten here.
    pub fn new_none() -> Self {
        Self::default()
    }
}
63 
/// Writer for the connection-level RTMP commands implemented in this file
/// (`connect`, `createStream`, `getStreamLength`, `_result` and `_error`).
pub struct NetConnection {
    /// Accumulates the AMF0-encoded command body until `write_chunk` drains it.
    amf0_writer: Amf0Writer,
    /// Sends each drained command body as a chunk through the IO handle given to `new`.
    packetizer: ChunkPacketizer,
}
68 
69 impl NetConnection {
new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self70     pub fn new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self {
71         Self {
72             amf0_writer: Amf0Writer::new(),
73             packetizer: ChunkPacketizer::new(io),
74         }
75     }
76 
write_chunk(&mut self) -> Result<(), NetConnectionError>77     async fn write_chunk(&mut self) -> Result<(), NetConnectionError> {
78         let data = self.amf0_writer.extract_current_bytes();
79         let mut chunk_info = ChunkInfo::new(
80             chunk_define::csid_type::COMMAND_AMF0_AMF3,
81             chunk_define::chunk_type::TYPE_0,
82             0,
83             data.len() as u32,
84             messages_define::msg_type_id::COMMAND_AMF0,
85             0,
86             data,
87         );
88 
89         self.packetizer.write_chunk(&mut chunk_info).await?;
90         Ok(())
91     }
92 
write_connect_with_value( &mut self, transaction_id: &f64, properties: IndexMap<String, Amf0ValueType>, ) -> Result<(), NetConnectionError>93     pub async fn write_connect_with_value(
94         &mut self,
95         transaction_id: &f64,
96         properties: IndexMap<String, Amf0ValueType>,
97     ) -> Result<(), NetConnectionError> {
98         self.amf0_writer.write_string(&String::from("connect"))?;
99         self.amf0_writer.write_number(transaction_id)?;
100 
101         self.amf0_writer.write_object(&properties)?;
102 
103         self.write_chunk().await
104     }
write_connect( &mut self, transaction_id: &f64, properties: &ConnectProperties, ) -> Result<(), NetConnectionError>105     pub async fn write_connect(
106         &mut self,
107         transaction_id: &f64,
108         properties: &ConnectProperties,
109     ) -> Result<(), NetConnectionError> {
110         self.amf0_writer.write_string(&String::from("connect"))?;
111         self.amf0_writer.write_number(transaction_id)?;
112 
113         let mut properties_map = IndexMap::new();
114 
115         if let Some(app) = properties.app.clone() {
116             properties_map.insert(String::from("app"), Amf0ValueType::UTF8String(app));
117         }
118 
119         if let Some(pub_type) = properties.pub_type.clone() {
120             properties_map.insert(String::from("type"), Amf0ValueType::UTF8String(pub_type));
121         }
122 
123         if let Some(flash_ver) = properties.flash_ver.clone() {
124             properties_map.insert(
125                 String::from("flashVer"),
126                 Amf0ValueType::UTF8String(flash_ver),
127             );
128         }
129 
130         if let Some(tc_url) = properties.tc_url.clone() {
131             properties_map.insert(String::from("tcUrl"), Amf0ValueType::UTF8String(tc_url));
132         }
133 
134         if let Some(swf_url) = properties.swf_url.clone() {
135             properties_map.insert(String::from("swfUrl"), Amf0ValueType::UTF8String(swf_url));
136         }
137 
138         if let Some(page_url) = properties.page_url.clone() {
139             properties_map.insert(String::from("pageUrl"), Amf0ValueType::UTF8String(page_url));
140         }
141 
142         if let Some(fpad) = properties.fpad {
143             properties_map.insert(String::from("fpad"), Amf0ValueType::Boolean(fpad));
144         }
145 
146         if let Some(capabilities) = properties.capabilities {
147             properties_map.insert(
148                 String::from("capabilities"),
149                 Amf0ValueType::Number(capabilities),
150             );
151         }
152 
153         if let Some(audio_codecs) = properties.audio_codecs {
154             properties_map.insert(
155                 String::from("audioCodecs"),
156                 Amf0ValueType::Number(audio_codecs),
157             );
158         }
159 
160         if let Some(video_codecs) = properties.video_codecs {
161             properties_map.insert(
162                 String::from("videoCodecs"),
163                 Amf0ValueType::Number(video_codecs),
164             );
165         }
166 
167         if let Some(video_function) = properties.video_function {
168             properties_map.insert(
169                 String::from("videoFunction"),
170                 Amf0ValueType::Number(video_function),
171             );
172         }
173 
174         if let Some(object_encoding) = properties.object_encoding {
175             properties_map.insert(
176                 String::from("objectEncoding"),
177                 Amf0ValueType::Number(object_encoding),
178             );
179         }
180         self.amf0_writer.write_object(&properties_map)?;
181 
182         self.write_chunk().await
183     }
184     #[allow(clippy::too_many_arguments)]
write_connect_response( &mut self, transaction_id: &f64, fmsver: &str, capabilities: &f64, code: &str, level: &str, description: &str, encoding: &f64, ) -> Result<(), NetConnectionError>185     pub async fn write_connect_response(
186         &mut self,
187         transaction_id: &f64,
188         fmsver: &str,
189         capabilities: &f64,
190         code: &str,
191         level: &str,
192         description: &str,
193         encoding: &f64,
194     ) -> Result<(), NetConnectionError> {
195         self.amf0_writer.write_string(&String::from("_result"))?;
196         self.amf0_writer.write_number(transaction_id)?;
197 
198         let mut properties_map_a = IndexMap::new();
199 
200         properties_map_a.insert(
201             String::from("fmsVer"),
202             Amf0ValueType::UTF8String(fmsver.to_owned()),
203         );
204         properties_map_a.insert(
205             String::from("capabilities"),
206             Amf0ValueType::Number(*capabilities),
207         );
208 
209         self.amf0_writer.write_object(&properties_map_a)?;
210 
211         let mut properties_map_b = IndexMap::new();
212 
213         properties_map_b.insert(
214             String::from("level"),
215             Amf0ValueType::UTF8String(level.to_owned()),
216         );
217         properties_map_b.insert(
218             String::from("code"),
219             Amf0ValueType::UTF8String(code.to_owned()),
220         );
221         properties_map_b.insert(
222             String::from("description"),
223             Amf0ValueType::UTF8String(description.to_owned()),
224         );
225         properties_map_b.insert(
226             String::from("objectEncoding"),
227             Amf0ValueType::Number(*encoding),
228         );
229 
230         self.amf0_writer.write_object(&properties_map_b)?;
231 
232         self.write_chunk().await
233     }
234 
write_create_stream( &mut self, transaction_id: &f64, ) -> Result<(), NetConnectionError>235     pub async fn write_create_stream(
236         &mut self,
237         transaction_id: &f64,
238     ) -> Result<(), NetConnectionError> {
239         self.amf0_writer
240             .write_string(&String::from("createStream"))?;
241         self.amf0_writer.write_number(transaction_id)?;
242         self.amf0_writer.write_null()?;
243 
244         self.write_chunk().await
245     }
246 
write_create_stream_response( &mut self, transaction_id: &f64, stream_id: &f64, ) -> Result<(), NetConnectionError>247     pub async fn write_create_stream_response(
248         &mut self,
249         transaction_id: &f64,
250         stream_id: &f64,
251     ) -> Result<(), NetConnectionError> {
252         self.amf0_writer.write_string(&String::from("_result"))?;
253         self.amf0_writer.write_number(transaction_id)?;
254         self.amf0_writer.write_null()?;
255         self.amf0_writer.write_number(stream_id)?;
256 
257         self.write_chunk().await
258     }
259 
write_get_stream_length( &mut self, transaction_id: &f64, stream_name: &String, ) -> Result<(), NetConnectionError>260     pub async fn write_get_stream_length(
261         &mut self,
262         transaction_id: &f64,
263         stream_name: &String,
264     ) -> Result<(), NetConnectionError> {
265         self.amf0_writer
266             .write_string(&String::from("getStreamLength"))?;
267         self.amf0_writer.write_number(transaction_id)?;
268         self.amf0_writer.write_null()?;
269         self.amf0_writer.write_string(stream_name)?;
270 
271         self.write_chunk().await
272     }
273 
error( &mut self, transaction_id: &f64, code: &str, level: &str, description: &str, ) -> Result<(), NetConnectionError>274     pub async fn error(
275         &mut self,
276         transaction_id: &f64,
277         code: &str,
278         level: &str,
279         description: &str,
280     ) -> Result<(), NetConnectionError> {
281         self.amf0_writer.write_string(&String::from("_error"))?;
282         self.amf0_writer.write_number(transaction_id)?;
283         self.amf0_writer.write_null()?;
284 
285         let mut properties_map = IndexMap::new();
286 
287         properties_map.insert(
288             String::from("level"),
289             Amf0ValueType::UTF8String(level.to_owned()),
290         );
291         properties_map.insert(
292             String::from("code"),
293             Amf0ValueType::UTF8String(code.to_owned()),
294         );
295         properties_map.insert(
296             String::from("description"),
297             Amf0ValueType::UTF8String(description.to_owned()),
298         );
299         self.amf0_writer.write_object(&properties_map)?;
300 
301         self.write_chunk().await
302     }
303 }
304