/**
	TCP/UDP connection and server handling.

	Copyright: © 2012-2016 Sönke Ludwig
	Authors: Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
*/
module vibe.core.net;

import eventcore.core;
import std.exception : enforce;
import std.format : format;
import std.functional : toDelegate;
import std.socket : AddressFamily, UnknownAddress;
import vibe.core.internal.release;
import vibe.core.log;
import vibe.core.stream;
import vibe.internal.async;
import core.time : Duration;

@safe:


/**
	Resolves the given host name/IP address string.

	Setting use_dns to false will only allow IP address strings but also guarantees
	that the call will not block.

	Params:
		host = Host name or IP address string, must not be empty
		address_family = Required address family, or `AddressFamily.UNSPEC`
			to accept any supported family
		use_dns = Allows a DNS lookup if `host` is not an IP address string

	Throws:
		An `Exception` is thrown if `host` is empty, if the address family
		does not match/is unsupported, or if the DNS lookup yields no usable
		address.
*/
NetworkAddress resolveHost(string host, AddressFamily address_family = AddressFamily.UNSPEC, bool use_dns = true)
{
	return resolveHost(host, cast(ushort)address_family, use_dns);
}
/// ditto
NetworkAddress resolveHost(string host, ushort address_family, bool use_dns = true)
{
	import std.socket : parseAddress;
	version (Windows) import core.sys.windows.winsock2 : sockaddr_in, sockaddr_in6;
	else import core.sys.posix.netinet.in_ : sockaddr_in, sockaddr_in6;

	enforce(host.length > 0, "Host name must not be empty.");
	if (isMaybeIPAddress(host)) {
		// literal IP address - parse it directly without any DNS involvement
		auto addr = parseAddress(host);
		enforce(address_family == AddressFamily.UNSPEC || addr.addressFamily == address_family,
			"Address family of the IP address string does not match the requested address family.");
		NetworkAddress ret;
		ret.family = addr.addressFamily;
		switch (addr.addressFamily) with(AddressFamily) {
			default: throw new Exception("Unsupported address family");
			case INET: *ret.sockAddrInet4 = () @trusted { return *cast(sockaddr_in*)addr.name; } (); break;
			case INET6: *ret.sockAddrInet6 = () @trusted { return *cast(sockaddr_in6*)addr.name; } (); break;
		}
		return ret;
	} else {
		enforce(use_dns, "Malformed IP address string.");
		NetworkAddress res;
		bool success = false;
		alias waitable = Waitable!(DNSLookupCallback,
			cb => eventDriver.dns.lookupHost(host, cb),
			(cb, id) => eventDriver.dns.cancelLookup(id),
			(DNSLookupID, DNSStatus status, scope RefAddress[] addrs) {
				if (status != DNSStatus.ok) return;
				foreach (candidate; addrs) {
					// honor the address family requested by the caller
					if (address_family != AddressFamily.UNSPEC && candidate.addressFamily != address_family)
						continue;
					try {
						res = NetworkAddress(candidate);
						// only report success if the address was actually stored
						success = true;
						break;
					} catch (Exception e) {
						logDiagnostic("Failed to store address from DNS lookup: %s", e.msg);
					}
				}
			}
		);

		asyncAwaitAny!(true, waitable);

		enforce(success, "Failed to lookup host '"~host~"'.");
		return res;
	}
}


/**
	Starts listening on the specified port.

	'connection_callback' will be called for each client that connects to the
	server socket. Each new connection gets its own fiber. The stream parameter
	then allows to perform blocking I/O on the client socket.

	The address parameter can be used to specify the network
	interface on which the server socket is supposed to listen for connections.
	By default, all IPv4 and IPv6 interfaces will be used.

	Throws:
		An `Exception` is thrown if the socket could not be set up for
		listening. The overload without an address only throws if listening
		failed on both, "::" and "0.0.0.0".
*/
TCPListener[] listenTCP(ushort port, TCPConnectionDelegate connection_callback, TCPListenOptions options = TCPListenOptions.defaults)
{
	TCPListener[] ret;
	// failing on one of the two wildcard addresses is not fatal
	try ret ~= listenTCP(port, connection_callback, "::", options);
	catch (Exception e) logDiagnostic("Failed to listen on \"::\": %s", e.msg);
	try ret ~= listenTCP(port, connection_callback, "0.0.0.0", options);
	catch (Exception e) logDiagnostic("Failed to listen on \"0.0.0.0\": %s", e.msg);
	enforce(ret.length > 0, format("Failed to listen on all interfaces on port %s", port));
	return ret;
}
/// ditto
TCPListener listenTCP(ushort port, TCPConnectionDelegate connection_callback, string address, TCPListenOptions options = TCPListenOptions.defaults)
{
	auto addr = resolveHost(address);
	addr.port = port;

	// translate the TCP level options to the stream socket options of the event driver
	StreamListenOptions sopts = StreamListenOptions.defaults;
	if (options & TCPListenOptions.reuseAddress)
		sopts |= StreamListenOptions.reuseAddress;
	else
		sopts &= ~StreamListenOptions.reuseAddress;
	if (options & TCPListenOptions.reusePort)
		sopts |= StreamListenOptions.reusePort;
	else
		sopts &= ~StreamListenOptions.reusePort;

	scope addrc = new RefAddress(addr.sockAddr, addr.sockAddrLen);
	auto sock = eventDriver.sockets.listenStream(addrc, sopts,
		(StreamListenSocketFD ls, StreamSocketFD s, scope RefAddress addr) @safe nothrow {
			import vibe.core.core : runTask;
			// each accepted connection is handled in its own task
			auto conn = TCPConnection(s, addr);
			runTask(connection_callback, conn);
		});
	enforce(sock != StreamListenSocketFD.invalid, "Failed to listen on "~addr.toString());
	return TCPListener(sock);
}

/// Compatibility overload - use an `@safe nothrow` callback instead.
deprecated("Use a @safe nothrow callback instead.")
TCPListener[] listenTCP(ushort port, void delegate(TCPConnection) connection_callback, TCPListenOptions options = TCPListenOptions.defaults)
{
	TCPListener[] listeners;

	// IPv6 wildcard address
	try {
		listeners ~= listenTCP(port, connection_callback, "::", options);
	} catch (Exception e) {
		logDiagnostic("Failed to listen on \"::\": %s", e.msg);
	}

	// IPv4 wildcard address
	try {
		listeners ~= listenTCP(port, connection_callback, "0.0.0.0", options);
	} catch (Exception e) {
		logDiagnostic("Failed to listen on \"0.0.0.0\": %s", e.msg);
	}

	// at least one of the two attempts has to succeed
	enforce(listeners.length > 0, format("Failed to listen on all interfaces on port %s", port));
	return listeners;
}
/// ditto
deprecated("Use a @safe nothrow callback instead.")
TCPListener listenTCP(ushort port, void delegate(TCPConnection) connection_callback, string address, TCPListenOptions options = TCPListenOptions.defaults)
{
	// Wraps the legacy callback so that it satisfies the `@safe nothrow`
	// contract: exceptions are logged and the connection gets closed.
	TCPConnectionDelegate guarded_callback = (conn) @trusted nothrow {
		try {
			connection_callback(conn);
		} catch (Exception e) {
			logError("Handling of connection failed: %s", e.msg);
			conn.close();
			logDebug("Full error: %s", e);
		}
	};

	return listenTCP(port, guarded_callback, address, options);
}

/**
	Starts listening on the specified port.

	This function is the same as listenTCP but takes a function callback instead of a delegate.
*/
TCPListener[] listenTCP_s(ushort port, TCPConnectionFunction connection_callback, TCPListenOptions options = TCPListenOptions.defaults) @trusted
{
	auto callback_dg = toDelegate(connection_callback);
	return listenTCP(port, callback_dg, options);
}
/// ditto
TCPListener listenTCP_s(ushort port, TCPConnectionFunction connection_callback, string address, TCPListenOptions options = TCPListenOptions.defaults) @trusted
{
	auto callback_dg = toDelegate(connection_callback);
	return listenTCP(port, callback_dg, address, options);
}

/**
	Establishes a connection to the given host/port.
*/
TCPConnection connectTCP(string host, ushort port, string bind_interface = null,
	ushort bind_port = 0, Duration timeout = Duration.max)
{
	NetworkAddress addr = resolveHost(host);
	// the port accessors only support IP based address families
	if (addr.family != AddressFamily.UNIX)
		addr.port = port;

	NetworkAddress bind_address;
	if (bind_interface.length) bind_address = resolveHost(bind_interface, addr.family);
	else {
		// no interface given - bind to the wildcard address of the target's family
		bind_address.family = addr.family;
		if (bind_address.family == AddressFamily.INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
		else if (bind_address.family != AddressFamily.UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
	}
	if (addr.family != AddressFamily.UNIX)
		bind_address.port = bind_port;

	return connectTCP(addr, bind_address, timeout);
}
/// ditto
TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyAddress,
	Duration timeout = Duration.max)
{
	import std.conv : to;

	if (bind_address.family == AddressFamily.UNSPEC) {
		// unspecified bind address - use the wildcard address/port of the target's family
		bind_address.family = addr.family;
		if (bind_address.family == AddressFamily.INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
		else if (bind_address.family != AddressFamily.UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
		if (bind_address.family != AddressFamily.UNIX)
			bind_address.port = 0;
	}
	enforce(addr.family == bind_address.family, "Destination address and bind address have different address families.");

	return () @trusted { // scope
		scope uaddr = new RefAddress(addr.sockAddr, addr.sockAddrLen);
		scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen);

		bool cancelled;
		StreamSocketFD sock;
		ConnectStatus status;

		alias waiter = Waitable!(ConnectCallback,
			cb => eventDriver.sockets.connectStream(uaddr, baddr, cb),
			(cb, sock_fd) {
				// invoked if the wait gets cancelled before the connect callback fired
				cancelled = true;
				if (sock_fd != SocketFD.invalid)
					eventDriver.sockets.cancelConnectStream(sock_fd);
			},
			(fd, st) { sock = fd; status = st; }
		);

		asyncAwaitAny!(true, waiter)(timeout);

		enforce(!cancelled, "Failed to connect to " ~ addr.toString() ~
			": timeout");

		if (status != ConnectStatus.connected) {
			// do not leak the socket of a failed connection attempt
			if (sock != SocketFD.invalid)
				eventDriver.sockets.releaseRef(sock);

			throw new Exception("Failed to connect to "~addr.toString()~": "~status.to!string);
		}

		return TCPConnection(sock, uaddr);
	} ();
}


/**
	Creates a bound UDP socket suitable for sending and receiving packets.
*/
UDPConnection listenUDP(ref NetworkAddress addr)
{
	return UDPConnection(addr);
}
/// ditto
UDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0")
{
	// DNS is disabled here, so `bind_address` has to be an IP address string
	auto addr = resolveHost(bind_address, AddressFamily.UNSPEC, false);
	addr.port = port;
	return UDPConnection(addr);
}

/// Returns an address with the family set to `AddressFamily.UNSPEC`.
NetworkAddress anyAddress()
{
	NetworkAddress ret;
	ret.family = AddressFamily.UNSPEC;
	return ret;
}


/// Callback invoked for incoming TCP connections.
@safe nothrow alias TCPConnectionDelegate = void delegate(TCPConnection stream);
/// ditto
@safe nothrow alias TCPConnectionFunction = void function(TCPConnection stream);


/**
	Represents a network/socket address.
*/
struct NetworkAddress {
	import std.algorithm.comparison : max;
	import std.socket : Address;

	version (Windows) import core.sys.windows.winsock2;
	else import core.sys.posix.netinet.in_;

	version(Posix) import core.sys.posix.sys.un : sockaddr_un;

	@safe:

	// All supported socket address types share the same storage.
	private union {
		sockaddr addr;
		version (Posix) sockaddr_un addr_unix;
		sockaddr_in addr_ip4;
		sockaddr_in6 addr_ip6;
	}

	/** Size of the largest socket address type that can be stored.

		On Posix systems this includes `sockaddr_un`, which is part of the
		storage union, too.
	*/
	version (Posix) {
		enum socklen_t sockAddrMaxLen = max(addr.sizeof, addr_ip6.sizeof, addr_unix.sizeof);
	} else {
		enum socklen_t sockAddrMaxLen = max(addr.sizeof, addr_ip6.sizeof);
	}


	/** Constructs the address from a `std.socket.Address`.

		Throws:
			An `Exception` is thrown for address families other than
			`INET`, `INET6` and (on Posix) `UNIX`.
	*/
	this(Address addr)
		@trusted
	{
		assert(addr !is null);
		switch (addr.addressFamily) {
			default: throw new Exception("Unsupported address family.");
			case AddressFamily.INET:
				this.family = AddressFamily.INET;
				assert(addr.nameLen >= sockaddr_in.sizeof);
				*this.sockAddrInet4 = *cast(sockaddr_in*)addr.name;
				break;
			case AddressFamily.INET6:
				this.family = AddressFamily.INET6;
				assert(addr.nameLen >= sockaddr_in6.sizeof);
				*this.sockAddrInet6 = *cast(sockaddr_in6*)addr.name;
				break;
			version (Posix) {
				case AddressFamily.UNIX:
					this.family = AddressFamily.UNIX;
					assert(addr.nameLen >= sockaddr_un.sizeof);
					*this.sockAddrUnix = *cast(sockaddr_un*)addr.name;
					break;
			}
		}
	}

	/** Family of the socket address.
	*/
	@property ushort family() const pure nothrow { return addr.sa_family; }
	/// ditto
	@property void family(AddressFamily val) pure nothrow { addr.sa_family = cast(typeof(addr.sa_family))val; }
	/// ditto
	@property void family(ushort val) pure nothrow { addr.sa_family = cast(typeof(addr.sa_family))val; }

	/** The port in host byte order.

		May only be used for `AF_INET` and `AF_INET6` addresses.
	*/
	@property ushort port()
	const pure nothrow {
		ushort nport;
		switch (this.family) {
			default: assert(false, "port() called for invalid address family.");
			case AF_INET: nport = addr_ip4.sin_port; break;
			case AF_INET6: nport = addr_ip6.sin6_port; break;
		}
		return () @trusted { return ntoh(nport); } ();
	}
	/// ditto
	@property void port(ushort val)
	pure nothrow {
		auto nport = () @trusted { return hton(val); } ();
		switch (this.family) {
			default: assert(false, "port() called for invalid address family.");
			case AF_INET: addr_ip4.sin_port = nport; break;
			case AF_INET6: addr_ip6.sin6_port = nport; break;
		}
	}

	/** A pointer to a sockaddr struct suitable for passing to socket functions.
	*/
	@property inout(sockaddr)* sockAddr() inout pure nothrow { return &addr; }

	/** Size of the sockaddr struct that is returned by sockAddr().
	*/
	@property socklen_t sockAddrLen()
	const pure nothrow {
		switch (this.family) {
			default: assert(false, "sockAddrLen() called for invalid address family.");
			case AF_INET: return addr_ip4.sizeof;
			case AF_INET6: return addr_ip6.sizeof;
			version (Posix) {
				case AF_UNIX: return addr_unix.sizeof;
			}
		}
	}

	/// Typed access to the stored address - the family has to match.
	@property inout(sockaddr_in)* sockAddrInet4() inout pure nothrow
		in { assert (family == AF_INET); }
		do { return &addr_ip4; }

	/// ditto
	@property inout(sockaddr_in6)* sockAddrInet6() inout pure nothrow
		in { assert (family == AF_INET6); }
		do { return &addr_ip6; }

	version (Posix) {
		/// ditto
		@property inout(sockaddr_un)* sockAddrUnix() inout pure nothrow
			in { assert (family == AddressFamily.UNIX); }
			do { return &addr_unix; }
	}

	/** Returns a string representation of the IP address
	*/
	string toAddressString()
	const nothrow {
		import std.array : appender;
		auto ret = appender!string();
		ret.reserve(40);
		toAddressString(str => ret.put(str));
		return ret.data;
	}
	/// ditto
	void toAddressString(scope void delegate(const(char)[]) @safe sink)
	const nothrow {
		import std.array : appender;
		import std.format : formattedWrite;
		ubyte[2] _dummy = void; // Workaround for DMD regression in master

		// any exception escaping from here is treated as a program bug
		scope (failure) assert(false);

		switch (this.family) {
			default: assert(false, "toAddressString() called for invalid address family.");
			case AF_UNSPEC:
				sink("<UNSPEC>");
				break;
			case AF_INET: {
				ubyte[4] ip = () @trusted { return (cast(ubyte*)&addr_ip4.sin_addr.s_addr)[0 .. 4]; } ();
				sink.formattedWrite("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]);
				} break;
			case AF_INET6: {
				// eight colon separated 16-bit groups, without zero compression
				ubyte[16] ip = addr_ip6.sin6_addr.s6_addr;
				foreach (i; 0 .. 8) {
					if (i > 0) sink(":");
					_dummy[] = ip[i*2 .. i*2+2];
					sink.formattedWrite("%x", bigEndianToNative!ushort(_dummy));
				}
				} break;
			version (Posix) {
				case AddressFamily.UNIX:
					import std.traits : hasMember;
					import std.string : fromStringz;
					// use the explicit length field where sockaddr_un has one,
					// otherwise treat the path as zero terminated
					static if (hasMember!(sockaddr_un, "sun_len"))
						sink(() @trusted { return cast(char[])addr_unix.sun_path[0..addr_unix.sun_len]; } ());
					else
						sink(() @trusted { return (cast(char*)addr_unix.sun_path.ptr).fromStringz; } ());
					break;
			}
		}
	}

	/** Returns a full string representation of the address, including the port number.
	*/
	string toString()
	const nothrow {
		import std.array : appender;
		auto ret = appender!string();
		toString(str => ret.put(str));
		return ret.data;
	}
	/// ditto
	void toString(scope void delegate(const(char)[]) @safe sink)
	const nothrow {
		import std.format : formattedWrite;
		try {
			switch (this.family) {
				default: assert(false, "toString() called for invalid address family.");
				case AF_UNSPEC:
					sink("<UNSPEC>");
					break;
				case AF_INET:
					toAddressString(sink);
					sink.formattedWrite(":%s", port);
					break;
				case AF_INET6:
					// brackets separate the address from the port
					sink("[");
					toAddressString(sink);
					sink.formattedWrite("]:%s", port);
					break;
				case AddressFamily.UNIX:
					toAddressString(sink);
					break;
			}
		} catch (Exception e) {
			assert(false, "Unexpected exception: "~e.msg);
		}
	}

	version(Have_libev) {}
	else {
		unittest {
			void test(string ip) {
				auto res = () @trusted { return resolveHost(ip, AF_UNSPEC, false); } ().toAddressString();
				assert(res == ip,
					"IP "~ip~" yielded wrong string representation: "~res);
			}
			test("1.2.3.4");
			test("102:304:506:708:90a:b0c:d0e:f10");
		}
	}
}

/**
	Represents a single TCP connection.
*/
struct TCPConnection {
	@safe:

	import core.time : seconds;
	import vibe.internal.array : BatchBuffer;
	//static assert(isConnectionStream!TCPConnection);

	/// Per-socket state, stored in the user data area of the socket.
	static struct Context {
		BatchBuffer!ubyte readBuffer;
		bool tcpNoDelay = false;
		bool keepAlive = false;
		Duration readTimeout = Duration.max;
		string remoteAddressString;
		shared(NativeEventDriver) driver;
	}

	private {
		StreamSocketFD m_socket;
		Context* m_context;
	}

	// NOTE: `remote_address` is currently not stored or otherwise used.
	private this(StreamSocketFD socket, scope RefAddress remote_address)
	nothrow {
		m_socket = socket;
		m_context = () @trusted { return &eventDriver.sockets.userData!Context(socket); } ();
		m_context.readBuffer.capacity = 4096;
		m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
	}

	this(this)
	nothrow {
		if (m_socket != StreamSocketFD.invalid)
			eventDriver.sockets.addRef(m_socket);
	}

	~this()
	nothrow {
		if (m_socket != StreamSocketFD.invalid)
			releaseHandle!"sockets"(m_socket, m_context.driver);
	}

	/// Evaluates to `true` as long as the connection references a valid socket.
	bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamSocketFD.invalid; }

	@property void tcpNoDelay(bool enabled) nothrow { eventDriver.sockets.setTCPNoDelay(m_socket, enabled); m_context.tcpNoDelay = enabled; }
	@property bool tcpNoDelay() const nothrow { return m_context.tcpNoDelay; }
	@property void keepAlive(bool enabled) nothrow { eventDriver.sockets.setKeepAlive(m_socket, enabled); m_context.keepAlive = enabled; }
	@property bool keepAlive() const nothrow { return m_context.keepAlive; }
	@property void readTimeout(Duration duration) { m_context.readTimeout = duration; }
	@property Duration readTimeout() const nothrow { return m_context.readTimeout; }
	@property string peerAddress() const nothrow { return this.remoteAddress.toString(); }
	@property NetworkAddress localAddress() const nothrow {
		NetworkAddress naddr;
		scope addr = new RefAddress(naddr.sockAddr, naddr.sockAddrMaxLen);
		if (!eventDriver.sockets.getLocalAddress(m_socket, addr))
			logWarn("Failed to get local address for TCP connection");
		return naddr;
	}
	@property NetworkAddress remoteAddress() const nothrow {
		NetworkAddress naddr;
		scope addr = new RefAddress(naddr.sockAddr, naddr.sockAddrMaxLen);
		if (!eventDriver.sockets.getRemoteAddress(m_socket, addr))
			logWarn("Failed to get remote address for TCP connection");
		return naddr;
	}
	@property bool connected()
	const nothrow {
		if (m_socket == StreamSocketFD.invalid) return false;
		auto s = eventDriver.sockets.getConnectionState(m_socket);
		return s >= ConnectionState.connected && s < ConnectionState.activeClose;
	}
	@property bool empty() { return leastSize == 0; }

	/** Number of bytes that can be read without blocking.

		Waits for data according to the read timeout if the read buffer is
		empty. Returns zero for a closed connection.

		Throws: `ReadTimeoutException` if the read timeout elapsed.
	*/
	@property ulong leastSize()
	{
		if (!m_context) return 0;

		auto res = waitForDataEx(m_context.readTimeout);
		if (res == WaitForDataStatus.timeout)
			throw new ReadTimeoutException("Read operation timed out");

		return m_context.readBuffer.length;
	}

	@property bool dataAvailableForRead() { return waitForData(0.seconds); }

	/// Shuts down and releases the socket - does nothing if already closed.
	void close()
	nothrow {
		//logInfo("close %s", cast(int)m_fd);
		if (m_socket != StreamSocketFD.invalid) {
			eventDriver.sockets.shutdown(m_socket, true, true);
			releaseHandle!"sockets"(m_socket, m_context.driver);
			m_socket = StreamSocketFD.invalid;
			m_context = null;
		}
	}

	/// Waits for data to arrive - returns `true` only if data is available.
	bool waitForData(Duration timeout = Duration.max)
	{
		return waitForDataEx(timeout) == WaitForDataStatus.dataAvailable;
	}

	/** Waits for data to arrive and reports the detailed result.

		A timeout of zero or less performs a non-blocking (immediate) read
		attempt instead of waiting.
	*/
	WaitForDataStatus waitForDataEx(Duration timeout = Duration.max)
	{
		mixin(tracer);
		if (!m_context) return WaitForDataStatus.noMoreData;
		if (m_context.readBuffer.length > 0) return WaitForDataStatus.dataAvailable;
		auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once;

		bool cancelled;
		IOStatus status;
		size_t nbytes;

		alias waiter = Waitable!(IOCallback,
			cb => eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), mode, cb),
			(cb) { cancelled = true; eventDriver.sockets.cancelRead(m_socket); },
			(sock, st, nb) {
				// the connection has been closed while the read was in flight
				if (m_socket == StreamSocketFD.invalid) {
					cancelled = true;
					return;
				}
				assert(sock == m_socket); status = st; nbytes = nb;
			}
		);

		asyncAwaitAny!(true, waiter)(timeout);

		if (!m_context) return WaitForDataStatus.noMoreData;
		if (cancelled) return WaitForDataStatus.timeout;

		logTrace("Socket %s, read %s bytes: %s", m_socket, nbytes, status);

		assert(m_context.readBuffer.length == 0);
		m_context.readBuffer.putN(nbytes);
		switch (status) {
			default:
				logDebug("Error status when waiting for data: %s", status);
				break;
			case IOStatus.ok: break;
			case IOStatus.wouldBlock: assert(mode == IOMode.immediate); break;
			case IOStatus.disconnected: break;
		}

		return m_context.readBuffer.length > 0 ? WaitForDataStatus.dataAvailable : WaitForDataStatus.noMoreData;
	}


	/** Waits asynchronously for new data to arrive.

		This function can be used to detach the `TCPConnection` from a
		running task while waiting for data, so that the associated memory
		resources are available for other operations.

		Note that `read_ready_callback` may be called from outside of a
		task, so no blocking operations may be performed. Instead, an existing
		task should be notified, or a new one started with `runTask`.

		Params:
			read_ready_callback = A callback taking a `bool` parameter that
				signals the read-readiness of the connection
			timeout = Optional timeout to limit the maximum wait time

		Returns:
			If the read readiness can be determined immediately, it will be
			returned as `WaitForDataAsyncStatus.dataAvailable` or
			`WaitForDataAsyncStatus.noMoreData` and the callback will not be
			invoked. Otherwise `WaitForDataAsyncStatus.waiting` is returned
			and the callback will be invoked once the status can be
			determined or the specified timeout is reached.
	*/
	WaitForDataAsyncStatus waitForDataAsync(CALLABLE)(CALLABLE read_ready_callback, Duration timeout = Duration.max)
		if (is(typeof(() @safe { read_ready_callback(true); } ())))
	{
		mixin(tracer);
		import vibe.core.core : Timer, setTimer;

		if (!m_context)
			return WaitForDataAsyncStatus.noMoreData;

		if (m_context.readBuffer.length > 0)
			return WaitForDataAsyncStatus.dataAvailable;

		if (timeout <= 0.seconds) {
			auto rs = waitForData(0.seconds);
			return rs ? WaitForDataAsyncStatus.dataAvailable : WaitForDataAsyncStatus.noMoreData;
		}

		// Holds the state that has to outlive this call while the read is pending.
		static final class WaitContext {
			import std.algorithm.mutation : move;

			CALLABLE callback;
			TCPConnection connection;
			Timer timer;

			this(CALLABLE callback, TCPConnection connection, Duration timeout)
			{
				this.callback = callback;
				this.connection = connection;
				if (timeout < Duration.max)
					this.timer = setTimer(timeout, &onTimeout);
			}

			void onTimeout()
			{
				eventDriver.sockets.cancelRead(connection.m_socket);
				invoke(false);
			}

			void onData(StreamSocketFD, IOStatus st, size_t nb)
			{
				if (timer) timer.stop();
				assert(connection.m_context.readBuffer.length == 0);
				connection.m_context.readBuffer.putN(nb);
				invoke(connection.m_context.readBuffer.length > 0);
			}

			void invoke(bool status)
			{
				// drop all references held by the context before calling out
				auto cb = move(callback);
				connection = TCPConnection.init;
				timer = Timer.init;
				cb(status);
			}
		}

		// FIXME: make this work without a heap allocation!
		auto context = new WaitContext(read_ready_callback, this, timeout);

		eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(),
			IOMode.once, &context.onData);

		return WaitForDataAsyncStatus.waiting;
	}

	const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; }

	/** Skips `count` bytes of incoming data.

		Throws:
			`ReadTimeoutException` if the read timeout elapsed, or an
			`Exception` if the end of the stream was reached before `count`
			bytes could be skipped.
	*/
	void skip(ulong count)
	{
		import std.algorithm.comparison : min;

		// nothing to skip - in particular, do not wait for data to arrive
		if (count == 0) return;

		m_context.readTimeout.loopWithTimeout!((remaining) {
			auto status = waitForDataEx(remaining);
			if (status != WaitForDataStatus.dataAvailable) {
				// A non-positive timeout results in a non-blocking read
				// attempt, for which "no data" does not imply the end of
				// the stream. Otherwise fail instead of looping endlessly.
				if (status == WaitForDataStatus.noMoreData && remaining > 0.seconds)
					throw new Exception("Reached end of stream while skipping data.");
				return false;
			}
			auto n = cast(size_t)min(count, m_context.readBuffer.length);
			m_context.readBuffer.popFrontN(n);
			count -= n;
			return count == 0;
		}, ReadTimeoutException);
	}

	/** Reads data into `dst` according to `mode`.

		Returns: The number of bytes that were written to `dst`.

		Throws:
			`ReadTimeoutException` if the read timeout elapsed, or an
			`Exception` if no more data could be waited for.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		mixin(tracer);
		import std.algorithm.comparison : min;
		if (!dst.length) return 0;
		// fast path - the request can be served from the read buffer
		if (m_context.readBuffer.length >= dst.length) {
			m_context.readBuffer.read(dst);
			return dst.length;
		}
		size_t nbytes = 0;
		m_context.readTimeout.loopWithTimeout!((remaining) {
			if (m_context.readBuffer.length == 0) {
				if (mode == IOMode.immediate || mode == IOMode.once && nbytes > 0)
					return true;
				enforce(waitForData(remaining), "Reached end of stream while reading data.");
			}
			assert(m_context.readBuffer.length > 0);
			auto l = min(dst.length, m_context.readBuffer.length);
			m_context.readBuffer.read(dst[0 .. l]);
			dst = dst[l .. $];
			nbytes += l;
			return dst.length == 0;
		}, ReadTimeoutException);
		return nbytes;
	}

	void read(scope ubyte[] dst) { auto r = read(dst, IOMode.all); assert(r == dst.length); }

	/** Writes `bytes` to the socket according to `mode`.

		Returns: The number of bytes written, as reported by the event driver.

		Throws:
			An `Exception` is thrown on write errors, or if the connection
			got closed before all data was written in `IOMode.all`.
	*/
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		mixin(tracer);
		if (bytes.length == 0) return 0;

		auto res = asyncAwait!(IOCallback,
			cb => eventDriver.sockets.write(m_socket, bytes, mode, cb),
			cb => eventDriver.sockets.cancelWrite(m_socket));

		switch (res[1]) {
			default:
				throw new Exception("Error writing data to socket.");
			case IOStatus.ok:
				assert(mode != IOMode.all || res[2] == bytes.length);
				break;
			case IOStatus.disconnected:
				if (mode == IOMode.all && res[2] != bytes.length)
					throw new Exception("Connection closed while writing data.");
				break;
		}

		return res[2];
	}

	void write(in ubyte[] bytes) { auto r = write(bytes, IOMode.all); assert(r == bytes.length); }
	void write(in char[] bytes) { write(cast(const(ubyte)[])bytes); }
	void write(InputStream stream) { write(stream, 0); }

	void flush() {
		mixin(tracer);
	}
	void finalize() {}
	void write(InputStream)(InputStream stream, ulong nbytes = 0) if (isInputStream!InputStream) { writeDefault(stream, nbytes); }

	// Pipes `stream` to the connection through a temporary 64 KiB buffer.
	// `nbytes == 0` means "until the stream is empty".
	private void writeDefault(InputStream)(InputStream stream, ulong nbytes = 0)
		if (isInputStream!InputStream)
	{
		import std.algorithm.comparison : min;
		import vibe.internal.allocator : theAllocator, makeArray, dispose;

		scope buffer = () @trusted { return cast(ubyte[]) theAllocator.allocate(64*1024); }();
		scope (exit) () @trusted { theAllocator.dispose(buffer); }();

		//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
		if( nbytes == 0 ){
			while( !stream.empty ){
				size_t chunk = min(stream.leastSize, buffer.length);
				assert(chunk > 0, "leastSize returned zero for non-empty stream.");
				//logTrace("read pipe chunk %d", chunk);
				stream.read(buffer[0 .. chunk]);
				write(buffer[0 .. chunk]);
			}
		} else {
			while( nbytes > 0 ){
				size_t chunk = min(nbytes, buffer.length);
				//logTrace("read pipe chunk %d", chunk);
				stream.read(buffer[0 .. chunk]);
				write(buffer[0 .. chunk]);
				nbytes -= chunk;
			}
		}
	}
}

/** Represents possible return values for
	TCPConnection.waitForDataAsync.
*/
enum WaitForDataAsyncStatus {
	noMoreData,
	dataAvailable,
	waiting,
}

/// Represents possible return values for TCPConnection.waitForDataEx.
enum WaitForDataStatus {
	dataAvailable,
	noMoreData,
	timeout
}

unittest { // test compilation of callback with scoped destruction
	static struct CB {
		~this() {}
		this(this) {}
		void opCall(bool) {}
	}

	void test() {
		TCPConnection c;
		CB cb;
		c.waitForDataAsync(cb);
	}
}


mixin validateConnectionStream!TCPConnection;

/*
	Repeatedly invokes `LoopBody` with the remaining timeout until it returns
	`true`. Throws `ExceptionType` with `timeoutMsg` once the timeout has
	elapsed. A timeout of `Duration.max` is never reduced.
*/
private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration timeout,
	immutable string timeoutMsg = "Operation timed out.")
{
	import core.time : seconds, MonoTime;

	MonoTime now;
	if (timeout != Duration.max)
		now = MonoTime.currTime();

	do {
		if (LoopBody(timeout))
			return;

		if (timeout != Duration.max) {
			// subtract the time that was spent in the loop body
			auto prev = now;
			now = MonoTime.currTime();
			if (now > prev) timeout -= now - prev;
		}
	} while (timeout > 0.seconds);

	throw new ExceptionType(timeoutMsg);
}


/**
	Represents a listening TCP socket.
*/
struct TCPListener {
	// FIXME: copying may lead to dangling FDs - this somehow needs to employ reference counting without breaking
	//        the previous behavior of keeping the socket alive when the listener isn't stored. At the same time,
	//        stopListening() needs to keep working.
	private {
		static struct Context {
			shared(NativeEventDriver) driver;
		}

		StreamListenSocketFD m_socket;
		Context* m_context;
	}

	this(StreamListenSocketFD socket)
	{
		m_socket = socket;
		m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
		m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
	}

	/// Evaluates to `true` as long as the listener references a valid socket.
	bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamListenSocketFD.invalid; }

	/// The local address at which TCP connections are accepted.
	@property NetworkAddress bindAddress()
	{
		NetworkAddress ret;
		scope ra = new RefAddress(ret.sockAddr, ret.sockAddrMaxLen);
		enforce(eventDriver.sockets.getLocalAddress(m_socket, ra),
			"Failed to query bind address of listening socket.");
		return ret;
	}

	/// Stops listening and closes the socket.
	void stopListening()
	{
		if (m_socket != StreamListenSocketFD.invalid) {
			releaseHandle!"sockets"(m_socket, m_context.driver);
			m_socket = StreamListenSocketFD.invalid;
			// the context is owned by the released socket
			m_context = null;
		}
	}
}


/**
	Represents a bound and possibly 'connected' UDP socket.
*/
struct UDPConnection {
	/// Per-socket state stored in the event driver's user data slot of the socket.
	static struct Context {
		/// Last value successfully applied through the `canBroadcast` setter.
		bool canBroadcast;
		/// Event driver that was current at construction; passed to `releaseHandle`.
		shared(NativeEventDriver) driver;
	}

	private {
		DatagramSocketFD m_socket;
		Context* m_context;
	}

	/** Creates a datagram socket bound to `bind_address`.

		Throws: `Exception` if the event driver fails to create the socket.
	*/
	private this(ref NetworkAddress bind_address)
	{
		scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen);
		m_socket = eventDriver.sockets.createDatagramSocket(baddr, null);
		enforce(m_socket != DatagramSocketFD.invalid, "Failed to create datagram socket.");
		m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
		m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
	}


	/// Copying adds a reference to the underlying socket handle.
	this(this)
	nothrow {
		if (m_socket != DatagramSocketFD.invalid)
			eventDriver.sockets.addRef(m_socket);
	}

	/// Passes the socket handle held by this instance to `releaseHandle`.
	~this()
	nothrow {
		if (m_socket != DatagramSocketFD.invalid)
			releaseHandle!"sockets"(m_socket, m_context.driver);
	}

	/// Evaluates to `true` as long as a valid socket handle is held.
	bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != DatagramSocketFD.invalid; }

	/** Returns the address to which the UDP socket is bound.

		This is the string form of `localAddress`.
	*/
	@property string bindAddress() const { return localAddress.toString(); }

	/** Determines if the socket is allowed to send to broadcast addresses.

		The getter returns the value cached by the last successful setter
		call; the setter throws an `Exception` if the driver rejects the flag.
	*/
	@property bool canBroadcast() const { return m_context.canBroadcast; }
	/// ditto
	@property void canBroadcast(bool val)
	{
		enforce(eventDriver.sockets.setBroadcast(m_socket, val), "Failed to set UDP broadcast flag.");
		// only cache the flag once the driver has accepted it
		m_context.canBroadcast = val;
	}

	/** The local/bind address of the underlying socket.

		This property never throws. If the address cannot be queried, a
		warning is logged and the (possibly unmodified) address value is
		returned.
	*/
	@property NetworkAddress localAddress() const nothrow {
		NetworkAddress naddr;
		// the RefAddress writes directly into the storage of `naddr`
		scope addr = new RefAddress(naddr.sockAddr, naddr.sockAddrMaxLen);
		try {
			enforce(eventDriver.sockets.getLocalAddress(m_socket, addr), "Failed to query socket address.");
		} catch (Exception e) { logWarn("Failed to get local address for UDP connection: %s", e.msg); }
		return naddr;
	}

	/** Set IP multicast loopback mode.

		This is on by default. All packets sent will also loopback if enabled.
		Useful if more than one application is running on same host and both need each other's packets.

		Throws: `Exception` if the socket option cannot be set.
	*/
	@property void multicastLoopback(bool loop)
	{
		enforce(eventDriver.sockets.setOption(m_socket, DatagramSocketOption.multicastLoopback, loop),
			"Failed to set multicast loopback mode.");
	}

	/** Become a member of an IP multicast group.

		The multiaddr parameter should be in the range 239.0.0.0-239.255.255.255.
		See https://www.iana.org/assignments/multicast-addresses/multicast-addresses.xml#multicast-addresses-12
		and https://www.iana.org/assignments/ipv6-multicast-addresses/ipv6-multicast-addresses.xhtml

		Throws: `Exception` if joining the group fails.
	*/
	void addMembership(ref NetworkAddress multiaddr, uint interface_index = 0)
	{
		scope addr = new RefAddress(multiaddr.sockAddr, multiaddr.sockAddrMaxLen);
		enforce(eventDriver.sockets.joinMulticastGroup(m_socket, addr, interface_index),
			"Failed to add multicast membership.");
	}

	/** Stops listening for datagrams and frees all resources.

		Calling this on an instance without a valid socket is a no-op.
	*/
	void close()
	{
		if (m_socket != DatagramSocketFD.invalid) {
			releaseHandle!"sockets"(m_socket, m_context.driver);
			// reset to the same sentinel that every validity check compares against
			m_socket = DatagramSocketFD.invalid;
			m_context = null;
		}
	}

	/** Locks the UDP connection to a certain peer.

		Once connected, the UDPConnection can only communicate with the specified peer.
		Otherwise communication with any reachable peer is possible.

		The host/port overload resolves `host` using `resolveHost` first.
	*/
	void connect(string host, ushort port)
	{
		auto address = resolveHost(host);
		address.port = port;
		connect(address);
	}
	/// ditto
	void connect(NetworkAddress address)
	{
		scope addr = new RefAddress(address.sockAddr, address.sockAddrLen);
		// NOTE(review): the outcome of the driver call is not inspected here
		eventDriver.sockets.setTargetAddress(m_socket, addr);
	}

	/** Sends a single packet.

		If peer_address is given, the packet is sent to that address. Otherwise the packet
		will be sent to the address specified by a call to connect().

		Throws: `Exception` if the send operation was cancelled, failed, or
			transmitted fewer bytes than `data.length`.
	*/
	void send(in ubyte[] data, in NetworkAddress* peer_address = null)
	{
		scope addrc = new RefAddress;
		if (peer_address)
			addrc.set(() @trusted { return (cast(NetworkAddress*)peer_address).sockAddr; } (), peer_address.sockAddrLen);

		IOStatus status;
		size_t nbytes;
		bool cancelled;

		alias waitable = Waitable!(DatagramIOCallback,
			cb => eventDriver.sockets.send(m_socket, data, IOMode.once, peer_address ? addrc : null, cb),
			(cb) { cancelled = true; eventDriver.sockets.cancelSend(m_socket); },
			(DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr)
			{
				status = status_;
				nbytes = nbytes_;
			}
		);

		asyncAwaitAny!(true, waitable);

		enforce(!cancelled && status == IOStatus.ok, "Failed to send packet.");
		enforce(nbytes == data.length, "Packet was only sent partially.");
	}

	/** Receives a single packet.

		If a buffer is given, it must be large enough to hold the full packet.
		If no buffer (or an empty one) is given, a 65536 byte buffer is allocated.

		The timeout overload will throw an Exception if no data arrives before the
		specified duration has elapsed.

		If `peer_address` is non-null, it receives the source address of a
		successfully received packet.

		Returns: The slice of the buffer that holds the received bytes.
	*/
	ubyte[] recv(ubyte[] buf = null, NetworkAddress* peer_address = null)
	{
		return recv(Duration.max, buf, peer_address);
	}
	/// ditto
	ubyte[] recv(Duration timeout, ubyte[] buf = null, NetworkAddress* peer_address = null)
	{
		if (buf.length == 0) buf = new ubyte[65536];

		IOStatus status;
		size_t nbytes;
		bool cancelled;

		alias waitable = Waitable!(DatagramIOCallback,
			cb => eventDriver.sockets.receive(m_socket, buf, IOMode.once, cb),
			(cb) { cancelled = true; eventDriver.sockets.cancelReceive(m_socket); },
			(DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr)
			{
				status = status_;
				nbytes = nbytes_;
				if (status_ == IOStatus.ok && peer_address) {
					// failing to store the sender is not fatal for the receive itself
					try *peer_address = NetworkAddress(addr);
					catch (Exception e) logWarn("Failed to store datagram source address: %s", e.msg);
				}
			}
		);

		asyncAwaitAny!(true, waitable)(timeout);
		enforce(!cancelled, "Receive timeout.");
		enforce(status == IOStatus.ok, "Failed to receive packet.");
		return buf[0 .. nbytes];
	}
}


/**
	Flags to control the behavior of listenTCP.
*/
enum TCPListenOptions {
	/// Don't enable any particular option
	none = 0,
	/// Deprecated: causes incoming connections to be distributed across the thread pool
	distribute = 1<<0,
	/// Disables automatic closing of the connection when the connection callback exits
	disableAutoClose = 1<<1,
	/** Enable port reuse on linux kernel version >=3.9, do nothing on other OS
		Does not affect libasync driver because it is always enabled by libasync.
*/
	reusePort = 1 << 2,
	/// Enable address reuse
	reuseAddress = 1 << 3,
	/// Default option set; identical to `reuseAddress`
	defaults = reuseAddress
}

private pure nothrow {
	import std.bitmanip;

	/// Converts a 16-bit value from network (big endian) to host byte order.
	ushort ntoh(ushort val)
	{
		version (BigEndian) return val;
		else version (LittleEndian) return swapEndian(val);
		else static assert(false, "Unknown endianness.");
	}

	/// Converts a 16-bit value from host to network (big endian) byte order.
	ushort hton(ushort val)
	{
		version (BigEndian) return val;
		else version (LittleEndian) return swapEndian(val);
		else static assert(false, "Unknown endianness.");
	}
}

// NOTE(review): empty string constant; its use is not visible in this chunk
private enum tracer = "";


/// Thrown by TCPConnection read-alike operations when timeout is reached.
class ReadTimeoutException: Exception
{
	/// Creates the exception with a message and a chained `next` throwable.
	this(string message, Throwable next,
		string file = __FILE__, size_t line = __LINE__)
	@safe pure nothrow
	{
		super(message, next, file, line);
	}

	/// Creates the exception with a message and an optional chained throwable.
	this(string message,
		string file = __FILE__, size_t line = __LINE__,
		Throwable next = null)
	@safe pure nothrow
	{
		super(message, file, line, next);
	}
}


// check whether the given name is not a valid host name, but may instead
// be a valid IP address.
// This is used as a quick check to avoid
// unnecessary address parsing or DNS queries
private bool isMaybeIPAddress(in char[] name)
{
	import std.algorithm.searching : all, canFind;
	import std.ascii : isDigit;

	// ':' cannot occur in a host name, so this can only be an IPv6 address
	if (name.canFind(':'))
		return true;

	// the shortest dotted IPv4 address, "0.0.0.0", has 7 characters
	enum shortestIPv4 = 7;
	if (name.length < shortestIPv4)
		return false;

	// without any dot this is neither a dotted IPv4 address nor a qualified name
	if (!name.canFind('.'))
		return false;

	// host names always contain a non-digit (no TLD is purely numeric), so
	// anything made up of only digits and dots is treated as an IPv4 candidate
	return name.all!(c => c == '.' || c.isDigit);
}

unittest {
	// IPv4/IPv6 candidates
	foreach (candidate; ["0.0.0.0", "::1", "aabb::1f"])
		assert(isMaybeIPAddress(candidate));

	// regular host names
	foreach (candidate; ["example.com", "12.com", "1.1.1.t12"])
		assert(!isMaybeIPAddress(candidate));
}