1 use { 2 super::errors::NetStreamError, 3 crate::{ 4 amf0::{amf0_writer::Amf0Writer, define::Amf0ValueType}, 5 chunk::{define as chunk_define, packetizer::ChunkPacketizer, ChunkInfo}, 6 messages::define as messages_define, 7 }, 8 bytesio::bytesio::TNetIO, 9 indexmap::IndexMap, 10 std::sync::Arc, 11 tokio::sync::Mutex, 12 }; 13 14 pub struct NetStreamWriter { 15 amf0_writer: Amf0Writer, 16 packetizer: ChunkPacketizer, 17 } 18 19 impl NetStreamWriter { new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self20 pub fn new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self { 21 Self { 22 amf0_writer: Amf0Writer::new(), 23 packetizer: ChunkPacketizer::new(io), 24 } 25 } write_chunk(&mut self, msg_stream_id: u32) -> Result<(), NetStreamError>26 async fn write_chunk(&mut self, msg_stream_id: u32) -> Result<(), NetStreamError> { 27 let data = self.amf0_writer.extract_current_bytes(); 28 29 let mut chunk_info = ChunkInfo::new( 30 chunk_define::csid_type::COMMAND_AMF0_AMF3, 31 chunk_define::chunk_type::TYPE_0, 32 0, 33 data.len() as u32, 34 messages_define::msg_type_id::COMMAND_AMF0, 35 msg_stream_id, 36 data, 37 ); 38 39 self.packetizer.write_chunk(&mut chunk_info).await?; 40 Ok(()) 41 } write_play( &mut self, transaction_id: &f64, stream_name: &String, start: &f64, duration: &f64, reset: &bool, ) -> Result<(), NetStreamError>42 pub async fn write_play( 43 &mut self, 44 transaction_id: &f64, 45 stream_name: &String, 46 start: &f64, 47 duration: &f64, 48 reset: &bool, 49 ) -> Result<(), NetStreamError> { 50 self.amf0_writer.write_string(&String::from("play"))?; 51 self.amf0_writer.write_number(transaction_id)?; 52 self.amf0_writer.write_null()?; 53 self.amf0_writer.write_string(stream_name)?; 54 self.amf0_writer.write_number(start)?; 55 self.amf0_writer.write_number(duration)?; 56 self.amf0_writer.write_bool(reset)?; 57 58 self.write_chunk(0).await 59 } write_delete_stream( &mut self, transaction_id: &f64, stream_id: &f64, ) -> Result<(), NetStreamError>60 pub async fn write_delete_stream( 61 &mut self, 62 transaction_id: &f64, 63 stream_id: &f64, 64 ) -> Result<(), NetStreamError> { 65 self.amf0_writer 66 .write_string(&String::from("deleteStream"))?; 67 self.amf0_writer.write_number(transaction_id)?; 68 self.amf0_writer.write_null()?; 69 self.amf0_writer.write_number(stream_id)?; 70 71 self.write_chunk(0).await 72 } 73 write_close_stream( &mut self, transaction_id: &f64, stream_id: &f64, ) -> Result<(), NetStreamError>74 pub async fn write_close_stream( 75 &mut self, 76 transaction_id: &f64, 77 stream_id: &f64, 78 ) -> Result<(), NetStreamError> { 79 self.amf0_writer 80 .write_string(&String::from("closeStream"))?; 81 self.amf0_writer.write_number(transaction_id)?; 82 self.amf0_writer.write_null()?; 83 self.amf0_writer.write_number(stream_id)?; 84 85 self.write_chunk(0).await 86 } 87 write_release_stream( &mut self, transaction_id: &f64, stream_name: &String, ) -> Result<(), NetStreamError>88 pub async fn write_release_stream( 89 &mut self, 90 transaction_id: &f64, 91 stream_name: &String, 92 ) -> Result<(), NetStreamError> { 93 self.amf0_writer 94 .write_string(&String::from("releaseStream"))?; 95 self.amf0_writer.write_number(transaction_id)?; 96 self.amf0_writer.write_null()?; 97 self.amf0_writer.write_string(stream_name)?; 98 99 self.write_chunk(0).await 100 } 101 write_fcpublish( &mut self, transaction_id: &f64, stream_name: &String, ) -> Result<(), NetStreamError>102 pub async fn write_fcpublish( 103 &mut self, 104 transaction_id: &f64, 105 stream_name: &String, 106 ) -> Result<(), NetStreamError> { 107 self.amf0_writer.write_string(&String::from("FCPublish"))?; 108 self.amf0_writer.write_number(transaction_id)?; 109 self.amf0_writer.write_null()?; 110 self.amf0_writer.write_string(stream_name)?; 111 112 self.write_chunk(0).await 113 } 114 115 #[allow(dead_code)] write_receive_audio( &mut self, transaction_id: &f64, enable: &bool, ) -> Result<(), NetStreamError>116 async fn write_receive_audio( 117 &mut self, 118 transaction_id: &f64, 119 enable: &bool, 120 ) -> Result<(), NetStreamError> { 121 self.amf0_writer 122 .write_string(&String::from("receiveAudio"))?; 123 self.amf0_writer.write_number(transaction_id)?; 124 self.amf0_writer.write_null()?; 125 self.amf0_writer.write_bool(enable)?; 126 127 self.write_chunk(0).await 128 } 129 #[allow(dead_code)] write_receive_video( &mut self, transaction_id: &f64, enable: &bool, ) -> Result<(), NetStreamError>130 async fn write_receive_video( 131 &mut self, 132 transaction_id: &f64, 133 enable: &bool, 134 ) -> Result<(), NetStreamError> { 135 self.amf0_writer 136 .write_string(&String::from("receiveVideo"))?; 137 self.amf0_writer.write_number(transaction_id)?; 138 self.amf0_writer.write_null()?; 139 self.amf0_writer.write_bool(enable)?; 140 141 self.write_chunk(0).await 142 } write_publish( &mut self, transaction_id: &f64, stream_name: &String, stream_type: &String, ) -> Result<(), NetStreamError>143 pub async fn write_publish( 144 &mut self, 145 transaction_id: &f64, 146 stream_name: &String, 147 stream_type: &String, 148 ) -> Result<(), NetStreamError> { 149 self.amf0_writer.write_string(&String::from("publish"))?; 150 self.amf0_writer.write_number(transaction_id)?; 151 self.amf0_writer.write_null()?; 152 self.amf0_writer.write_string(stream_name)?; 153 self.amf0_writer.write_string(stream_type)?; 154 155 self.write_chunk(0).await 156 } 157 #[allow(dead_code)] write_seek(&mut self, transaction_id: &f64, ms: &f64) -> Result<(), NetStreamError>158 async fn write_seek(&mut self, transaction_id: &f64, ms: &f64) -> Result<(), NetStreamError> { 159 self.amf0_writer.write_string(&String::from("seek"))?; 160 self.amf0_writer.write_number(transaction_id)?; 161 self.amf0_writer.write_null()?; 162 self.amf0_writer.write_number(ms)?; 163 164 self.write_chunk(0).await 165 } 166 #[allow(dead_code)] write_pause( &mut self, transaction_id: &f64, pause: &bool, ms: &f64, ) -> Result<(), NetStreamError>167 async fn write_pause( 168 &mut self, 169 transaction_id: &f64, 170 pause: &bool, 171 ms: &f64, 172 ) -> Result<(), NetStreamError> { 173 self.amf0_writer.write_string(&String::from("pause"))?; 174 self.amf0_writer.write_number(transaction_id)?; 175 self.amf0_writer.write_null()?; 176 self.amf0_writer.write_bool(pause)?; 177 self.amf0_writer.write_number(ms)?; 178 179 self.write_chunk(0).await 180 } 181 182 #[allow(dead_code)] write_on_bw_done( &mut self, transaction_id: &f64, bandwidth: &f64, ) -> Result<(), NetStreamError>183 async fn write_on_bw_done( 184 &mut self, 185 transaction_id: &f64, 186 bandwidth: &f64, 187 ) -> Result<(), NetStreamError> { 188 self.amf0_writer.write_string(&String::from("onBWDone"))?; 189 self.amf0_writer.write_number(transaction_id)?; 190 self.amf0_writer.write_null()?; 191 self.amf0_writer.write_number(bandwidth)?; 192 193 self.write_chunk(0).await 194 } 195 write_on_status( &mut self, transaction_id: &f64, level: &str, code: &str, description: &str, ) -> Result<(), NetStreamError>196 pub async fn write_on_status( 197 &mut self, 198 transaction_id: &f64, 199 level: &str, 200 code: &str, 201 description: &str, 202 ) -> Result<(), NetStreamError> { 203 self.amf0_writer.write_string(&String::from("onStatus"))?; 204 self.amf0_writer.write_number(transaction_id)?; 205 self.amf0_writer.write_null()?; 206 207 let mut properties_map = IndexMap::new(); 208 209 properties_map.insert( 210 String::from("level"), 211 Amf0ValueType::UTF8String(level.to_owned()), 212 ); 213 properties_map.insert( 214 String::from("code"), 215 Amf0ValueType::UTF8String(code.to_owned()), 216 ); 217 properties_map.insert( 218 String::from("description"), 219 Amf0ValueType::UTF8String(description.to_owned()), 220 ); 221 222 self.amf0_writer.write_object(&properties_map)?; 223 224 self.write_chunk(1).await 225 } 226 } 227