1 use crate::wasi::clocks::monotonic_clock; 2 use crate::wasi::io::poll::{self, Pollable}; 3 use crate::wasi::io::streams::{InputStream, OutputStream, StreamError}; 4 use crate::wasi::random; 5 use crate::wasi::sockets::instance_network; 6 use crate::wasi::sockets::ip_name_lookup; 7 use crate::wasi::sockets::network::{ 8 ErrorCode, IpAddress, IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, 9 Network, 10 }; 11 use crate::wasi::sockets::tcp::TcpSocket; 12 use crate::wasi::sockets::udp::{ 13 IncomingDatagram, IncomingDatagramStream, OutgoingDatagram, OutgoingDatagramStream, UdpSocket, 14 }; 15 use crate::wasi::sockets::{tcp_create_socket, udp_create_socket}; 16 use std::ops::Range; 17 18 const TIMEOUT_NS: u64 = 1_000_000_000; 19 20 pub fn supports_ipv6() -> bool { 21 std::env::var("DISABLE_IPV6").is_err() 22 } 23 24 impl Pollable { 25 pub fn block_until(&self, timeout: &Pollable) -> Result<(), ErrorCode> { 26 let ready = poll::poll(&[self, timeout]); 27 assert!(ready.len() > 0); 28 match ready[0] { 29 0 => Ok(()), 30 1 => Err(ErrorCode::Timeout), 31 _ => unreachable!(), 32 } 33 } 34 } 35 36 impl InputStream { 37 pub fn blocking_read_to_end(&self) -> Result<Vec<u8>, crate::wasi::io::error::Error> { 38 let mut data = vec![]; 39 loop { 40 match self.blocking_read(1024 * 1024) { 41 Ok(chunk) => data.extend(chunk), 42 Err(StreamError::Closed) => return Ok(data), 43 Err(StreamError::LastOperationFailed(e)) => return Err(e), 44 } 45 } 46 } 47 } 48 49 impl OutputStream { 50 pub fn blocking_write_util(&self, mut bytes: &[u8]) -> Result<(), StreamError> { 51 let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS); 52 let pollable = self.subscribe(); 53 54 while !bytes.is_empty() { 55 pollable.block_until(&timeout).expect("write timed out"); 56 57 let permit = self.check_write()?; 58 59 let len = bytes.len().min(permit as usize); 60 let (chunk, rest) = bytes.split_at(len); 61 62 self.write(chunk)?; 63 64 self.blocking_flush()?; 65 66 bytes = rest; 67 } 68 Ok(()) 69 } 70 } 71 72 impl Network { 73 pub fn default() -> Network { 74 instance_network::instance_network() 75 } 76 77 pub fn blocking_resolve_addresses(&self, name: &str) -> Result<Vec<IpAddress>, ErrorCode> { 78 let stream = ip_name_lookup::resolve_addresses(&self, name)?; 79 80 let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS); 81 let pollable = stream.subscribe(); 82 83 let mut addresses = vec![]; 84 85 loop { 86 match stream.resolve_next_address() { 87 Ok(Some(addr)) => { 88 addresses.push(addr); 89 } 90 Ok(None) => match addresses[..] { 91 [] => return Err(ErrorCode::NameUnresolvable), 92 _ => return Ok(addresses), 93 }, 94 Err(ErrorCode::WouldBlock) => { 95 pollable.block_until(&timeout)?; 96 } 97 Err(err) => return Err(err), 98 } 99 } 100 } 101 102 /// Same as `Network::blocking_resolve_addresses` but ignores post validation errors 103 /// 104 /// The ignored error codes signal that the input passed validation 105 /// and a lookup was actually attempted, but failed. These are ignored to 106 /// make the CI tests less flaky. 107 pub fn permissive_blocking_resolve_addresses( 108 &self, 109 name: &str, 110 ) -> Result<Vec<IpAddress>, ErrorCode> { 111 match self.blocking_resolve_addresses(name) { 112 Err(ErrorCode::NameUnresolvable | ErrorCode::TemporaryResolverFailure) => Ok(vec![]), 113 r => r, 114 } 115 } 116 } 117 118 impl TcpSocket { 119 pub fn new(address_family: IpAddressFamily) -> Result<TcpSocket, ErrorCode> { 120 tcp_create_socket::create_tcp_socket(address_family) 121 } 122 123 pub fn blocking_bind( 124 &self, 125 network: &Network, 126 local_address: IpSocketAddress, 127 ) -> Result<(), ErrorCode> { 128 let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS); 129 let sub = self.subscribe(); 130 131 self.start_bind(&network, local_address)?; 132 133 loop { 134 match self.finish_bind() { 135 Err(ErrorCode::WouldBlock) => sub.block_until(&timeout)?, 136 result => return result, 137 } 138 } 139 } 140 141 pub fn blocking_listen(&self) -> Result<(), ErrorCode> { 142 let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS); 143 let sub = self.subscribe(); 144 145 self.start_listen()?; 146 147 loop { 148 match self.finish_listen() { 149 Err(ErrorCode::WouldBlock) => sub.block_until(&timeout)?, 150 result => return result, 151 } 152 } 153 } 154 155 pub fn blocking_connect( 156 &self, 157 network: &Network, 158 remote_address: IpSocketAddress, 159 ) -> Result<(InputStream, OutputStream), ErrorCode> { 160 let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS); 161 let sub = self.subscribe(); 162 163 self.start_connect(&network, remote_address)?; 164 165 loop { 166 match self.finish_connect() { 167 Err(ErrorCode::WouldBlock) => sub.block_until(&timeout)?, 168 result => return result, 169 } 170 } 171 } 172 173 pub fn blocking_accept(&self) -> Result<(TcpSocket, InputStream, OutputStream), ErrorCode> { 174 let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS); 175 let sub = self.subscribe(); 176 177 loop { 178 match self.accept() { 179 Err(ErrorCode::WouldBlock) => sub.block_until(&timeout)?, 180 result => return result, 181 } 182 } 183 } 184 } 185 186 impl UdpSocket { 187 pub fn new(address_family: IpAddressFamily) -> Result<UdpSocket, ErrorCode> { 188 udp_create_socket::create_udp_socket(address_family) 189 } 190 191 pub fn blocking_bind( 192 &self, 193 network: &Network, 194 local_address: IpSocketAddress, 195 ) -> Result<(), ErrorCode> { 196 let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS); 197 let sub = self.subscribe(); 198 199 self.start_bind(&network, local_address)?; 200 201 loop { 202 match self.finish_bind() { 203 Err(ErrorCode::WouldBlock) => sub.block_until(&timeout)?, 204 result => return result, 205 } 206 } 207 } 208 209 pub fn blocking_bind_unspecified(&self, network: &Network) -> Result<(), ErrorCode> { 210 let ip = IpAddress::new_unspecified(self.address_family()); 211 let port = 0; 212 213 self.blocking_bind(network, IpSocketAddress::new(ip, port)) 214 } 215 } 216 217 impl OutgoingDatagramStream { 218 fn blocking_check_send(&self, timeout: &Pollable) -> Result<u64, ErrorCode> { 219 let sub = self.subscribe(); 220 221 loop { 222 match self.check_send() { 223 Ok(0) => sub.block_until(timeout)?, 224 result => return result, 225 } 226 } 227 } 228 229 pub fn blocking_send(&self, mut datagrams: &[OutgoingDatagram]) -> Result<(), ErrorCode> { 230 let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS); 231 232 while !datagrams.is_empty() { 233 let permit = self.blocking_check_send(&timeout)?; 234 let chunk_len = datagrams.len().min(permit as usize); 235 match self.send(&datagrams[..chunk_len]) { 236 Ok(0) => {} 237 Ok(packets_sent) => { 238 let packets_sent = packets_sent as usize; 239 datagrams = &datagrams[packets_sent..]; 240 } 241 Err(err) => return Err(err), 242 } 243 } 244 245 Ok(()) 246 } 247 } 248 249 impl IncomingDatagramStream { 250 pub fn blocking_receive(&self, count: Range<u64>) -> Result<Vec<IncomingDatagram>, ErrorCode> { 251 let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS); 252 let pollable = self.subscribe(); 253 let mut datagrams = vec![]; 254 255 loop { 256 match self.receive(count.end - datagrams.len() as u64) { 257 Ok(mut chunk) => { 258 datagrams.append(&mut chunk); 259 260 if datagrams.len() >= count.start as usize { 261 return Ok(datagrams); 262 } else { 263 pollable.block_until(&timeout)?; 264 } 265 } 266 Err(err) => return Err(err), 267 } 268 } 269 } 270 } 271 272 impl IpAddress { 273 pub const IPV4_BROADCAST: IpAddress = IpAddress::Ipv4((255, 255, 255, 255)); 274 275 pub const IPV4_LOOPBACK: IpAddress = IpAddress::Ipv4((127, 0, 0, 1)); 276 pub const IPV6_LOOPBACK: IpAddress = IpAddress::Ipv6((0, 0, 0, 0, 0, 0, 0, 1)); 277 278 pub const IPV4_UNSPECIFIED: IpAddress = IpAddress::Ipv4((0, 0, 0, 0)); 279 pub const IPV6_UNSPECIFIED: IpAddress = IpAddress::Ipv6((0, 0, 0, 0, 0, 0, 0, 0)); 280 281 pub const IPV4_MAPPED_LOOPBACK: IpAddress = 282 IpAddress::Ipv6((0, 0, 0, 0, 0, 0xFFFF, 0x7F00, 0x0001)); 283 284 pub const fn new_loopback(family: IpAddressFamily) -> IpAddress { 285 match family { 286 IpAddressFamily::Ipv4 => Self::IPV4_LOOPBACK, 287 IpAddressFamily::Ipv6 => Self::IPV6_LOOPBACK, 288 } 289 } 290 291 pub const fn new_unspecified(family: IpAddressFamily) -> IpAddress { 292 match family { 293 IpAddressFamily::Ipv4 => Self::IPV4_UNSPECIFIED, 294 IpAddressFamily::Ipv6 => Self::IPV6_UNSPECIFIED, 295 } 296 } 297 298 pub const fn family(&self) -> IpAddressFamily { 299 match self { 300 IpAddress::Ipv4(_) => IpAddressFamily::Ipv4, 301 IpAddress::Ipv6(_) => IpAddressFamily::Ipv6, 302 } 303 } 304 } 305 306 impl PartialEq for IpAddress { 307 fn eq(&self, other: &Self) -> bool { 308 match (self, other) { 309 (Self::Ipv4(left), Self::Ipv4(right)) => left == right, 310 (Self::Ipv6(left), Self::Ipv6(right)) => left == right, 311 _ => false, 312 } 313 } 314 } 315 316 impl IpSocketAddress { 317 pub const fn new(ip: IpAddress, port: u16) -> IpSocketAddress { 318 match ip { 319 IpAddress::Ipv4(addr) => IpSocketAddress::Ipv4(Ipv4SocketAddress { 320 port, 321 address: addr, 322 }), 323 IpAddress::Ipv6(addr) => IpSocketAddress::Ipv6(Ipv6SocketAddress { 324 port, 325 address: addr, 326 flow_info: 0, 327 scope_id: 0, 328 }), 329 } 330 } 331 332 pub const fn ip(&self) -> IpAddress { 333 match self { 334 IpSocketAddress::Ipv4(addr) => IpAddress::Ipv4(addr.address), 335 IpSocketAddress::Ipv6(addr) => IpAddress::Ipv6(addr.address), 336 } 337 } 338 339 pub const fn port(&self) -> u16 { 340 match self { 341 IpSocketAddress::Ipv4(addr) => addr.port, 342 IpSocketAddress::Ipv6(addr) => addr.port, 343 } 344 } 345 346 pub const fn family(&self) -> IpAddressFamily { 347 match self { 348 IpSocketAddress::Ipv4(_) => IpAddressFamily::Ipv4, 349 IpSocketAddress::Ipv6(_) => IpAddressFamily::Ipv6, 350 } 351 } 352 } 353 354 impl PartialEq for Ipv4SocketAddress { 355 fn eq(&self, other: &Self) -> bool { 356 self.port == other.port && self.address == other.address 357 } 358 } 359 360 impl PartialEq for Ipv6SocketAddress { 361 fn eq(&self, other: &Self) -> bool { 362 self.port == other.port 363 && self.flow_info == other.flow_info 364 && self.address == other.address 365 && self.scope_id == other.scope_id 366 } 367 } 368 369 impl PartialEq for IpSocketAddress { 370 fn eq(&self, other: &Self) -> bool { 371 match (self, other) { 372 (Self::Ipv4(l0), Self::Ipv4(r0)) => l0 == r0, 373 (Self::Ipv6(l0), Self::Ipv6(r0)) => l0 == r0, 374 _ => false, 375 } 376 } 377 } 378 379 fn generate_random_u16(range: Range<u16>) -> u16 { 380 let start = range.start as u64; 381 let end = range.end as u64; 382 let port = start + (random::random::get_random_u64() % (end - start)); 383 port as u16 384 } 385 386 /// Execute the inner function with a randomly generated port. 387 /// To prevent random failures, we make a few attempts before giving up. 388 pub fn attempt_random_port<F>( 389 local_address: IpAddress, 390 mut f: F, 391 ) -> Result<IpSocketAddress, ErrorCode> 392 where 393 F: FnMut(IpSocketAddress) -> Result<(), ErrorCode>, 394 { 395 const MAX_ATTEMPTS: u32 = 10; 396 let mut i = 0; 397 loop { 398 i += 1; 399 400 let port: u16 = generate_random_u16(1024..u16::MAX); 401 let sock_addr = IpSocketAddress::new(local_address, port); 402 403 match f(sock_addr) { 404 Ok(_) => return Ok(sock_addr), 405 Err(e) if i >= MAX_ATTEMPTS => return Err(e), 406 // Try again if the port is already taken. This can sometimes show up as `AccessDenied` on Windows. 407 Err(ErrorCode::AddressInUse | ErrorCode::AccessDenied) => {} 408 Err(e) => return Err(e), 409 } 410 } 411 } 412