1 /**
2 	TCP/UDP connection and server handling.
3 
4 	Copyright: © 2012-2016 Sönke Ludwig
5 	Authors: Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 */
8 module vibe.core.net;
9 
10 import eventcore.core;
11 import std.exception : enforce;
12 import std.format : format;
13 import std.functional : toDelegate;
14 import std.socket : AddressFamily, UnknownAddress;
15 import vibe.core.internal.release;
16 import vibe.core.log;
17 import vibe.core.stream;
18 import vibe.internal.async;
19 import core.time : Duration;
20 
21 @safe:
22 
23 
/**
	Resolves the given host name/IP address string.

	Converts `host` into a `NetworkAddress`. If the string is an IP address, it
	is parsed locally: no network traffic is generated and the call does not
	block. Otherwise a DNS query is issued, unless `use_dns` is `false`, in
	which case the routine throws instead of performing a lookup.

	Params:
		host = The string to resolve, either an IP address or a hostname
		family = The desired address family to return. By default, returns
			the family that `host` is in. If a value is specified, this routine
			will throw if the value found for `host` doesn't match this parameter.
		use_dns = Whether to use the DNS if `host` is not an IP address.
			If `false` and `host` is not an IP, this routine will throw.
			Defaults to `true`.
		timeout = If `use_dns` is `true`, the `Duration` to use as timeout
			for the DNS lookup.

	Throws:
		In case of lookup failure, if `family` doesn't match `host`,
		or if `use_dns` is `false` but `host` is not an IP.
		See the parameter description for more details.

	Returns:
		A valid `NetworkAddress` matching `host`.
*/
NetworkAddress resolveHost(string host, AddressFamily family = AddressFamily.UNSPEC, bool use_dns = true, Duration timeout = Duration.max)
{
	// Forward to the overload that takes the raw numeric family value.
	immutable ushort family_id = cast(ushort)family;
	return resolveHost(host, family_id, use_dns, timeout);
}
55 
56 /// ditto
NetworkAddress resolveHost(string host, ushort family, bool use_dns = true, Duration timeout = Duration.max)
{
	import std.socket : parseAddress;
	version (Windows) import core.sys.windows.winsock2 : sockaddr_in, sockaddr_in6;
	else import core.sys.posix.netinet.in_ : sockaddr_in, sockaddr_in6;

	enforce(host.length > 0, "Host name must not be empty.");
	// Fast path: If it looks like an IP address, we don't need to resolve it
	if (isMaybeIPAddress(host)) {
		auto addr = parseAddress(host);
		enforce(family == AddressFamily.UNSPEC || addr.addressFamily == family,
			"Address family of '"~host~"' does not match the requested address family.");
		NetworkAddress ret;
		ret.family = addr.addressFamily;
		// copy the raw socket address into the matching union member
		switch (addr.addressFamily) with(AddressFamily) {
			default: throw new Exception("Unsupported address family");
			case INET: *ret.sockAddrInet4 = () @trusted { return *cast(sockaddr_in*)addr.name; } (); break;
			case INET6: *ret.sockAddrInet6 = () @trusted { return *cast(sockaddr_in6*)addr.name; } (); break;
		}
		return ret;
	}

	// Otherwise we need to do a DNS lookup
	enforce(use_dns, "Malformed IP address string.");
	NetworkAddress res;
	bool success = false;
	alias waitable = Waitable!(DNSLookupCallback,
		cb => eventDriver.dns.lookupHost(host, cb),
		(cb, id) => eventDriver.dns.cancelLookup(id),
		(DNSLookupID, DNSStatus status, scope RefAddress[] addrs) {
			if (status == DNSStatus.ok) {
				foreach (addr; addrs) {
					// skip results that are not of the requested family
					if (family != AddressFamily.UNSPEC && addr.addressFamily != family) continue;
					// Only report success once an address was actually
					// stored; if conversion fails, try the next result
					// instead of returning an empty address.
					try {
						res = NetworkAddress(addr);
						success = true;
						break;
					} catch (Exception e) {
						logDiagnostic("Failed to store address from DNS lookup: %s", e.msg);
					}
				}
			}
		}
	);

	// `success` stays `false` if the wait ends without a usable result
	asyncAwaitAny!(true, waitable)(timeout);

	enforce(success, "Failed to lookup host '"~host~"'.");
	return res;
}
103 
104 
/**
	Starts listening on the specified port.

	`connection_callback` is called for each client that connects to the
	server socket. Every new connection is handled in its own task (fiber),
	so the callback can perform blocking I/O on the connection it receives.

	The `address` parameter can be used to select the network interface on
	which the server socket listens for connections. Without it, listening is
	attempted on both the IPv6 ("::") and the IPv4 ("0.0.0.0") wildcard
	address; a failure of one of them is only logged, and an exception is
	thrown only if neither could be set up.
*/
TCPListener[] listenTCP(ushort port, TCPConnectionDelegate connection_callback, TCPListenOptions options = TCPListenOptions.defaults)
{
	TCPListener[] listeners;

	// IPv6 wildcard address
	try {
		listeners ~= listenTCP(port, connection_callback, "::", options);
	} catch (Exception e) {
		logDiagnostic("Failed to listen on \"::\": %s", e.msg);
	}

	// IPv4 wildcard address
	try {
		listeners ~= listenTCP(port, connection_callback, "0.0.0.0", options);
	} catch (Exception e) {
		logDiagnostic("Failed to listen on \"0.0.0.0\": %s", e.msg);
	}

	if (listeners.length == 0)
		throw new Exception(format("Failed to listen on all interfaces on port %s", port));
	return listeners;
}
126 /// ditto
TCPListener listenTCP(ushort port, TCPConnectionDelegate connection_callback, string address, TCPListenOptions options = TCPListenOptions.defaults)
{
	auto bind_addr = resolveHost(address);
	bind_addr.port = port;

	// Map the TCP level flags onto the stream socket options, explicitly
	// clearing every flag that was not requested.
	StreamListenOptions stream_opts = StreamListenOptions.defaults;
	if (!(options & TCPListenOptions.reuseAddress))
		stream_opts &= ~StreamListenOptions.reuseAddress;
	else
		stream_opts |= StreamListenOptions.reuseAddress;
	if (!(options & TCPListenOptions.reusePort))
		stream_opts &= ~StreamListenOptions.reusePort;
	else
		stream_opts |= StreamListenOptions.reusePort;

	scope ref_addr = new RefAddress(bind_addr.sockAddr, bind_addr.sockAddrLen);
	auto listen_fd = eventDriver.sockets.listenStream(ref_addr, stream_opts,
		(StreamListenSocketFD listener, StreamSocketFD client, scope RefAddress peer) @safe nothrow {
			import vibe.core.core : runTask;
			// hand each accepted connection to the callback in a new task
			auto conn = TCPConnection(client, peer);
			runTask(connection_callback, conn);
		});

	if (listen_fd == StreamListenSocketFD.invalid)
		throw new Exception("Failed to listen on "~bind_addr.toString());
	return TCPListener(listen_fd);
}
150 
/// Compatibility overload - use an `@safe nothrow` callback instead.
deprecated("Use a @safe nothrow callback instead.")
TCPListener[] listenTCP(ushort port, void delegate(TCPConnection) connection_callback, TCPListenOptions options = TCPListenOptions.defaults)
{
	TCPListener[] listeners;

	// IPv6 wildcard address
	try {
		listeners ~= listenTCP(port, connection_callback, "::", options);
	} catch (Exception e) {
		logDiagnostic("Failed to listen on \"::\": %s", e.msg);
	}

	// IPv4 wildcard address
	try {
		listeners ~= listenTCP(port, connection_callback, "0.0.0.0", options);
	} catch (Exception e) {
		logDiagnostic("Failed to listen on \"0.0.0.0\": %s", e.msg);
	}

	if (listeners.length == 0)
		throw new Exception(format("Failed to listen on all interfaces on port %s", port));
	return listeners;
}
163 /// ditto
deprecated("Use a @safe nothrow callback instead.")
TCPListener listenTCP(ushort port, void delegate(TCPConnection) connection_callback, string address, TCPListenOptions options = TCPListenOptions.defaults)
{
	// Wrap the legacy callback so that it fulfills the `@safe nothrow`
	// contract: exceptions are logged and the connection gets closed.
	TCPConnectionDelegate guarded = (conn) @trusted nothrow {
		try connection_callback(conn);
		catch (Exception e) {
			logException(e, "Handling of connection failed");
			conn.close();
		}
	};

	return listenTCP(port, guarded, address, options);
}
175 
/**
	Starts listening on the specified port.

	Behaves like `listenTCP`, except that the callback is given as a plain
	function pointer instead of a delegate.
*/
TCPListener[] listenTCP_s(ushort port, TCPConnectionFunction connection_callback, TCPListenOptions options = TCPListenOptions.defaults) @trusted
{
	// wrap the function pointer so that the delegate based API can be reused
	auto callback_dg = toDelegate(connection_callback);
	return listenTCP(port, callback_dg, options);
}
185 /// ditto
TCPListener listenTCP_s(ushort port, TCPConnectionFunction connection_callback, string address, TCPListenOptions options = TCPListenOptions.defaults) @trusted
{
	// wrap the function pointer so that the delegate based API can be reused
	auto callback_dg = toDelegate(connection_callback);
	return listenTCP(port, callback_dg, address, options);
}
190 
/**
	Establishes a connection to the given host/port.

	Params:
		host = Host name or IP address of the remote peer; resolved using
			`resolveHost`
		port = Port of the remote peer
		bind_interface = Optional local address to bind the socket to. It is
			resolved using the address family of the resolved `host`. If
			empty, the all-zero (wildcard) address of that family is used.
		bind_port = Local port to bind to (`0` by default)
		timeout = Maximum amount of time to wait for the connection to be
			established

	Throws:
		An exception if `host` or `bind_interface` cannot be resolved, or if
		the connection attempt fails or times out.
*/
TCPConnection connectTCP(string host, ushort port, string bind_interface = null,
	ushort bind_port = 0, Duration timeout = Duration.max)
{
	NetworkAddress addr = resolveHost(host);
	// The port is only meaningful for IP based families - the port setter
	// asserts for anything else, so it must stay behind this check.
	if (addr.family != AddressFamily.UNIX)
		addr.port = port;

	NetworkAddress bind_address;
	if (bind_interface.length) bind_address = resolveHost(bind_interface, addr.family);
	else {
		// no interface given: bind to the wildcard address of the same family
		bind_address.family = addr.family;
		if (bind_address.family == AddressFamily.INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
		else if (bind_address.family != AddressFamily.UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
	}
	if (addr.family != AddressFamily.UNIX)
		bind_address.port = bind_port;

	return connectTCP(addr, bind_address, timeout);
}
214 /// ditto
TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyAddress,
	Duration timeout = Duration.max)
{
	import std.conv : to;

	// An unspecified bind address is replaced by the wildcard address (and
	// port 0) of the destination's address family.
	if (bind_address.family == AddressFamily.UNSPEC) {
		bind_address.family = addr.family;
		if (bind_address.family != AddressFamily.UNIX) {
			if (bind_address.family == AddressFamily.INET)
				bind_address.sockAddrInet4.sin_addr.s_addr = 0;
			else
				bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
			bind_address.port = 0;
		}
	}

	if (addr.family != bind_address.family)
		throw new Exception("Destination address and bind address have different address families.");

	return () @trusted { // scope
		scope remote_ref = new RefAddress(addr.sockAddr, addr.sockAddrLen);
		scope local_ref = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen);

		bool aborted;
		StreamSocketFD conn_fd;
		ConnectStatus result;

		alias waiter = Waitable!(ConnectCallback,
			cb => eventDriver.sockets.connectStream(remote_ref, local_ref, cb),
			(cb, pending_fd) {
				// invoked when the wait is cancelled before completion
				aborted = true;
				if (pending_fd != SocketFD.invalid)
					eventDriver.sockets.cancelConnectStream(pending_fd);
			},
			(new_fd, st) { conn_fd = new_fd; result = st; }
		);

		asyncAwaitAny!(true, waiter)(timeout);

		if (aborted)
			throw new Exception("Failed to connect to " ~ addr.toString() ~
				": timeout");

		if (result == ConnectStatus.connected)
			return TCPConnection(conn_fd, remote_ref);

		// connection attempt failed - drop the socket, if one was created
		if (conn_fd != SocketFD.invalid)
			eventDriver.sockets.releaseRef(conn_fd);

		throw new Exception("Failed to connect to "~addr.toString()~": "~result.to!string);
	} ();
}
263 
264 
/**
	Creates a bound UDP socket suitable for sending and receiving packets.

	The socket is bound either to the given `addr`, or to `bind_address`
	(which must be an IP address string, no DNS lookup is performed) and `port`.
*/
UDPConnection listenUDP(ref NetworkAddress addr, UDPListenOptions options = UDPListenOptions.none)
{
	auto connection = UDPConnection(addr, options);
	return connection;
}
272 /// ditto
UDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0",
	UDPListenOptions options = UDPListenOptions.none)
{
	// DNS is disabled, so `bind_address` has to be an IP address string
	enum bool use_dns = false;
	auto local_addr = resolveHost(bind_address, AddressFamily.UNSPEC, use_dns);
	local_addr.port = port;
	return UDPConnection(local_addr, options);
}
280 
/// Returns a `NetworkAddress` whose family is `AddressFamily.UNSPEC`.
NetworkAddress anyAddress()
{
	NetworkAddress unspecified;
	unspecified.family = AddressFamily.UNSPEC;
	return unspecified;
}
287 
288 
/** Callback invoked for incoming TCP connections.

	`listenTCP` starts a new task for each accepted connection and passes the
	connection to this callback.
*/
@safe nothrow alias TCPConnectionDelegate = void delegate(TCPConnection stream);
/// ditto
@safe nothrow alias TCPConnectionFunction = void function(TCPConnection stream);
293 
294 
/**
	Represents a network/socket address.

	The address is stored as a union of the native `sockaddr` variants (IPv4,
	IPv6 and, on Posix systems, UNIX domain sockets). The `family` property
	determines which of the variants is currently valid.
*/
struct NetworkAddress {
	import std.algorithm.comparison : max;
	import std.socket : Address;

	version (Windows) import core.sys.windows.winsock2;
	else import core.sys.posix.netinet.in_;

	version(Posix) import core.sys.posix.sys.un : sockaddr_un;

	@safe:

	// All variants share the same storage; `addr.sa_family` selects the valid one.
	private union {
		sockaddr addr;
		version (Posix) sockaddr_un addr_unix;
		sockaddr_in addr_ip4;
		sockaddr_in6 addr_ip6;
	}

	/** Size of the largest socket address structure that can be stored.

		On Posix systems this includes `sockaddr_un`, so that a buffer of this
		size is also sufficient for querying UNIX domain socket addresses.
	*/
	version (Posix) enum socklen_t sockAddrMaxLen = max(addr.sizeof, addr_ip6.sizeof, addr_unix.sizeof);
	else enum socklen_t sockAddrMaxLen = max(addr.sizeof, addr_ip6.sizeof);


	/** Constructs the address from a `std.socket.Address`.

		Throws:
			An `Exception` if the address family of `addr` is not supported.
	*/
	this(Address addr)
		@trusted
	{
		assert(addr !is null);
		switch (addr.addressFamily) {
			default: throw new Exception("Unsupported address family.");
			case AddressFamily.INET:
				this.family = AddressFamily.INET;
				assert(addr.nameLen >= sockaddr_in.sizeof);
				*this.sockAddrInet4 = *cast(sockaddr_in*)addr.name;
				break;
			case AddressFamily.INET6:
				this.family = AddressFamily.INET6;
				assert(addr.nameLen >= sockaddr_in6.sizeof);
				*this.sockAddrInet6 = *cast(sockaddr_in6*)addr.name;
				break;
			version (Posix) {
				case AddressFamily.UNIX:
					this.family = AddressFamily.UNIX;
					assert(addr.nameLen >= sockaddr_un.sizeof);
					*this.sockAddrUnix = *cast(sockaddr_un*)addr.name;
					break;
			}
		}
	}

	/** Family of the socket address.
	*/
	@property ushort family() const pure nothrow { return addr.sa_family; }
	/// ditto
	@property void family(AddressFamily val) pure nothrow { addr.sa_family = cast(typeof(addr.sa_family))val; }
	/// ditto
	@property void family(ushort val) pure nothrow { addr.sa_family = cast(typeof(addr.sa_family))val; }

	/** The port in host byte order.

		Only valid for the `AF_INET` and `AF_INET6` families.
	*/
	@property ushort port()
	const pure nothrow {
		ushort nport;
		switch (this.family) {
			default: assert(false, "port() called for invalid address family.");
			case AF_INET: nport = addr_ip4.sin_port; break;
			case AF_INET6: nport = addr_ip6.sin6_port; break;
		}
		return () @trusted { return ntoh(nport); } ();
	}
	/// ditto
	@property void port(ushort val)
	pure nothrow {
		auto nport = () @trusted { return hton(val); } ();
		switch (this.family) {
			default: assert(false, "port() called for invalid address family.");
			case AF_INET: addr_ip4.sin_port = nport; break;
			case AF_INET6: addr_ip6.sin6_port = nport; break;
		}
	}

	/** A pointer to a sockaddr struct suitable for passing to socket functions.
	*/
	@property inout(sockaddr)* sockAddr() inout return pure nothrow { return &addr; }

	/** Size of the sockaddr struct that is returned by sockAddr().

		The size depends on the current address family, which must be one of
		`AF_INET`, `AF_INET6` or (Posix only) `AF_UNIX`.
	*/
	@property socklen_t sockAddrLen()
	const pure nothrow {
		switch (this.family) {
			default: assert(false, "sockAddrLen() called for invalid address family.");
			case AF_INET: return addr_ip4.sizeof;
			case AF_INET6: return addr_ip6.sizeof;
			version (Posix) {
				case AF_UNIX: return addr_unix.sizeof;
			}
		}
	}

	/// Typed access to the IPv4 address data - requires `family == AF_INET`.
	@property inout(sockaddr_in)* sockAddrInet4() inout return pure nothrow
		in { assert (family == AF_INET); }
		do { return &addr_ip4; }

	/// Typed access to the IPv6 address data - requires `family == AF_INET6`.
	@property inout(sockaddr_in6)* sockAddrInet6() inout return pure nothrow
		in { assert (family == AF_INET6); }
		do { return &addr_ip6; }

	version (Posix) {
		/// Typed access to the UNIX socket address data - requires `family == AddressFamily.UNIX`.
		@property inout(sockaddr_un)* sockAddrUnix() inout return pure nothrow
			in { assert (family == AddressFamily.UNIX); }
			do { return &addr_unix; }
	}

	/** Returns a string representation of the address, without the port.

		IPv4 addresses are written in dotted decimal notation, IPv6 addresses
		as eight colon separated hexadecimal groups (without zero compression),
		and UNIX socket addresses as their path. An unspecified address yields
		`"<UNSPEC>"`.
	*/
	string toAddressString()
	const nothrow {
		import std.array : appender;
		auto ret = appender!string();
		ret.reserve(40);
		toAddressString(str => ret.put(str));
		return ret.data;
	}
	/// ditto
	void toAddressString(scope void delegate(const(char)[]) @safe sink)
	const nothrow {
		import std.format : formattedWrite;
		ubyte[2] _dummy = void; // Workaround for DMD regression in master

		// formatting is not expected to fail - turn any failure into an assertion
		scope (failure) assert(false);

		switch (this.family) {
			default: assert(false, "toAddressString() called for invalid address family.");
			case AF_UNSPEC:
				sink("<UNSPEC>");
				break;
			case AF_INET: {
				ubyte[4] ip = () @trusted { return (cast(ubyte*)&addr_ip4.sin_addr.s_addr)[0 .. 4]; } ();
				sink.formattedWrite("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]);
				} break;
			case AF_INET6: {
				ubyte[16] ip = addr_ip6.sin6_addr.s6_addr;
				// each group consists of two bytes in big endian order
				foreach (i; 0 .. 8) {
					if (i > 0) sink(":");
					_dummy[] = ip[i*2 .. i*2+2];
					sink.formattedWrite("%x", bigEndianToNative!ushort(_dummy));
				}
				} break;
			version (Posix) {
				case AddressFamily.UNIX:
					import std.traits : hasMember;
					import std.string : fromStringz;
					// use the explicit length field on platforms that have one,
					// otherwise treat the path as a zero terminated string
					static if (hasMember!(sockaddr_un, "sun_len"))
						sink(() @trusted { return cast(char[])addr_unix.sun_path[0..addr_unix.sun_len]; } ());
					else
						sink(() @trusted { return (cast(char*)addr_unix.sun_path.ptr).fromStringz; } ());
					break;
			}
		}
	}

	/** Returns a full string representation of the address, including the port number.

		IPv6 addresses are enclosed in square brackets. UNIX socket addresses
		and unspecified addresses have no port and are written like in
		`toAddressString`.
	*/
	string toString()
	const nothrow {
		import std.array : appender;
		auto ret = appender!string();
		toString(str => ret.put(str));
		return ret.data;
	}
	/// ditto
	void toString(scope void delegate(const(char)[]) @safe sink)
	const nothrow {
		import std.format : formattedWrite;
		try {
			switch (this.family) {
				default: assert(false, "toString() called for invalid address family.");
				case AF_UNSPEC:
					sink("<UNSPEC>");
					break;
				case AF_INET:
					toAddressString(sink);
					sink.formattedWrite(":%s", port);
					break;
				case AF_INET6:
					sink("[");
					toAddressString(sink);
					sink.formattedWrite("]:%s", port);
					break;
				case AddressFamily.UNIX:
					toAddressString(sink);
					break;
			}
		} catch (Exception e) {
			assert(false, "Unexpected exception: "~e.msg);
		}
	}

	unittest {
		void test(string ip) {
			auto res = () @trusted { return resolveHost(ip, AF_UNSPEC, false); } ().toAddressString();
			assert(res == ip,
				   "IP "~ip~" yielded wrong string representation: "~res);
		}
		test("1.2.3.4");
		test("102:304:506:708:90a:b0c:d0e:f10");
	}
}
504 
/**
	Represents a single TCP connection.

	The struct is a reference counted handle to the underlying socket: copies
	add a reference and the destructor (as well as `close`) releases one.
	Incoming data is read into an internal buffer that is shared between all
	copies of the handle.
*/
struct TCPConnection {
	@safe:

	import core.time : seconds;
	import vibe.internal.array : BatchBuffer;
	//static assert(isConnectionStream!TCPConnection);

	// Per-socket state, stored in the user data area of the socket.
	static struct Context {
		BatchBuffer!ubyte readBuffer;
		bool tcpNoDelay = false;
		bool keepAlive = false;
		Duration readTimeout = Duration.max;
		string remoteAddressString;
		shared(NativeEventDriver) driver;
	}

	private {
		StreamSocketFD m_socket;
		Context* m_context;
	}

	private this(StreamSocketFD socket, scope RefAddress remote_address)
	nothrow {
		m_socket = socket;
		m_context = () @trusted { return &eventDriver.sockets.userData!Context(socket); } ();
		m_context.readBuffer.capacity = 4096;
		// remember the driver that owns the socket for releasing the handle later
		m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
	}

	this(this)
	nothrow {
		if (m_socket != StreamSocketFD.invalid)
			eventDriver.sockets.addRef(m_socket);
	}

	~this()
	nothrow {
		if (m_socket != StreamSocketFD.invalid)
			releaseHandle!"sockets"(m_socket, m_context.driver);
	}

	/// The socket descriptor of the connection, cast to `int`.
	@property int fd() const nothrow { return cast(int)m_socket; }

	/// Evaluates to `true` as long as the handle refers to a socket.
	bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamSocketFD.invalid; }

	/// Controls the TCP "no delay" option of the socket.
	@property void tcpNoDelay(bool enabled) nothrow { eventDriver.sockets.setTCPNoDelay(m_socket, enabled); m_context.tcpNoDelay = enabled; }
	/// ditto
	@property bool tcpNoDelay() const nothrow { return m_context.tcpNoDelay; }
	/// Controls the "keep alive" option of the socket.
	@property void keepAlive(bool enabled) nothrow { eventDriver.sockets.setKeepAlive(m_socket, enabled); m_context.keepAlive = enabled; }
	/// ditto
	@property bool keepAlive() const nothrow { return m_context.keepAlive; }
	/// Timeout applied to the blocking read operations (`leastSize`, `read`, `skip`).
	@property void readTimeout(Duration duration) { m_context.readTimeout = duration; }
	/// ditto
	@property Duration readTimeout() const nothrow { return m_context.readTimeout; }
	/// String representation of `remoteAddress`.
	@property string peerAddress() const nothrow { return this.remoteAddress.toString(); }
	/// Queries the local address of the socket; a failure is only logged.
	@property NetworkAddress localAddress() const nothrow {
		NetworkAddress naddr;
		scope addr = new RefAddress(naddr.sockAddr, naddr.sockAddrMaxLen);
		if (!eventDriver.sockets.getLocalAddress(m_socket, addr))
			logWarn("Failed to get local address for TCP connection");
		return naddr;
	}
	/// Queries the remote address of the socket; a failure is only logged.
	@property NetworkAddress remoteAddress() const nothrow {
		NetworkAddress naddr;
		scope addr = new RefAddress(naddr.sockAddr, naddr.sockAddrMaxLen);
		if (!eventDriver.sockets.getRemoteAddress(m_socket, addr))
			logWarn("Failed to get remote address for TCP connection");
		return naddr;
	}
	/// Determines whether the connection is currently established.
	@property bool connected()
	const nothrow {
		if (m_socket == StreamSocketFD.invalid) return false;
		auto s = eventDriver.sockets.getConnectionState(m_socket);
		return s >= ConnectionState.connected && s < ConnectionState.activeClose;
	}
	/// `true` if no more data can be read; may block until this is known.
	@property bool empty() { return leastSize == 0; }

	/** Number of bytes that can be read without blocking.

		Waits for data to arrive if the read buffer is empty. Returns `0` if
		the connection has been closed or no more data is available.

		Throws:
			`ReadTimeoutException` if no data arrived within `readTimeout`.
	*/
	@property ulong leastSize()
	{
		if (!m_context) return 0;

		auto res = waitForDataEx(m_context.readTimeout);
		if (res == WaitForDataStatus.timeout)
			throw new ReadTimeoutException("Read operation timed out");

		return m_context.readBuffer.length;
	}

	/// Checks for available data without waiting.
	@property bool dataAvailableForRead() { return waitForData(0.seconds); }

	/** Shuts down the socket and releases this handle's reference to it.

		Calling this on an already closed handle does nothing.
	*/
	void close()
	nothrow {
		//logInfo("close %s", cast(int)m_fd);
		if (m_socket != StreamSocketFD.invalid) {
			eventDriver.sockets.shutdown(m_socket, true, true);
			releaseHandle!"sockets"(m_socket, m_context.driver);
			m_socket = StreamSocketFD.invalid;
			m_context = null;
		}
	}

	/** Waits until data is available for reading.

		Returns:
			`true` if data is available, `false` if the timeout was reached
			or if no more data can be read. Use `waitForDataEx` to
			distinguish between the latter two.
	*/
	bool waitForData(Duration timeout = Duration.max)
	{
		return waitForDataEx(timeout) == WaitForDataStatus.dataAvailable;
	}

	/** Waits until data is available for reading and reports the outcome.

		A `timeout` of zero or less performs a single non-blocking read
		attempt.
	*/
	WaitForDataStatus waitForDataEx(Duration timeout = Duration.max)
	{
		mixin(tracer);
		if (!m_context) return WaitForDataStatus.noMoreData;
		if (m_context.readBuffer.length > 0) return WaitForDataStatus.dataAvailable;
		auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once;

		bool cancelled;
		IOStatus status;
		size_t nbytes;

		alias waiter = Waitable!(IOCallback,
			cb => eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), mode, cb),
			(cb) { cancelled = true; eventDriver.sockets.cancelRead(m_socket); },
			(sock, st, nb) {
				// the connection was closed while the read was still pending
				if (m_socket == StreamSocketFD.invalid) {
					cancelled = true;
					return;
				}
				assert(sock == m_socket); status = st; nbytes = nb;
			}
		);

		asyncAwaitAny!(true, waiter)(timeout);

		if (!m_context) return WaitForDataStatus.noMoreData;
		// NOTE: for IOMode.immediate, no actual timeout occurs, but the read
		//       fails immediately with wouldBlock
		if (cancelled || status == IOStatus.wouldBlock)
			return WaitForDataStatus.timeout;

		logTrace("Socket %s, read %s bytes: %s", m_socket, nbytes, status);

		assert(m_context.readBuffer.length == 0);
		m_context.readBuffer.putN(nbytes);
		switch (status) {
			default:
				logDebug("Error status when waiting for data: %s", status);
				break;
			case IOStatus.ok: break;
			case IOStatus.wouldBlock: assert(mode == IOMode.immediate); break;
			case IOStatus.disconnected: break;
		}

		return m_context.readBuffer.length > 0 ? WaitForDataStatus.dataAvailable : WaitForDataStatus.noMoreData;
	}


	/** Waits asynchronously for new data to arrive.

		This function can be used to detach the `TCPConnection` from a
		running task while waiting for data, so that the associated memory
		resources are available for other operations.

		Note that `read_ready_callback` may be called from outside of a
		task, so no blocking operations may be performed. Instead, an existing
		task should be notified, or a new one started with `runTask`.

		Params:
			read_ready_callback = A callback taking a `bool` parameter that
				signals the read-readiness of the connection
			timeout = Optional timeout to limit the maximum wait time

		Returns:
			If the read readiness can be determined immediately, it will be
			returned as `WaitForDataAsyncStatus.dataAvailable` or
			`WaitForDataAsyncStatus.noMoreData` and the callback will not be
			invoked. Otherwise `WaitForDataAsyncStatus.waiting` is returned
			and the callback will be invoked once the status can be
			determined or the specified timeout is reached.
	*/
	WaitForDataAsyncStatus waitForDataAsync(CALLABLE)(CALLABLE read_ready_callback, Duration timeout = Duration.max)
		if (is(typeof(() @safe { read_ready_callback(true); } ())))
	{
		mixin(tracer);
		import vibe.core.core : Timer, setTimer;

		if (!m_context)
			return WaitForDataAsyncStatus.noMoreData;

		if (m_context.readBuffer.length > 0)
			return WaitForDataAsyncStatus.dataAvailable;

		if (timeout <= 0.seconds) {
			auto rs = waitForData(0.seconds);
			return rs ? WaitForDataAsyncStatus.dataAvailable : WaitForDataAsyncStatus.noMoreData;
		}

		// Keeps the callback, a reference to the connection and the
		// optional timeout timer alive until the wait has finished.
		static final class WaitContext {
			import std.algorithm.mutation : move;

			CALLABLE callback;
			TCPConnection connection;
			Timer timer;

			this(CALLABLE callback, TCPConnection connection, Duration timeout)
			{
				this.callback = callback;
				this.connection = connection;
				if (timeout < Duration.max)
					this.timer = setTimer(timeout, &onTimeout);
			}

			void onTimeout()
			{
				eventDriver.sockets.cancelRead(connection.m_socket);
				invoke(false);
			}

			void onData(StreamSocketFD, IOStatus st, size_t nb)
			{
				if (timer) timer.stop();
				assert(connection.m_context.readBuffer.length == 0);
				connection.m_context.readBuffer.putN(nb);
				invoke(connection.m_context.readBuffer.length > 0);
			}

			void invoke(bool status)
			{
				// release all held resources before calling the callback
				auto cb = move(callback);
				connection = TCPConnection.init;
				timer = Timer.init;
				cb(status);
			}
		}

		// FIXME: make this work without a heap allocation!
		auto context = new WaitContext(read_ready_callback, this, timeout);

		eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(),
			IOMode.once, &context.onData);

		return WaitForDataAsyncStatus.waiting;
	}

	/// Returns the currently buffered data without consuming it (`null` if closed).
	const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; }

	/** Reads and discards `count` bytes.

		Throws:
			`ReadTimeoutException` if the data did not arrive within
			`readTimeout`, or an `Exception` if the end of the stream is
			reached before `count` bytes have been skipped.
	*/
	void skip(ulong count)
	{
		import std.algorithm.comparison : min;

		if (count == 0) return;

		m_context.readTimeout.loopWithTimeout!((remaining) {
			// Without this check, the loop would spin without making any
			// progress once the peer has closed the connection.
			if (waitForDataEx(remaining) == WaitForDataStatus.noMoreData)
				throw new Exception("Reached end of stream while skipping data.");
			auto n = min(count, m_context.readBuffer.length);
			m_context.readBuffer.popFrontN(n);
			count -= n;
			return count == 0;
		}, ReadTimeoutException);
	}

	/** Reads data into `dst`.

		With `IOMode.immediate` only already buffered data is returned, with
		`IOMode.once` the function returns as soon as at least some data has
		been read, and with `IOMode.all` it returns once `dst` has been
		filled completely.

		Returns:
			The number of bytes written to `dst`.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		mixin(tracer);
		import std.algorithm.comparison : min;
		if (!dst.length) return 0;
		// fast path: the request can be served from the buffer alone
		if (m_context.readBuffer.length >= dst.length) {
			m_context.readBuffer.read(dst);
			return dst.length;
		}
		size_t nbytes = 0;
		m_context.readTimeout.loopWithTimeout!((remaining) {
			if (m_context.readBuffer.length == 0) {
				if (mode == IOMode.immediate || mode == IOMode.once && nbytes > 0)
					return true;
				enforce(waitForData(remaining), "Reached end of stream while reading data.");
			}
			assert(m_context.readBuffer.length > 0);
			auto l = min(dst.length, m_context.readBuffer.length);
			m_context.readBuffer.read(dst[0 .. l]);
			dst = dst[l .. $];
			nbytes += l;
			return dst.length == 0;
		}, ReadTimeoutException);
		return nbytes;
	}

	/// ditto
	void read(scope ubyte[] dst) { auto r = read(dst, IOMode.all); assert(r == dst.length); }

	/** Writes data to the socket.

		Returns:
			The number of bytes that were written.

		Throws:
			An `Exception` if the write fails, or if the connection gets
			closed before all data was written in `IOMode.all`.
	*/
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		mixin(tracer);
		if (bytes.length == 0) return 0;

		auto res = asyncAwait!(IOCallback,
			cb => eventDriver.sockets.write(m_socket, bytes, mode, cb),
			cb => eventDriver.sockets.cancelWrite(m_socket));

		// res[1] is the I/O status, res[2] the number of bytes written
		switch (res[1]) {
			default:
				throw new Exception("Error writing data to socket.");
			case IOStatus.ok:
				assert(mode != IOMode.all || res[2] == bytes.length);
				break;
			case IOStatus.disconnected:
				if (mode == IOMode.all && res[2] != bytes.length)
					throw new Exception("Connection closed while writing data.");
				break;
		}

		return res[2];
	}

	/// ditto
	void write(in ubyte[] bytes) { auto r = write(bytes, IOMode.all); assert(r == bytes.length); }
	/// ditto
	void write(in char[] bytes) { write(cast(const(ubyte)[])bytes); }
	/// ditto
	void write(InputStream stream) { write(stream, 0); }

	/// Does nothing apart from the trace instrumentation - writes are not buffered here.
	void flush() {
		mixin(tracer);
	}
	/// Does nothing.
	void finalize() {}
	/// Pipes the contents of `stream` to the connection (`nbytes == 0` means "until empty").
	void write(InputStream)(InputStream stream, ulong nbytes = 0) if (isInputStream!InputStream) { writeDefault(stream, nbytes); }

	// Copies data from `stream` to the connection in chunks of up to 64 KiB.
	private void writeDefault(InputStream)(InputStream stream, ulong nbytes = 0)
		if (isInputStream!InputStream)
	{
		import std.algorithm.comparison : min;
		import vibe.internal.allocator : theAllocator, makeArray, dispose;

		scope buffer = () @trusted { return cast(ubyte[]) theAllocator.allocate(64*1024); }();
		scope (exit) () @trusted { theAllocator.dispose(buffer); }();

		//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
		if( nbytes == 0 ){
			while( !stream.empty ){
				size_t chunk = min(stream.leastSize, buffer.length);
				assert(chunk > 0, "leastSize returned zero for non-empty stream.");
				//logTrace("read pipe chunk %d", chunk);
				stream.read(buffer[0 .. chunk]);
				write(buffer[0 .. chunk]);
			}
		} else {
			while( nbytes > 0 ){
				size_t chunk = min(nbytes, buffer.length);
				//logTrace("read pipe chunk %d", chunk);
				stream.read(buffer[0 .. chunk]);
				write(buffer[0 .. chunk]);
				nbytes -= chunk;
			}
		}
	}
}
854 
/** Represents possible return values for
	TCPConnection.waitForDataAsync.
*/
enum WaitForDataAsyncStatus {
	/// Determined immediately: the connection is closed or no data was found; the callback is not invoked
	noMoreData,
	/// Determined immediately: the read buffer contains data; the callback is not invoked
	dataAvailable,
	/// A read has been started; the callback will be invoked with the result
	waiting,
}
863 
/** Represents possible return values for
	TCPConnection.waitForDataEx.
*/
enum WaitForDataStatus {
	/// The read buffer contains data
	dataAvailable,
	/// The connection has been closed or the read did not yield any data
	noMoreData,
	/// The wait was cancelled, or a non-blocking read attempt would have blocked
	timeout
}
869 
unittest { // test compilation of callback with scoped destruction
	// callable with a non-trivial copy constructor and destructor
	static struct Callback {
		void opCall(bool) {}
		this(this) {}
		~this() {}
	}

	// never called - only needs to compile
	void compileOnly() {
		Callback callback;
		TCPConnection connection;
		connection.waitForDataAsync(callback);
	}
}
883 
884 
885 mixin validateConnectionStream!TCPConnection;
886 
// Calls `LoopBody` with the remaining timeout until it returns `true`.
// Throws an `ExceptionType` carrying `timeoutMsg` once the timeout has
// elapsed. With `Duration.max` the time is not tracked at all.
private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration timeout,
	immutable string timeoutMsg = "Operation timed out.")
{
	import core.time : seconds, MonoTime;

	// the timeout only ever shrinks, so this property cannot change later on
	immutable bounded = timeout != Duration.max;

	MonoTime last;
	if (bounded) last = MonoTime.currTime();

	while (true) {
		if (LoopBody(timeout))
			return;

		if (bounded) {
			immutable current = MonoTime.currTime();
			if (current > last) timeout -= current - last;
			last = current;
		}

		if (timeout <= 0.seconds) break;
	}

	throw new ExceptionType(timeoutMsg);
}
909 
910 
/**
	Represents a listening TCP socket.

	The struct is a thin wrapper around a `StreamListenSocketFD`. It does not
	perform any reference counting on copy (see the FIXME below), so the
	socket stays open until `stopListening` is called.
*/
struct TCPListener {
	// FIXME: copying may lead to dangling FDs - this somehow needs to employ reference counting without breaking
	//        the previous behavior of keeping the socket alive when the listener isn't stored. At the same time,
	//        stopListening() needs to keep working.
	private {
		// Per-socket state, stored in the event driver's user data of the socket.
		static struct Context {
			// driver that was current when the listener was constructed;
			// handed to `releaseHandle` when the socket is closed
			shared(NativeEventDriver) driver;
		}

		StreamListenSocketFD m_socket;
		Context* m_context;
	}

	/** Wraps an existing listening socket handle.

		The current event driver is recorded in the socket's user data so
		that it can later be passed to `releaseHandle`.
	*/
	this(StreamListenSocketFD socket)
	{
		m_socket = socket;
		m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
		m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
	}

	/// Evaluates to `true` as long as the listener holds a valid socket handle.
	bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamListenSocketFD.invalid; }

	/** The local address at which TCP connections are accepted.

		Throws:
			An `Exception` if the event driver fails to report the local
			address of the socket.
	*/
	@property NetworkAddress bindAddress()
	{
		NetworkAddress ret;
		scope ra = new RefAddress(ret.sockAddr, ret.sockAddrMaxLen);
		enforce(eventDriver.sockets.getLocalAddress(m_socket, ra),
			"Failed to query bind address of listening socket.");
		return ret;
	}

	/** Stops listening and closes the socket.

		Calling this on a listener that holds no valid socket is a no-op.
	*/
	void stopListening()
	{
		if (m_socket != StreamListenSocketFD.invalid) {
			releaseHandle!"sockets"(m_socket, m_context.driver);
			m_socket = StreamListenSocketFD.invalid;
			// the context lives in the socket's user data - don't keep a
			// pointer to it once the handle has been released
			m_context = null;
		}
	}
}
955 
956 
/**
	Represents a bound and possibly 'connected' UDP socket.

	Copies of a `UDPConnection` share the same underlying socket handle: the
	postblit adds a reference to the handle and the destructor releases one.
*/
struct UDPConnection {
	/// Per-socket state, stored in the event driver's user data of the socket.
	static struct Context {
		/// Last value that was successfully applied through the `canBroadcast` setter.
		bool canBroadcast;
		/// Event driver that created the socket; passed to `releaseHandle` when a reference is dropped.
		shared(NativeEventDriver) driver;
	}

	private {
		DatagramSocketFD m_socket;
		Context* m_context;
	}

	/* Creates a datagram socket bound to `bind_address`.

		The `UDPListenOptions` flags are translated to the corresponding
		`DatagramCreateOptions` of the event driver.

		Throws:
			An `Exception` if the socket could not be created.
	*/
	private this(ref NetworkAddress bind_address, UDPListenOptions options)
	{
		DatagramCreateOptions copts;
		if (options & UDPListenOptions.reuseAddress) copts |= DatagramCreateOptions.reuseAddress;
		if (options & UDPListenOptions.reusePort) copts |= DatagramCreateOptions.reusePort;

		scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen);
		m_socket = eventDriver.sockets.createDatagramSocket(baddr, null, copts);
		enforce(m_socket != DatagramSocketFD.invalid, "Failed to create datagram socket.");
		m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
		m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
	}


	// Each copy holds its own reference to the socket handle.
	this(this)
	nothrow {
		if (m_socket != DatagramSocketFD.invalid)
			eventDriver.sockets.addRef(m_socket);
	}

	// Drops the reference held by this instance, if any.
	~this()
	nothrow {
		if (m_socket != DatagramSocketFD.invalid)
			releaseHandle!"sockets"(m_socket, m_context.driver);
	}

	/// The socket handle converted to an `int`.
	@property int fd() const nothrow { return cast(int)m_socket; }

	/// Evaluates to `true` as long as this instance holds a valid socket handle.
	bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != DatagramSocketFD.invalid; }

	/** Returns the address to which the UDP socket is bound.

		This is the string representation of `localAddress`.
	*/
	@property string bindAddress() const { return localAddress.toString(); }

	/** Determines if the socket is allowed to send to broadcast addresses.

		The getter returns the value that was last set successfully through
		the setter. The setter throws an `Exception` if the driver rejects
		the change.
	*/
	@property bool canBroadcast() const { return m_context.canBroadcast; }
	/// ditto
	@property void canBroadcast(bool val)
	{
		enforce(eventDriver.sockets.setBroadcast(m_socket, val), "Failed to set UDP broadcast flag.");
		// only remember the flag once the driver has accepted it
		m_context.canBroadcast = val;
	}

	/** The local/bind address of the underlying socket.

		This property never throws. If the address cannot be queried, a
		warning is logged and a default-initialized `NetworkAddress` is
		returned.
	*/
	@property NetworkAddress localAddress() const nothrow {
		NetworkAddress naddr;
		scope addr = new RefAddress(naddr.sockAddr, naddr.sockAddrMaxLen);
		try {
			enforce(eventDriver.sockets.getLocalAddress(m_socket, addr), "Failed to query socket address.");
		} catch (Exception e) { logWarn("Failed to get local address for UDP socket: %s", e.msg); }
		return naddr;
	}

	/** Set IP multicast loopback mode.

		This is on by default. If enabled, all sent packets are also looped back.
		Useful if more than one application is running on the same host and both need each other's packets.

		Throws:
			An `Exception` if the option could not be set.
	*/
	@property void multicastLoopback(bool loop)
	{
		enforce(eventDriver.sockets.setOption(m_socket, DatagramSocketOption.multicastLoopback, loop),
			"Failed to set multicast loopback mode.");
	}

	/** Become a member of an IP multicast group.

		The multiaddr parameter should be in the range 239.0.0.0-239.255.255.255.
		See https://www.iana.org/assignments/multicast-addresses/multicast-addresses.xml#multicast-addresses-12
		and https://www.iana.org/assignments/ipv6-multicast-addresses/ipv6-multicast-addresses.xhtml

		Params:
			multiaddr = Address of the multicast group to join
			interface_index = Index of the network interface, forwarded
				unchanged to the event driver

		Throws:
			An `Exception` if joining the group fails.
	*/
	void addMembership(ref NetworkAddress multiaddr, uint interface_index = 0)
	{
		scope addr = new RefAddress(multiaddr.sockAddr, multiaddr.sockAddrMaxLen);
		enforce(eventDriver.sockets.joinMulticastGroup(m_socket, addr, interface_index),
			"Failed to add multicast membership.");
	}

	/** Stops listening for datagrams and frees all resources.

		Releases the reference held by this instance and resets it to the
		invalid state. Calling `close` again afterwards is a no-op.
	*/
	void close()
	{
		if (m_socket != DatagramSocketFD.invalid) {
			releaseHandle!"sockets"(m_socket, m_context.driver);
			// use the same sentinel value that all validity checks compare against
			m_socket = DatagramSocketFD.invalid;
			m_context = null;
		}
	}

	/** Locks the UDP connection to a certain peer.

		Once connected, the UDPConnection can only communicate with the specified peer.
		Otherwise communication with any reachable peer is possible.

		The `host`/`port` overload resolves `host` using `resolveHost` first.

		Throws:
			An `Exception` if the target address could not be set on the
			socket or, for the `host`/`port` overload, if resolving `host`
			fails.
	*/
	void connect(string host, ushort port)
	{
		auto address = resolveHost(host);
		address.port = port;
		connect(address);
	}
	/// ditto
	void connect(NetworkAddress address)
	{
		scope addr = new RefAddress(address.sockAddr, address.sockAddrLen);
		// report a failure instead of silently leaving the socket unconnected
		enforce(eventDriver.sockets.setTargetAddress(m_socket, addr),
			"Failed to set UDP target address.");
	}

	/** Sends a single packet.

		If peer_address is given, the packet is sent to that address. Otherwise the packet
		will be sent to the address specified by a call to connect().

		Throws:
			An `Exception` if the operation was cancelled, if the driver
			reports a status other than `IOStatus.ok`, or if fewer than
			`data.length` bytes were sent.
	*/
	void send(in ubyte[] data, in NetworkAddress* peer_address = null)
	{
		scope addrc = new RefAddress;
		if (peer_address)
			addrc.set(() @trusted { return (cast(NetworkAddress*)peer_address).sockAddr; } (), peer_address.sockAddrLen);

		IOStatus status;
		size_t nbytes;
		bool cancelled;

		alias waitable = Waitable!(DatagramIOCallback,
			cb => eventDriver.sockets.send(m_socket, data, IOMode.once, peer_address ? addrc : null, cb),
			(cb) { cancelled = true; eventDriver.sockets.cancelSend(m_socket); },
			(DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr)
			{
				status = status_;
				nbytes = nbytes_;
			}
		);

		asyncAwaitAny!(true, waitable);

		enforce(!cancelled && status == IOStatus.ok, "Failed to send packet.");
		enforce(nbytes == data.length, "Packet was only sent partially.");
	}

	/** Receives a single packet.

		If a buffer is given, it must be large enough to hold the full packet.
		If `buf` is empty, a new buffer of 65536 bytes is allocated.

		The timeout overload will throw an Exception if no data arrives before the
		specified duration has elapsed.

		Params:
			timeout = Maximum time to wait for a packet
			buf = Optional buffer that receives the packet contents
			peer_address = If not `null`, receives the source address of the
				packet on success

		Returns:
			The slice of the buffer that contains the received bytes.

		Throws:
			An `Exception` if the wait was cancelled (reported as
			"Receive timeout.") or if the driver reports a status other than
			`IOStatus.ok`.
	*/
	ubyte[] recv(ubyte[] buf = null, NetworkAddress* peer_address = null)
	{
		return recv(Duration.max, buf, peer_address);
	}
	/// ditto
	ubyte[] recv(Duration timeout, ubyte[] buf = null, NetworkAddress* peer_address = null)
	{
		if (buf.length == 0) buf = new ubyte[65536];

		IOStatus status;
		size_t nbytes;
		bool cancelled;

		alias waitable = Waitable!(DatagramIOCallback,
			cb => eventDriver.sockets.receive(m_socket, buf, IOMode.once, cb),
			(cb) { cancelled = true; eventDriver.sockets.cancelReceive(m_socket); },
			(DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr)
			{
				status = status_;
				nbytes = nbytes_;
				// the source address is only stored for successfully received packets
				if (status_ == IOStatus.ok && peer_address) {
					try *peer_address = NetworkAddress(addr);
					catch (Exception e) logWarn("Failed to store datagram source address: %s", e.msg);
				}
			}
		);

		asyncAwaitAny!(true, waitable)(timeout);
		enforce(!cancelled, "Receive timeout.");
		enforce(status == IOStatus.ok, "Failed to receive packet.");
		return buf[0 .. nbytes];
	}
}
1146 
1147 
/**
	Bit flags that control the behavior of `listenTCP`.
*/
enum TCPListenOptions {
	/// No particular option is enabled
	none = 0,
	/// Deprecated: distributes incoming connections across the thread pool
	distribute = 1 << 0,
	/// Keeps the connection open after the connection callback has returned
	disableAutoClose = 1 << 1,
	/** Enables port reuse on Linux kernel versions >= 3.9; has no effect on
		other operating systems.

		Does not affect the libasync driver, because libasync always enables it.
	*/
	reusePort = 1 << 2,
	/// Enables address reuse
	reuseAddress = 1 << 3,
	/// The default set of options (address reuse only)
	defaults = reuseAddress
}
1167 
1168 
/** Bit flags that control the behavior of `listenUDP`.
*/
enum UDPListenOptions {
	/// No options are set
	none = 0x00,
	/// Enables address reuse
	reuseAddress = 0x01,
	/// Enables port reuse
	reusePort = 0x02
}
1179 
private pure nothrow {
	import std.bitmanip;

	/// Converts a 16-bit value from big endian (network) to host byte order.
	ushort ntoh(ushort val)
	{
		version (BigEndian) return val;
		else version (LittleEndian) return swapEndian(val);
		else static assert(false, "Unknown endianness.");
	}

	/// Converts a 16-bit value from host to big endian (network) byte order.
	ushort hton(ushort val)
	{
		version (BigEndian) return val;
		else version (LittleEndian) return swapEndian(val);
		else static assert(false, "Unknown endianness.");
	}
}
1197 
// Compile-time string constant that is currently empty.
// NOTE(review): presumably a hook that gets mixed in to inject tracing code -
// confirm against its uses elsewhere in the file.
private enum tracer = "";
1199 
1200 
/// Thrown by TCPConnection read-alike operations when timeout is reached.
class ReadTimeoutException : Exception
{
	/// Constructs the exception with a chained `next` throwable given up front.
	this(string message, Throwable next, string file = __FILE__, size_t line = __LINE__)
	@safe pure nothrow @nogc {
		super(message, next, file, line);
	}

	/// Constructs the exception from a message and an optional source location and chained throwable.
	this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
	@safe pure nothrow @nogc {
		super(message, file, line, next);
	}
}
1215 
1216 
// Cheap heuristic that tells whether `name` cannot be a valid host name but
// might be an IP address instead. It is used to skip unnecessary address
// parsing or DNS queries; a `true` result does not guarantee that `name`
// actually parses as an IP address.
private bool isMaybeIPAddress(in char[] name)
{
	import std.algorithm.searching : any, canFind;
	import std.ascii : isDigit;

	// ':' never occurs in a host name, so this can only be an IPv6 address
	if (name.canFind(':')) return true;

	// the shortest possible IPv4 address, "0.0.0.0", has 7 characters
	if (name.length < 7) return false;

	// an IPv4 address needs at least one dot...
	if (!name.canFind('.')) return false;

	// ...and consists of nothing but digits and dots (no valid TLD is all digits)
	return !name.any!(c => c != '.' && !isDigit(c));
}
1234 
unittest {
	// inputs that have to be classified as possible IP addresses
	foreach (candidate; ["0.0.0.0", "::1", "aabb::1f"])
		assert(isMaybeIPAddress(candidate));

	// inputs that have to be classified as host names
	foreach (hostname; ["example.com", "12.com", "1.1.1.t12"])
		assert(!isMaybeIPAddress(hostname));
}