/**
	TCP/UDP connection and server handling.

	Copyright: © 2012-2016 Sönke Ludwig
	Authors: Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
*/
module vibe.core.net;

import eventcore.core;
import std.exception : enforce;
import std.format : format;
import std.functional : toDelegate;
import std.socket : AddressFamily, UnknownAddress;
import vibe.core.internal.release;
import vibe.core.log;
import vibe.core.stream;
import vibe.internal.async;
import core.time : Duration;

@safe:


/**
	Resolves the given host name/IP address string.

	This routine converts a string to a `NetworkAddress`. If the string is an IP
	address, no network traffic will be generated and the routine will not block.
	If it is not, a DNS query will be issued, unless forbidden by the `use_dns`
	parameter, in which case the query is guaranteed not to block.

	Params:
		host = The string to resolve, either an IP address or a hostname
		family = The desired address family to return. By default, returns
			the family that `host` is in. If a value is specified, this routine
			will throw if the value found for `host` doesn't match this parameter.
		use_dns = Whether to use the DNS if `host` is not an IP address.
			If `false` and `host` is not an IP, this routine will throw.
			Defaults to `true`.
		timeout = If `use_dns` is `true`, the `Duration` to use as timeout
			for the DNS lookup.

	Throws:
		In case of lookup failure, if `family` doesn't match `host`,
		or if `use_dns` is `false` but `host` is not an IP.
		See the parameter description for more details.

	Returns:
		A valid `NetworkAddress` matching `host`.
*/
NetworkAddress resolveHost(string host, AddressFamily family = AddressFamily.UNSPEC, bool use_dns = true, Duration timeout = Duration.max)
{
	return resolveHost(host, cast(ushort)family, use_dns, timeout);
}

/// ditto
NetworkAddress resolveHost(string host, ushort family, bool use_dns = true, Duration timeout = Duration.max)
{
	import std.socket : parseAddress;
	version (Windows) import core.sys.windows.winsock2 : sockaddr_in, sockaddr_in6;
	else import core.sys.posix.netinet.in_ : sockaddr_in, sockaddr_in6;

	enforce(host.length > 0, "Host name must not be empty.");
	// Fast path: If it looks like an IP address, we don't need to resolve it
	if (isMaybeIPAddress(host)) {
		auto addr = parseAddress(host);
		enforce(family == AddressFamily.UNSPEC || addr.addressFamily == family,
			"Address family of the given IP address does not match the requested family.");
		NetworkAddress ret;
		ret.family = addr.addressFamily;
		switch (addr.addressFamily) with(AddressFamily) {
			default: throw new Exception("Unsupported address family");
			case INET: *ret.sockAddrInet4 = () @trusted { return *cast(sockaddr_in*)addr.name; } (); break;
			case INET6: *ret.sockAddrInet6 = () @trusted { return *cast(sockaddr_in6*)addr.name; } (); break;
		}
		return ret;
	}

	// Otherwise we need to do a DNS lookup
	enforce(use_dns, "Malformed IP address string.");
	NetworkAddress res;
	bool success = false;
	alias waitable = Waitable!(DNSLookupCallback,
		cb => eventDriver.dns.lookupHost(host, cb),
		(cb, id) => eventDriver.dns.cancelLookup(id),
		(DNSLookupID, DNSStatus status, scope RefAddress[] addrs) {
			if (status == DNSStatus.ok) {
				foreach (addr; addrs) {
					if (family != AddressFamily.UNSPEC && addr.addressFamily != family) continue;
					// Only report success once an address has actually been
					// stored; if the conversion fails, try the next candidate
					// instead of returning a default-initialized address.
					try {
						res = NetworkAddress(addr);
						success = true;
						break;
					} catch (Exception e) {
						logDiagnostic("Failed to store address from DNS lookup: %s", e.msg);
					}
				}
			}
		}
	);

	asyncAwaitAny!(true, waitable)(timeout);

	enforce(success, "Failed to lookup host '"~host~"'.");
	return res;
}


/**
	Starts listening on the specified port.

	'connection_callback' will be called for each client that connects to the
	server socket. Each new connection gets its own fiber. The stream parameter
	then allows to perform blocking I/O on the client socket.

	The address parameter can be used to specify the network
	interface on which the server socket is supposed to listen for connections.
	By default, all IPv4 and IPv6 interfaces will be used.

	The overload without an address tries "::" and "0.0.0.0" in turn, logs a
	diagnostic message for each one that fails and only throws if neither
	could be bound.
*/
TCPListener[] listenTCP(ushort port, TCPConnectionDelegate connection_callback, TCPListenOptions options = TCPListenOptions.defaults)
{
	TCPListener[] ret;
	try ret ~= listenTCP(port, connection_callback, "::", options);
	catch (Exception e) logDiagnostic("Failed to listen on \"::\": %s", e.msg);
	try ret ~= listenTCP(port, connection_callback, "0.0.0.0", options);
	catch (Exception e) logDiagnostic("Failed to listen on \"0.0.0.0\": %s", e.msg);
	enforce(ret.length > 0, format("Failed to listen on all interfaces on port %s", port));
	return ret;
}
/// ditto
TCPListener listenTCP(ushort port, TCPConnectionDelegate connection_callback, string address, TCPListenOptions options = TCPListenOptions.defaults)
{
	auto addr = resolveHost(address);
	addr.port = port;

	// Translate the reuse flags explicitly in both directions, so that the
	// result does not depend on what StreamListenOptions.defaults contains.
	StreamListenOptions sopts = StreamListenOptions.defaults;
	if (options & TCPListenOptions.reuseAddress)
		sopts |= StreamListenOptions.reuseAddress;
	else
		sopts &= ~StreamListenOptions.reuseAddress;
	if (options & TCPListenOptions.reusePort)
		sopts |= StreamListenOptions.reusePort;
	else
		sopts &= ~StreamListenOptions.reusePort;

	scope addrc = new RefAddress(addr.sockAddr, addr.sockAddrLen);
	auto sock = eventDriver.sockets.listenStream(addrc, sopts,
		(StreamListenSocketFD ls, StreamSocketFD s, scope RefAddress addr) @safe nothrow {
			import vibe.core.core : runTask;
			// each accepted connection is handled in its own task
			auto conn = TCPConnection(s, addr);
			runTask(connection_callback, conn);
		});
	enforce(sock != StreamListenSocketFD.invalid, "Failed to listen on "~addr.toString());
	return TCPListener(sock);
}

/// Compatibility overload - use an `@safe nothrow` callback instead.
deprecated("Use a @safe nothrow callback instead.")
TCPListener[] listenTCP(ushort port, void delegate(TCPConnection) connection_callback, TCPListenOptions options = TCPListenOptions.defaults)
{
	TCPListener[] ret;
	try ret ~= listenTCP(port, connection_callback, "::", options);
	catch (Exception e) logDiagnostic("Failed to listen on \"::\": %s", e.msg);
	try ret ~= listenTCP(port, connection_callback, "0.0.0.0", options);
	catch (Exception e) logDiagnostic("Failed to listen on \"0.0.0.0\": %s", e.msg);
	enforce(ret.length > 0, format("Failed to listen on all interfaces on port %s", port));
	return ret;
}
/// ditto
deprecated("Use a @safe nothrow callback instead.")
TCPListener listenTCP(ushort port, void delegate(TCPConnection) connection_callback, string address, TCPListenOptions options = TCPListenOptions.defaults)
{
	// Wrap the throwing callback: exceptions are logged and the connection
	// is closed, so that the wrapper itself is nothrow.
	return listenTCP(port, (conn) @trusted nothrow {
		try connection_callback(conn);
		catch (Exception e) {
			e.logException("Handling of connection failed");
			conn.close();
		}
	}, address, options);
}

/**
	Starts listening on the specified port.

	This function is the same as listenTCP but takes a function callback instead of a delegate.
*/
TCPListener[] listenTCP_s(ushort port, TCPConnectionFunction connection_callback, TCPListenOptions options = TCPListenOptions.defaults) @trusted
{
	return listenTCP(port, toDelegate(connection_callback), options);
}
/// ditto
TCPListener listenTCP_s(ushort port, TCPConnectionFunction connection_callback, string address, TCPListenOptions options = TCPListenOptions.defaults) @trusted
{
	return listenTCP(port, toDelegate(connection_callback), address, options);
}

/**
	Establishes a connection to the given host/port.

	Params:
		host = Host name or IP address string, resolved using `resolveHost`
		port = Remote port to connect to
		bind_interface = Optional local address to bind to, resolved with the
			address family of `host`. If empty, the all-zero address of that
			family is used.
		bind_port = Local port to bind to
		addr = The remote address to connect to
		bind_address = The local address to bind to. An address with family
			`UNSPEC` (the default) is replaced by the all-zero address of the
			family of `addr` with port 0.
		timeout = Maximum time to wait for the connection to be established

	Throws:
		An `Exception` is thrown if the address families of the destination
		and bind address differ, if the timeout is reached, or if the
		connection attempt fails.
*/
TCPConnection connectTCP(string host, ushort port, string bind_interface = null,
	ushort bind_port = 0, Duration timeout = Duration.max)
{
	NetworkAddress addr = resolveHost(host);
	// the port setter is only defined for IP address families
	if (addr.family != AddressFamily.UNIX)
		addr.port = port;

	NetworkAddress bind_address;
	if (bind_interface.length) bind_address = resolveHost(bind_interface, addr.family);
	else {
		bind_address.family = addr.family;
		if (bind_address.family == AddressFamily.INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
		else if (bind_address.family != AddressFamily.UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
	}
	if (addr.family != AddressFamily.UNIX)
		bind_address.port = bind_port;

	return connectTCP(addr, bind_address, timeout);
}
/// ditto
TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyAddress,
	Duration timeout = Duration.max)
{
	import std.conv : to;

	if (bind_address.family == AddressFamily.UNSPEC) {
		bind_address.family = addr.family;
		if (bind_address.family == AddressFamily.INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
		else if (bind_address.family != AddressFamily.UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
		if (bind_address.family != AddressFamily.UNIX)
			bind_address.port = 0;
	}
	enforce(addr.family == bind_address.family, "Destination address and bind address have different address families.");

	return () @trusted { // scope
		scope uaddr = new RefAddress(addr.sockAddr, addr.sockAddrLen);
		scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen);

		bool cancelled;
		StreamSocketFD sock;
		ConnectStatus status;

		alias waiter = Waitable!(ConnectCallback,
			cb => eventDriver.sockets.connectStream(uaddr, baddr, cb),
			(cb, sock_fd) {
				// invoked when the wait is aborted (e.g. by the timeout)
				cancelled = true;
				if (sock_fd != SocketFD.invalid)
					eventDriver.sockets.cancelConnectStream(sock_fd);
			},
			(fd, st) { sock = fd; status = st; }
		);

		asyncAwaitAny!(true, waiter)(timeout);

		enforce(!cancelled, "Failed to connect to " ~ addr.toString() ~
			": timeout");

		if (status != ConnectStatus.connected) {
			// drop the reference to the half-open socket before reporting the error
			if (sock != SocketFD.invalid)
				eventDriver.sockets.releaseRef(sock);

			throw new Exception("Failed to connect to "~addr.toString()~": "~status.to!string);
		}

		return TCPConnection(sock, uaddr);
	} ();
}


/**
	Creates a bound UDP socket suitable for sending and receiving packets.

	The overload taking a string only accepts IP address strings for
	`bind_address`; no DNS lookup is performed.
*/
UDPConnection listenUDP(ref NetworkAddress addr, UDPListenOptions options = UDPListenOptions.none)
{
	return UDPConnection(addr, options);
}
/// ditto
UDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0",
	UDPListenOptions options = UDPListenOptions.none)
{
	auto addr = resolveHost(bind_address, AddressFamily.UNSPEC, false);
	addr.port = port;
	return UDPConnection(addr, options);
}

/// Returns a `NetworkAddress` with the family set to `AddressFamily.UNSPEC`.
NetworkAddress anyAddress()
{
	NetworkAddress ret;
	ret.family = AddressFamily.UNSPEC;
	return ret;
}


/// Callback invoked for incoming TCP connections.
@safe nothrow alias TCPConnectionDelegate = void delegate(TCPConnection stream);
/// ditto
@safe nothrow alias TCPConnectionFunction = void function(TCPConnection stream);


/**
	Represents a network/socket address.

	The address is stored as a union of the supported `sockaddr` variants
	(IPv4, IPv6 and, on Posix systems, UNIX domain sockets). The `family`
	property determines which of the variants is currently valid.
*/
struct NetworkAddress {
	import std.algorithm.comparison : max;
	import std.socket : Address;

	version (Windows) import core.sys.windows.winsock2;
	else import core.sys.posix.netinet.in_;

	version(Posix) import core.sys.posix.sys.un : sockaddr_un;

	@safe:

	private union {
		sockaddr addr;
		version (Posix) sockaddr_un addr_unix;
		sockaddr_in addr_ip4;
		sockaddr_in6 addr_ip6;
	}

	/** Size of the largest address variant that can be stored.

		This is the length to pass together with `sockAddr` when the address
		is to be filled in by a socket function. On Posix systems it has to
		account for `sockaddr_un`, which is also part of the union.
	*/
	version (Posix) enum socklen_t sockAddrMaxLen = max(addr.sizeof, addr_unix.sizeof, addr_ip6.sizeof);
	else enum socklen_t sockAddrMaxLen = max(addr.sizeof, addr_ip6.sizeof);


	/** Constructs the address from a `std.socket.Address`.

		Throws: An `Exception` is thrown for unsupported address families.
	*/
	this(Address addr)
		@trusted
	{
		assert(addr !is null);
		switch (addr.addressFamily) {
			default: throw new Exception("Unsupported address family.");
			case AddressFamily.INET:
				this.family = AddressFamily.INET;
				assert(addr.nameLen >= sockaddr_in.sizeof);
				*this.sockAddrInet4 = *cast(sockaddr_in*)addr.name;
				break;
			case AddressFamily.INET6:
				this.family = AddressFamily.INET6;
				assert(addr.nameLen >= sockaddr_in6.sizeof);
				*this.sockAddrInet6 = *cast(sockaddr_in6*)addr.name;
				break;
			version (Posix) {
				case AddressFamily.UNIX:
					this.family = AddressFamily.UNIX;
					assert(addr.nameLen >= sockaddr_un.sizeof);
					*this.sockAddrUnix = *cast(sockaddr_un*)addr.name;
					break;
			}
		}
	}

	/** Family of the socket address.
	*/
	@property ushort family() const pure nothrow { return addr.sa_family; }
	/// ditto
	@property void family(AddressFamily val) pure nothrow { addr.sa_family = cast(typeof(addr.sa_family))val; }
	/// ditto
	@property void family(ushort val) pure nothrow { addr.sa_family = cast(typeof(addr.sa_family))val; }

	/** The port in host byte order.

		Only valid for the `AF_INET` and `AF_INET6` address families.
	*/
	@property ushort port()
	const pure nothrow {
		ushort nport;
		switch (this.family) {
			default: assert(false, "port() called for invalid address family.");
			case AF_INET: nport = addr_ip4.sin_port; break;
			case AF_INET6: nport = addr_ip6.sin6_port; break;
		}
		return () @trusted { return ntoh(nport); } ();
	}
	/// ditto
	@property void port(ushort val)
	pure nothrow {
		auto nport = () @trusted { return hton(val); } ();
		switch (this.family) {
			default: assert(false, "port() called for invalid address family.");
			case AF_INET: addr_ip4.sin_port = nport; break;
			case AF_INET6: addr_ip6.sin6_port = nport; break;
		}
	}

	/** A pointer to a sockaddr struct suitable for passing to socket functions.
	*/
	@property inout(sockaddr)* sockAddr() inout return pure nothrow { return &addr; }

	/** Size of the sockaddr struct that is returned by sockAddr().
	*/
	@property socklen_t sockAddrLen()
	const pure nothrow {
		switch (this.family) {
			default: assert(false, "sockAddrLen() called for invalid address family.");
			case AF_INET: return addr_ip4.sizeof;
			case AF_INET6: return addr_ip6.sizeof;
			version (Posix) {
				case AF_UNIX: return addr_unix.sizeof;
			}
		}
	}

	/// Typed access to the IPv4 variant - `family` must be `AF_INET`.
	@property inout(sockaddr_in)* sockAddrInet4() inout return pure nothrow
		in { assert (family == AF_INET); }
		do { return &addr_ip4; }

	/// Typed access to the IPv6 variant - `family` must be `AF_INET6`.
	@property inout(sockaddr_in6)* sockAddrInet6() inout return pure nothrow
		in { assert (family == AF_INET6); }
		do { return &addr_ip6; }

	version (Posix) {
		/// Typed access to the UNIX socket variant - `family` must be `AddressFamily.UNIX`.
		@property inout(sockaddr_un)* sockAddrUnix() inout return pure nothrow
			in { assert (family == AddressFamily.UNIX); }
			do { return &addr_unix; }
	}

	/** Returns a string representation of the IP address

		IPv4 addresses are written in dotted decimal notation, IPv6 addresses
		as eight colon separated hexadecimal groups (without zero
		compression). For UNIX socket addresses, the socket path is written.
	*/
	string toAddressString()
	const nothrow {
		import std.array : appender;
		auto ret = appender!string();
		ret.reserve(40);
		toAddressString(str => ret.put(str));
		return ret.data;
	}
	/// ditto
	void toAddressString(scope void delegate(const(char)[]) @safe sink)
	const nothrow {
		import std.format : formattedWrite;
		ubyte[2] _dummy = void; // Workaround for DMD regression in master

		scope (failure) assert(false);

		switch (this.family) {
			default: assert(false, "toAddressString() called for invalid address family.");
			case AF_UNSPEC:
				sink("<UNSPEC>");
				break;
			case AF_INET: {
				ubyte[4] ip = () @trusted { return (cast(ubyte*)&addr_ip4.sin_addr.s_addr)[0 .. 4]; } ();
				sink.formattedWrite("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]);
				} break;
			case AF_INET6: {
				ubyte[16] ip = addr_ip6.sin6_addr.s6_addr;
				foreach (i; 0 .. 8) {
					if (i > 0) sink(":");
					// each group consists of two bytes in network byte order
					_dummy[] = ip[i*2 .. i*2+2];
					sink.formattedWrite("%x", bigEndianToNative!ushort(_dummy));
				}
				} break;
			version (Posix) {
				case AddressFamily.UNIX:
					import std.traits : hasMember;
					import std.string : fromStringz;
					static if (hasMember!(sockaddr_un, "sun_len"))
						sink(() @trusted { return cast(char[])addr_unix.sun_path[0..addr_unix.sun_len]; } ());
					else
						sink(() @trusted { return (cast(char*)addr_unix.sun_path.ptr).fromStringz; } ());
					break;
			}
		}
	}

	/** Returns a full string representation of the address, including the port number.

		IPv6 addresses are enclosed in square brackets to separate them from
		the port. UNIX socket addresses are written without a port.
	*/
	string toString()
	const nothrow {
		import std.array : appender;
		auto ret = appender!string();
		toString(str => ret.put(str));
		return ret.data;
	}
	/// ditto
	void toString(scope void delegate(const(char)[]) @safe sink)
	const nothrow {
		import std.format : formattedWrite;
		try {
			switch (this.family) {
				default: assert(false, "toString() called for invalid address family.");
				case AF_UNSPEC:
					sink("<UNSPEC>");
					break;
				case AF_INET:
					toAddressString(sink);
					sink.formattedWrite(":%s", port);
					break;
				case AF_INET6:
					sink("[");
					toAddressString(sink);
					sink.formattedWrite("]:%s", port);
					break;
				case AddressFamily.UNIX:
					toAddressString(sink);
					break;
			}
		} catch (Exception e) {
			assert(false, "Unexpected exception: "~e.msg);
		}
	}

	unittest {
		void test(string ip) {
			auto res = () @trusted { return resolveHost(ip, AF_UNSPEC, false); } ().toAddressString();
			assert(res == ip,
				"IP "~ip~" yielded wrong string representation: "~res);
		}
		test("1.2.3.4");
		test("102:304:506:708:90a:b0c:d0e:f10");
	}
}

/**
	Represents a single TCP connection.
*/
struct TCPConnection {
	@safe:

	import core.time : seconds;
	import vibe.internal.array : BatchBuffer;
	//static assert(isConnectionStream!TCPConnection);

	static struct Context {
		BatchBuffer!ubyte readBuffer;
		bool tcpNoDelay = false;
		bool keepAlive = false;
		Duration readTimeout = Duration.max;
		string remoteAddressString;
		shared(NativeEventDriver) driver;
	}

	private {
		StreamSocketFD m_socket;
		Context* m_context; // null for unconnected/closed connections
	}

	private this(StreamSocketFD socket, scope RefAddress remote_address)
	nothrow {
		m_socket = socket;
		// the context is stored in the user data area of the socket
		m_context = () @trusted { return &eventDriver.sockets.userData!Context(socket); } ();
		m_context.readBuffer.capacity = 4096;
		m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
	}

	this(this)
	nothrow {
		if (m_socket != StreamSocketFD.invalid)
			eventDriver.sockets.addRef(m_socket);
	}

	~this()
	nothrow {
		if (m_socket != StreamSocketFD.invalid)
			releaseHandle!"sockets"(m_socket, m_context.driver);
	}

	/// The socket descriptor, cast to `int`.
	@property int fd() const nothrow { return cast(int)m_socket; }

	/// Evaluates to `true` as long as the connection refers to a valid socket.
	bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamSocketFD.invalid; }

	@property void tcpNoDelay(bool enabled) nothrow { eventDriver.sockets.setTCPNoDelay(m_socket, enabled); m_context.tcpNoDelay = enabled; }
	@property bool tcpNoDelay() const nothrow { return m_context.tcpNoDelay; }
	@property void keepAlive(bool enabled) nothrow { eventDriver.sockets.setKeepAlive(m_socket, enabled); m_context.keepAlive = enabled; }
	@property bool keepAlive() const nothrow { return m_context.keepAlive; }
	@property void readTimeout(Duration duration) { m_context.readTimeout = duration; }
	@property Duration readTimeout() const nothrow { return m_context.readTimeout; }
	@property string peerAddress() const nothrow { return this.remoteAddress.toString(); }
	@property NetworkAddress localAddress() const nothrow {
		NetworkAddress naddr;
		scope addr = new RefAddress(naddr.sockAddr, naddr.sockAddrMaxLen);
		if (!eventDriver.sockets.getLocalAddress(m_socket, addr))
			logWarn("Failed to get local address for TCP connection");
		return naddr;
	}
	@property NetworkAddress remoteAddress() const nothrow {
		NetworkAddress naddr;
		scope addr = new RefAddress(naddr.sockAddr, naddr.sockAddrMaxLen);
		if (!eventDriver.sockets.getRemoteAddress(m_socket, addr))
			logWarn("Failed to get remote address for TCP connection");
		return naddr;
	}
	@property bool connected()
	const nothrow {
		if (m_socket == StreamSocketFD.invalid) return false;
		auto s = eventDriver.sockets.getConnectionState(m_socket);
		return s >= ConnectionState.connected && s < ConnectionState.activeClose;
	}
	@property bool empty() { return leastSize == 0; }

	/** Waits for data and returns the number of buffered bytes.

		Returns `0` for a closed connection or if no more data is available.

		Throws: `ReadTimeoutException` if the read timeout is reached.
	*/
	@property ulong leastSize()
	{
		if (!m_context) return 0;

		auto res = waitForDataEx(m_context.readTimeout);
		if (res == WaitForDataStatus.timeout)
			throw new ReadTimeoutException("Read operation timed out");

		// waitForDataEx() also reports noMoreData if the context has been
		// reset while waiting, so it must be checked again here
		return m_context ? m_context.readBuffer.length : 0;
	}

	@property bool dataAvailableForRead() { return waitForData(0.seconds); }

	/// Shuts down the socket in both directions and releases the handle.
	void close()
	nothrow {
		//logInfo("close %s", cast(int)m_fd);
		if (m_socket != StreamSocketFD.invalid) {
			eventDriver.sockets.shutdown(m_socket, true, true);
			releaseHandle!"sockets"(m_socket, m_context.driver);
			m_socket = StreamSocketFD.invalid;
			m_context = null;
		}
	}

	/** Waits until data is available for reading.

		Returns: `true` if data is available and `false` if the timeout was
			reached or no more data can be read.
	*/
	bool waitForData(Duration timeout = Duration.max)
	{
		return waitForDataEx(timeout) == WaitForDataStatus.dataAvailable;
	}

	/** Same as `waitForData`, but distinguishes between a timeout and the
		end of the data stream.
	*/
	WaitForDataStatus waitForDataEx(Duration timeout = Duration.max)
	{
		mixin(tracer);
		if (!m_context) return WaitForDataStatus.noMoreData;
		if (m_context.readBuffer.length > 0) return WaitForDataStatus.dataAvailable;
		auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once;

		bool cancelled;
		IOStatus status;
		size_t nbytes;

		alias waiter = Waitable!(IOCallback,
			cb => eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), mode, cb),
			(cb) { cancelled = true; eventDriver.sockets.cancelRead(m_socket); },
			(sock, st, nb) {
				// the connection has been closed while the read was pending
				if (m_socket == StreamSocketFD.invalid) {
					cancelled = true;
					return;
				}
				assert(sock == m_socket); status = st; nbytes = nb;
			}
		);

		asyncAwaitAny!(true, waiter)(timeout);

		if (!m_context) return WaitForDataStatus.noMoreData;
		// NOTE: for IOMode.immediate, no actual timeout occurs, but the read
		//       fails immediately with wouldBlock
		if (cancelled || status == IOStatus.wouldBlock)
			return WaitForDataStatus.timeout;

		logTrace("Socket %s, read %s bytes: %s", m_socket, nbytes, status);

		assert(m_context.readBuffer.length == 0);
		m_context.readBuffer.putN(nbytes);
		switch (status) {
			default:
				logDebug("Error status when waiting for data: %s", status);
				break;
			case IOStatus.ok: break;
			case IOStatus.wouldBlock: assert(mode == IOMode.immediate); break;
			case IOStatus.disconnected: break;
		}

		return m_context.readBuffer.length > 0 ? WaitForDataStatus.dataAvailable : WaitForDataStatus.noMoreData;
	}


	/** Waits asynchronously for new data to arrive.

		This function can be used to detach the `TCPConnection` from a
		running task while waiting for data, so that the associated memory
		resources are available for other operations.

		Note that `read_ready_callback` may be called from outside of a
		task, so no blocking operations may be performed. Instead, an existing
		task should be notified, or a new one started with `runTask`.

		Params:
			read_ready_callback = A callback taking a `bool` parameter that
				signals the read-readiness of the connection
			timeout = Optional timeout to limit the maximum wait time

		Returns:
			If the read readiness can be determined immediately, it will be
			returned as `WaitForDataAsyncStatus.dataAvailable` or
			`WaitForDataAsyncStatus.noMoreData` and the callback will not be
			invoked. Otherwise `WaitForDataAsyncStatus.waiting` is returned
			and the callback will be invoked once the status can be
			determined or the specified timeout is reached.
	*/
	WaitForDataAsyncStatus waitForDataAsync(CALLABLE)(CALLABLE read_ready_callback, Duration timeout = Duration.max)
		if (is(typeof(() @safe { read_ready_callback(true); } ())))
	{
		mixin(tracer);
		import vibe.core.core : Timer, setTimer;

		if (!m_context)
			return WaitForDataAsyncStatus.noMoreData;

		if (m_context.readBuffer.length > 0)
			return WaitForDataAsyncStatus.dataAvailable;

		if (timeout <= 0.seconds) {
			auto rs = waitForData(0.seconds);
			return rs ? WaitForDataAsyncStatus.dataAvailable : WaitForDataAsyncStatus.noMoreData;
		}

		static final class WaitContext {
			import std.algorithm.mutation : move;

			CALLABLE callback;
			TCPConnection connection;
			Timer timer;

			this(CALLABLE callback, TCPConnection connection, Duration timeout)
			{
				this.callback = callback;
				this.connection = connection;
				if (timeout < Duration.max)
					this.timer = setTimer(timeout, &onTimeout);
			}

			void onTimeout()
			{
				eventDriver.sockets.cancelRead(connection.m_socket);
				invoke(false);
			}

			void onData(StreamSocketFD, IOStatus st, size_t nb)
			{
				if (timer) timer.stop();
				assert(connection.m_context.readBuffer.length == 0);
				connection.m_context.readBuffer.putN(nb);
				invoke(connection.m_context.readBuffer.length > 0);
			}

			void invoke(bool status)
			{
				// reset all fields before invoking the callback, so that the
				// connection and timer references are dropped first
				auto cb = move(callback);
				connection = TCPConnection.init;
				timer = Timer.init;
				cb(status);
			}
		}

		// FIXME: make this work without a heap allocation!
		auto context = new WaitContext(read_ready_callback, this, timeout);

		eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(),
			IOMode.once, &context.onData);

		return WaitForDataAsyncStatus.waiting;
	}

	/// Returns the currently buffered data without consuming it (`null` for a closed connection).
	const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; }

	/** Skips `count` bytes of the incoming data.

		Throws:
			`ReadTimeoutException` if the read timeout is reached, or an
			`Exception` if the connection is closed or the end of the stream
			is reached before `count` bytes could be skipped.
	*/
	void skip(ulong count)
	{
		import std.algorithm.comparison : min;

		enforce(m_context !is null, "Attempt to skip data on a closed TCP connection.");

		m_context.readTimeout.loopWithTimeout!((remaining) {
			auto status = waitForDataEx(remaining);
			if (count == 0) return true;
			// Without this check, the loop would spin until the timeout is
			// reached (forever for an infinite timeout) once the peer has
			// closed the connection. It also guards against the context
			// being reset while waiting.
			enforce(status != WaitForDataStatus.noMoreData, "Reached end of stream while skipping data.");
			auto n = min(count, m_context.readBuffer.length);
			m_context.readBuffer.popFrontN(n);
			count -= n;
			return count == 0;
		}, ReadTimeoutException);
	}

	/** Reads data into `dst` according to `mode`.

		Returns: The number of bytes that have been written to `dst`.

		Throws:
			`ReadTimeoutException` if the read timeout is reached, or an
			`Exception` if the connection is closed or no more data could be
			read while more is required by `mode`.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		mixin(tracer);
		import std.algorithm.comparison : min;
		if (!dst.length) return 0;
		enforce(m_context !is null, "Attempt to read from a closed TCP connection.");
		// fast path: the request can be satisfied from the buffer
		if (m_context.readBuffer.length >= dst.length) {
			m_context.readBuffer.read(dst);
			return dst.length;
		}
		size_t nbytes = 0;
		m_context.readTimeout.loopWithTimeout!((remaining) {
			if (m_context.readBuffer.length == 0) {
				if (mode == IOMode.immediate || mode == IOMode.once && nbytes > 0)
					return true;
				enforce(waitForData(remaining), "Reached end of stream while reading data.");
			}
			assert(m_context.readBuffer.length > 0);
			auto l = min(dst.length, m_context.readBuffer.length);
			m_context.readBuffer.read(dst[0 .. l]);
			dst = dst[l .. $];
			nbytes += l;
			return dst.length == 0;
		}, ReadTimeoutException);
		return nbytes;
	}

	void read(scope ubyte[] dst) { auto r = read(dst, IOMode.all); assert(r == dst.length); }

	/** Writes `bytes` to the socket according to `mode`.

		Returns: The number of bytes that have been written.

		Throws: An `Exception` is thrown if the write fails, or if the
			connection gets closed before all data has been written in
			`IOMode.all`.
	*/
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		mixin(tracer);
		if (bytes.length == 0) return 0;

		auto res = asyncAwait!(IOCallback,
			cb => eventDriver.sockets.write(m_socket, bytes, mode, cb),
			cb => eventDriver.sockets.cancelWrite(m_socket));

		switch (res[1]) {
			default:
				throw new Exception("Error writing data to socket.");
			case IOStatus.ok:
				assert(mode != IOMode.all || res[2] == bytes.length);
				break;
			case IOStatus.disconnected:
				if (mode == IOMode.all && res[2] != bytes.length)
					throw new Exception("Connection closed while writing data.");
				break;
		}

		return res[2];
	}

	void write(in ubyte[] bytes) { auto r = write(bytes, IOMode.all); assert(r == bytes.length); }
	void write(in char[] bytes) { write(cast(const(ubyte)[])bytes); }
	void write(InputStream stream) { write(stream, 0); }

	void flush() {
		mixin(tracer);
	}
	void finalize() {}
	void write(InputStream)(InputStream stream, ulong nbytes = 0) if (isInputStream!InputStream) { writeDefault(stream, nbytes); }

	/* Pipes the contents of `stream` to the connection, using an intermediate
	   buffer. If `nbytes` is zero, data is written until the stream is empty,
	   otherwise exactly `nbytes` bytes are transferred.
	*/
	private void writeDefault(InputStream)(InputStream stream, ulong nbytes = 0)
		if (isInputStream!InputStream)
	{
		import std.algorithm.comparison : min;
		import vibe.internal.allocator : theAllocator, makeArray, dispose;

		scope buffer = () @trusted { return cast(ubyte[]) theAllocator.allocate(64*1024); }();
		scope (exit) () @trusted { theAllocator.dispose(buffer); }();

		//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
		if( nbytes == 0 ){
			while( !stream.empty ){
				size_t chunk = min(stream.leastSize, buffer.length);
				assert(chunk > 0, "leastSize returned zero for non-empty stream.");
				//logTrace("read pipe chunk %d", chunk);
				stream.read(buffer[0 .. chunk]);
				write(buffer[0 .. chunk]);
			}
		} else {
			while( nbytes > 0 ){
				size_t chunk = min(nbytes, buffer.length);
				//logTrace("read pipe chunk %d", chunk);
				stream.read(buffer[0 .. chunk]);
				write(buffer[0 .. chunk]);
				nbytes -= chunk;
			}
		}
	}
}

/** Represents possible return values for
	TCPConnection.waitForDataAsync.
*/
enum WaitForDataAsyncStatus {
	noMoreData,
	dataAvailable,
	waiting,
}

/** Represents possible return values for
	TCPConnection.waitForDataEx.
*/
enum WaitForDataStatus {
	dataAvailable,
	noMoreData,
	timeout
}

unittest { // test compilation of callback with scoped destruction
	static struct CB {
		~this() {}
		this(this) {}
		void opCall(bool) {}
	}

	void test() {
		TCPConnection c;
		CB cb;
		c.waitForDataAsync(cb);
	}
}


mixin validateConnectionStream!TCPConnection;

/* Repeatedly invokes `LoopBody` with the remaining timeout until it returns
   `true`. Throws an exception of type `ExceptionType` with the message
   `timeoutMsg` once the timeout has elapsed. A timeout of `Duration.max`
   is never decremented.
*/
private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration timeout,
	immutable string timeoutMsg = "Operation timed out.")
{
	import core.time : seconds, MonoTime;

	MonoTime now;
	if (timeout != Duration.max)
		now = MonoTime.currTime();

	do {
		if (LoopBody(timeout))
			return;

		if (timeout != Duration.max) {
			auto prev = now;
			now = MonoTime.currTime();
			if (now > prev) timeout -= now - prev;
		}
	} while (timeout > 0.seconds);

	throw new ExceptionType(timeoutMsg);
}


/**
	Represents a listening TCP socket.
*/
struct TCPListener {
	// FIXME: copying may lead to dangling FDs - this somehow needs to employ reference counting without breaking
	//        the previous behavior of keeping the socket alive when the listener isn't stored. At the same time,
	//        stopListening() needs to keep working.
	private {
		static struct Context {
			shared(NativeEventDriver) driver;
		}

		StreamListenSocketFD m_socket;
		Context* m_context;
	}

	this(StreamListenSocketFD socket)
	{
		m_socket = socket;
		m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
		m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
	}

	/// Evaluates to `true` as long as the listener refers to a valid socket.
	bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamListenSocketFD.invalid; }

	/// The local address at which TCP connections are accepted.
	@property NetworkAddress bindAddress()
	{
		NetworkAddress ret;
		scope ra = new RefAddress(ret.sockAddr, ret.sockAddrMaxLen);
		enforce(eventDriver.sockets.getLocalAddress(m_socket, ra),
			"Failed to query bind address of listening socket.");
		return ret;
	}

	/// Stops listening and closes the socket.
	void stopListening()
	{
		if (m_socket != StreamListenSocketFD.invalid) {
			releaseHandle!"sockets"(m_socket, m_context.driver);
			m_socket = StreamListenSocketFD.invalid;
		}
	}
}


/**
	Represents a bound and possibly 'connected' UDP socket.
*/
struct UDPConnection {
	/// State stored in the event driver's user data area of the socket.
	static struct Context {
		/// Value of the last successful `canBroadcast` assignment.
		bool canBroadcast;
		/// Event driver that gets passed to `releaseHandle` for this socket.
		shared(NativeEventDriver) driver;
	}

	private {
		DatagramSocketFD m_socket;
		Context* m_context;
	}

	/*
		Creates a datagram socket bound to `bind_address`.

		The `reuseAddress` and `reusePort` flags of `options` are translated to
		the corresponding `DatagramCreateOptions`.

		Throws:
			An exception is thrown if the event driver fails to create the
			socket.
	*/
	private this(ref NetworkAddress bind_address, UDPListenOptions options)
	{
		DatagramCreateOptions copts;
		if (options & UDPListenOptions.reuseAddress) copts |= DatagramCreateOptions.reuseAddress;
		if (options & UDPListenOptions.reusePort) copts |= DatagramCreateOptions.reusePort;

		scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen);
		m_socket = eventDriver.sockets.createDatagramSocket(baddr, null, copts);
		enforce(m_socket != DatagramSocketFD.invalid, "Failed to create datagram socket.");
		m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
		m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
	}


	// Each copy holds its own reference to the socket handle...
	this(this)
	nothrow {
		if (m_socket != DatagramSocketFD.invalid)
			eventDriver.sockets.addRef(m_socket);
	}

	// ...which is given up again when the copy is destroyed.
	~this()
	nothrow {
		if (m_socket != DatagramSocketFD.invalid)
			releaseHandle!"sockets"(m_socket, m_context.driver);
	}

	/// The socket handle, cast to `int`.
	@property int fd() const nothrow { return cast(int)m_socket; }

	/// Evaluates to `true` as long as the connection holds a valid socket handle.
	bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != DatagramSocketFD.invalid; }

	/** Returns the address to which the UDP socket is bound.
	*/
	@property string bindAddress() const { return localAddress.toString(); }

	/** Determines if the socket is allowed to send to broadcast addresses.

		The getter yields `false` for a connection without context, such as
		one that has been closed using `close()`.
	*/
	@property bool canBroadcast() const { return m_context !is null && m_context.canBroadcast; }
	/// ditto
	@property void canBroadcast(bool val) { enforce(eventDriver.sockets.setBroadcast(m_socket, val), "Failed to set UDP broadcast flag."); m_context.canBroadcast = val; }

	/** The local/bind address of the underlying socket.

		If the address cannot be queried, a warning is logged and a default
		initialized `NetworkAddress` is returned.
	*/
	@property NetworkAddress localAddress() const nothrow {
		NetworkAddress naddr;
		scope addr = new RefAddress(naddr.sockAddr, naddr.sockAddrMaxLen);
		try {
			enforce(eventDriver.sockets.getLocalAddress(m_socket, addr), "Failed to query socket address.");
		} catch (Exception e) { logWarn("Failed to get local address for UDP connection: %s", e.msg); }
		return naddr;
	}

	/** Set IP multicast loopback mode.

		This is on by default. All packets sent will also loopback if enabled.
		Useful if more than one application is running on same host and both need each other's packets.
	*/
	@property void multicastLoopback(bool loop)
	{
		enforce(eventDriver.sockets.setOption(m_socket, DatagramSocketOption.multicastLoopback, loop),
			"Failed to set multicast loopback mode.");
	}

	/** Become a member of an IP multicast group.

		The multiaddr parameter should be in the range 239.0.0.0-239.255.255.255.
		See https://www.iana.org/assignments/multicast-addresses/multicast-addresses.xml#multicast-addresses-12
		and https://www.iana.org/assignments/ipv6-multicast-addresses/ipv6-multicast-addresses.xhtml
	*/
	void addMembership(ref NetworkAddress multiaddr, uint interface_index = 0)
	{
		scope addr = new RefAddress(multiaddr.sockAddr, multiaddr.sockAddrMaxLen);
		enforce(eventDriver.sockets.joinMulticastGroup(m_socket, addr, interface_index),
			"Failed to add multicast membership.");
	}

	/** Stops listening for datagrams and frees all resources.

		Calling this on an already closed connection has no effect.
	*/
	void close()
	{
		if (m_socket != DatagramSocketFD.invalid) {
			releaseHandle!"sockets"(m_socket, m_context.driver);
			// use the same sentinel that all validity checks compare against
			m_socket = DatagramSocketFD.invalid;
			m_context = null;
		}
	}

	/** Locks the UDP connection to a certain peer.

		Once connected, the UDPConnection can only communicate with the specified peer.
		Otherwise communication with any reachable peer is possible.

		Throws:
			An exception is thrown if the host name cannot be resolved or if
			the target address cannot be set on the socket.
	*/
	void connect(string host, ushort port)
	{
		auto address = resolveHost(host);
		address.port = port;
		connect(address);
	}
	/// ditto
	void connect(NetworkAddress address)
	{
		scope addr = new RefAddress(address.sockAddr, address.sockAddrLen);
		// report a failure instead of leaving the socket silently unconnected
		enforce(eventDriver.sockets.setTargetAddress(m_socket, addr),
			"Failed to set UDP target address.");
	}

	/** Sends a single packet.

		If peer_address is given, the packet is sent to that address. Otherwise the packet
		will be sent to the address specified by a call to connect().

		Throws:
			An exception is thrown if the send operation was cancelled, did
			not complete with `IOStatus.ok`, or transferred less than
			`data.length` bytes.
	*/
	void send(in ubyte[] data, in NetworkAddress* peer_address = null)
	{
		scope addrc = new RefAddress;
		if (peer_address)
			addrc.set(() @trusted { return (cast(NetworkAddress*)peer_address).sockAddr; } (), peer_address.sockAddrLen);

		IOStatus status;
		size_t nbytes;
		bool cancelled;

		alias waitable = Waitable!(DatagramIOCallback,
			cb => eventDriver.sockets.send(m_socket, data, IOMode.once, peer_address ? addrc : null, cb),
			(cb) { cancelled = true; eventDriver.sockets.cancelSend(m_socket); },
			(DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr)
			{
				status = status_;
				nbytes = nbytes_;
			}
		);

		asyncAwaitAny!(true, waitable);

		enforce(!cancelled && status == IOStatus.ok, "Failed to send packet.");
		enforce(nbytes == data.length, "Packet was only sent partially.");
	}

	/** Receives a single packet.

		If a buffer is given, it must be large enough to hold the full packet.
		If no buffer (or an empty one) is given, a new buffer of 65536 bytes
		is allocated.

		The timeout overload will throw an Exception if no data arrives before the
		specified duration has elapsed.

		Params:
			timeout = Maximum time to wait for a packet
			buf = Optional buffer that receives the packet contents
			peer_address = If not `null`, receives the source address of the
				packet after a successful receive operation

		Returns:
			The slice of the used buffer that contains the received bytes.
	*/
	ubyte[] recv(ubyte[] buf = null, NetworkAddress* peer_address = null)
	{
		return recv(Duration.max, buf, peer_address);
	}
	/// ditto
	ubyte[] recv(Duration timeout, ubyte[] buf = null, NetworkAddress* peer_address = null)
	{
		if (buf.length == 0) buf = new ubyte[65536];

		IOStatus status;
		size_t nbytes;
		bool cancelled;

		alias waitable = Waitable!(DatagramIOCallback,
			cb => eventDriver.sockets.receive(m_socket, buf, IOMode.once, cb),
			(cb) { cancelled = true; eventDriver.sockets.cancelReceive(m_socket); },
			(DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr)
			{
				status = status_;
				nbytes = nbytes_;
				if (status_ == IOStatus.ok && peer_address) {
					try *peer_address = NetworkAddress(addr);
					catch (Exception e) logWarn("Failed to store datagram source address: %s", e.msg);
				}
			}
		);

		asyncAwaitAny!(true, waitable)(timeout);
		// the cancellation callback is what sets this flag
		enforce(!cancelled, "Receive timeout.");
		enforce(status == IOStatus.ok, "Failed to receive packet.");
		return buf[0 .. nbytes];
	}
}


/**
	Flags to control the behavior of listenTCP.
*/
enum TCPListenOptions {
	/// Don't enable any particular option
	none = 0,
	/// Deprecated: causes incoming connections to be distributed across the thread pool
	distribute = 1<<0,
	/// Disables automatic closing of the connection when the connection callback exits
	disableAutoClose = 1<<1,
	/** Enable port reuse on linux kernel version >=3.9, do nothing on other OS
		Does not affect libasync driver because it is always enabled by libasync.
*/
	reusePort = 1 << 2,
	/// Enable address reuse
	reuseAddress = 1 << 3,
	/// The default set of options, equal to `reuseAddress`
	defaults = reuseAddress
}


/** Flags to control the behavior of `listenUDP`
*/
enum UDPListenOptions {
	/// No options
	none = 0,
	/// Enable address reuse
	reuseAddress = 1<<0,
	/// Enable port reuse
	reusePort = 1<<1
}

private pure nothrow {
	import std.bitmanip;

	// Returns `val` unchanged on big endian targets and with its two bytes
	// swapped on little endian targets.
	ushort ntoh(ushort val)
	{
		version (BigEndian) return val;
		else version (LittleEndian) return swapEndian(val);
		else static assert(false, "Unknown endianness.");
	}

	// Performs exactly the same operation as `ntoh` - swapping two bytes is
	// its own inverse.
	ushort hton(ushort val)
	{
		return ntoh(val);
	}
}

private enum tracer = "";


/// Thrown by TCPConnection read-alike operations when timeout is reached.
class ReadTimeoutException: Exception
{
	/// Forwards all arguments to the corresponding `Exception` constructor.
	this(string message, Throwable next, string file =__FILE__, size_t line = __LINE__)
	@safe pure nothrow @nogc {
		super(message, next, file, line);
	}

	/// ditto
	this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null)
	@safe pure nothrow @nogc {
		super(message, file, line, next);
	}
}


// Quick pre-check that tells whether the given name cannot be a valid host
// name, but might be a valid IP address instead. It is used to avoid
// unnecessary address parsing or DNS queries.
private bool isMaybeIPAddress(in char[] name)
{
	import std.algorithm.searching : all, canFind;
	import std.ascii : isDigit;

	// ':' is invalid in host names, so this could only be an IPv6 address
	if (name.canFind(':'))
		return true;

	// the shortest possible IPv4 address (0.0.0.0) has 7 characters
	if (name.length < 7)
		return false;

	if (!name.canFind('.'))
		return false;

	// no valid TLD consists of only digits
	return name.all!(c => c == '.' || c.isDigit);
}

unittest {
	foreach (candidate; ["0.0.0.0", "::1", "aabb::1f"])
		assert(isMaybeIPAddress(candidate));

	foreach (candidate; ["example.com", "12.com", "1.1.1.t12"])
		assert(!isMaybeIPAddress(candidate));
}