1 use { 2 super::errors::NetConnectionError, 3 crate::{ 4 amf0::{amf0_writer::Amf0Writer, define::Amf0ValueType}, 5 chunk::{define as chunk_define, packetizer::ChunkPacketizer, ChunkInfo}, 6 messages::define as messages_define, 7 }, 8 bytesio::bytesio::TNetIO, 9 indexmap::IndexMap, 10 std::sync::Arc, 11 tokio::sync::Mutex, 12 }; 13 #[derive(Clone, Default, Debug)] 14 pub struct ConnectProperties { 15 pub app: Option<String>, // Server application name, e.g.: testapp 16 pub flash_ver: Option<String>, // Flash Player version, FMSc/1.0 17 pub swf_url: Option<String>, // URL of the source SWF file file://C:/FlvPlayer.swf 18 pub tc_url: Option<String>, // URL of the Server, rtmp://host:1935/testapp/instance1 19 pub fpad: Option<bool>, // True if proxy is being used. 20 pub capabilities: Option<f64>, // double default: 15 21 pub audio_codecs: Option<f64>, // double default: 4071 22 pub video_codecs: Option<f64>, // double default: 252 23 pub video_function: Option<f64>, // double default: 1 24 pub object_encoding: Option<f64>, 25 pub page_url: Option<String>, // http://host/sample.html 26 pub pub_type: Option<String>, 27 } 28 29 impl ConnectProperties { new(app_name: String) -> Self30 pub fn new(app_name: String) -> Self { 31 Self { 32 app: Some(app_name), 33 flash_ver: Some("LNX 9,0,124,2".to_string()), 34 swf_url: Some("".to_string()), 35 tc_url: Some("".to_string()), 36 fpad: Some(false), 37 capabilities: Some(15_f64), 38 audio_codecs: Some(4071_f64), 39 video_codecs: Some(252_f64), 40 video_function: Some(1_f64), 41 object_encoding: Some(0_f64), 42 page_url: Some("".to_string()), 43 pub_type: Some("nonprivate".to_string()), 44 } 45 } new_none() -> Self46 pub fn new_none() -> Self { 47 Self { 48 app: None, 49 flash_ver: None, 50 swf_url: None, 51 tc_url: None, 52 fpad: None, 53 capabilities: None, 54 audio_codecs: None, 55 video_codecs: None, 56 video_function: None, 57 object_encoding: None, 58 page_url: None, 59 pub_type: None, 60 } 61 } 62 } 63 64 pub struct NetConnection { 65 amf0_writer: Amf0Writer, 66 packetizer: ChunkPacketizer, 67 } 68 69 impl NetConnection { new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self70 pub fn new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self { 71 Self { 72 amf0_writer: Amf0Writer::new(), 73 packetizer: ChunkPacketizer::new(io), 74 } 75 } 76 write_chunk(&mut self) -> Result<(), NetConnectionError>77 async fn write_chunk(&mut self) -> Result<(), NetConnectionError> { 78 let data = self.amf0_writer.extract_current_bytes(); 79 let mut chunk_info = ChunkInfo::new( 80 chunk_define::csid_type::COMMAND_AMF0_AMF3, 81 chunk_define::chunk_type::TYPE_0, 82 0, 83 data.len() as u32, 84 messages_define::msg_type_id::COMMAND_AMF0, 85 0, 86 data, 87 ); 88 89 self.packetizer.write_chunk(&mut chunk_info).await?; 90 Ok(()) 91 } 92 write_connect_with_value( &mut self, transaction_id: &f64, properties: IndexMap<String, Amf0ValueType>, ) -> Result<(), NetConnectionError>93 pub async fn write_connect_with_value( 94 &mut self, 95 transaction_id: &f64, 96 properties: IndexMap<String, Amf0ValueType>, 97 ) -> Result<(), NetConnectionError> { 98 self.amf0_writer.write_string(&String::from("connect"))?; 99 self.amf0_writer.write_number(transaction_id)?; 100 101 self.amf0_writer.write_object(&properties)?; 102 103 self.write_chunk().await 104 } write_connect( &mut self, transaction_id: &f64, properties: &ConnectProperties, ) -> Result<(), NetConnectionError>105 pub async fn write_connect( 106 &mut self, 107 transaction_id: &f64, 108 properties: &ConnectProperties, 109 ) -> Result<(), NetConnectionError> { 110 self.amf0_writer.write_string(&String::from("connect"))?; 111 self.amf0_writer.write_number(transaction_id)?; 112 113 let mut properties_map = IndexMap::new(); 114 115 if let Some(app) = properties.app.clone() { 116 properties_map.insert(String::from("app"), Amf0ValueType::UTF8String(app)); 117 } 118 119 if let Some(pub_type) = properties.pub_type.clone() { 120 properties_map.insert(String::from("type"), Amf0ValueType::UTF8String(pub_type)); 121 } 122 123 if let Some(flash_ver) = properties.flash_ver.clone() { 124 properties_map.insert( 125 String::from("flashVer"), 126 Amf0ValueType::UTF8String(flash_ver), 127 ); 128 } 129 130 if let Some(tc_url) = properties.tc_url.clone() { 131 properties_map.insert(String::from("tcUrl"), Amf0ValueType::UTF8String(tc_url)); 132 } 133 134 if let Some(swf_url) = properties.swf_url.clone() { 135 properties_map.insert(String::from("swfUrl"), Amf0ValueType::UTF8String(swf_url)); 136 } 137 138 if let Some(page_url) = properties.page_url.clone() { 139 properties_map.insert(String::from("pageUrl"), Amf0ValueType::UTF8String(page_url)); 140 } 141 142 if let Some(fpad) = properties.fpad { 143 properties_map.insert(String::from("fpad"), Amf0ValueType::Boolean(fpad)); 144 } 145 146 if let Some(capabilities) = properties.capabilities { 147 properties_map.insert( 148 String::from("capabilities"), 149 Amf0ValueType::Number(capabilities), 150 ); 151 } 152 153 if let Some(audio_codecs) = properties.audio_codecs { 154 properties_map.insert( 155 String::from("audioCodecs"), 156 Amf0ValueType::Number(audio_codecs), 157 ); 158 } 159 160 if let Some(video_codecs) = properties.video_codecs { 161 properties_map.insert( 162 String::from("videoCodecs"), 163 Amf0ValueType::Number(video_codecs), 164 ); 165 } 166 167 if let Some(video_function) = properties.video_function { 168 properties_map.insert( 169 String::from("videoFunction"), 170 Amf0ValueType::Number(video_function), 171 ); 172 } 173 174 if let Some(object_encoding) = properties.object_encoding { 175 properties_map.insert( 176 String::from("objectEncoding"), 177 Amf0ValueType::Number(object_encoding), 178 ); 179 } 180 self.amf0_writer.write_object(&properties_map)?; 181 182 self.write_chunk().await 183 } 184 #[allow(clippy::too_many_arguments)] write_connect_response( &mut self, transaction_id: &f64, fmsver: &str, capabilities: &f64, code: &str, level: &str, description: &str, encoding: &f64, ) -> Result<(), NetConnectionError>185 pub async fn write_connect_response( 186 &mut self, 187 transaction_id: &f64, 188 fmsver: &str, 189 capabilities: &f64, 190 code: &str, 191 level: &str, 192 description: &str, 193 encoding: &f64, 194 ) -> Result<(), NetConnectionError> { 195 self.amf0_writer.write_string(&String::from("_result"))?; 196 self.amf0_writer.write_number(transaction_id)?; 197 198 let mut properties_map_a = IndexMap::new(); 199 200 properties_map_a.insert( 201 String::from("fmsVer"), 202 Amf0ValueType::UTF8String(fmsver.to_owned()), 203 ); 204 properties_map_a.insert( 205 String::from("capabilities"), 206 Amf0ValueType::Number(*capabilities), 207 ); 208 209 self.amf0_writer.write_object(&properties_map_a)?; 210 211 let mut properties_map_b = IndexMap::new(); 212 213 properties_map_b.insert( 214 String::from("level"), 215 Amf0ValueType::UTF8String(level.to_owned()), 216 ); 217 properties_map_b.insert( 218 String::from("code"), 219 Amf0ValueType::UTF8String(code.to_owned()), 220 ); 221 properties_map_b.insert( 222 String::from("description"), 223 Amf0ValueType::UTF8String(description.to_owned()), 224 ); 225 properties_map_b.insert( 226 String::from("objectEncoding"), 227 Amf0ValueType::Number(*encoding), 228 ); 229 230 self.amf0_writer.write_object(&properties_map_b)?; 231 232 self.write_chunk().await 233 } 234 write_create_stream( &mut self, transaction_id: &f64, ) -> Result<(), NetConnectionError>235 pub async fn write_create_stream( 236 &mut self, 237 transaction_id: &f64, 238 ) -> Result<(), NetConnectionError> { 239 self.amf0_writer 240 .write_string(&String::from("createStream"))?; 241 self.amf0_writer.write_number(transaction_id)?; 242 self.amf0_writer.write_null()?; 243 244 self.write_chunk().await 245 } 246 write_create_stream_response( &mut self, transaction_id: &f64, stream_id: &f64, ) -> Result<(), NetConnectionError>247 pub async fn write_create_stream_response( 248 &mut self, 249 transaction_id: &f64, 250 stream_id: &f64, 251 ) -> Result<(), NetConnectionError> { 252 self.amf0_writer.write_string(&String::from("_result"))?; 253 self.amf0_writer.write_number(transaction_id)?; 254 self.amf0_writer.write_null()?; 255 self.amf0_writer.write_number(stream_id)?; 256 257 self.write_chunk().await 258 } 259 write_get_stream_length( &mut self, transaction_id: &f64, stream_name: &String, ) -> Result<(), NetConnectionError>260 pub async fn write_get_stream_length( 261 &mut self, 262 transaction_id: &f64, 263 stream_name: &String, 264 ) -> Result<(), NetConnectionError> { 265 self.amf0_writer 266 .write_string(&String::from("getStreamLength"))?; 267 self.amf0_writer.write_number(transaction_id)?; 268 self.amf0_writer.write_null()?; 269 self.amf0_writer.write_string(stream_name)?; 270 271 self.write_chunk().await 272 } 273 error( &mut self, transaction_id: &f64, code: &str, level: &str, description: &str, ) -> Result<(), NetConnectionError>274 pub async fn error( 275 &mut self, 276 transaction_id: &f64, 277 code: &str, 278 level: &str, 279 description: &str, 280 ) -> Result<(), NetConnectionError> { 281 self.amf0_writer.write_string(&String::from("_error"))?; 282 self.amf0_writer.write_number(transaction_id)?; 283 self.amf0_writer.write_null()?; 284 285 let mut properties_map = IndexMap::new(); 286 287 properties_map.insert( 288 String::from("level"), 289 Amf0ValueType::UTF8String(level.to_owned()), 290 ); 291 properties_map.insert( 292 String::from("code"), 293 Amf0ValueType::UTF8String(code.to_owned()), 294 ); 295 properties_map.insert( 296 String::from("description"), 297 Amf0ValueType::UTF8String(description.to_owned()), 298 ); 299 self.amf0_writer.write_object(&properties_map)?; 300 301 self.write_chunk().await 302 } 303 } 304