1 /**
2 	TCP/UDP connection and server handling.
3 
4 	Copyright: © 2012-2016 Sönke Ludwig
5 	Authors: Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 */
8 module vibe.core.net;
9 
10 import eventcore.core;
11 import std.exception : enforce;
12 import std.format : format;
13 import std.functional : toDelegate;
14 import std.socket : AddressFamily, UnknownAddress;
15 import vibe.core.internal.release;
16 import vibe.core.log;
17 import vibe.core.stream;
18 import vibe.internal.async;
19 import core.time : Duration;
20 
21 @safe:
22 
23 
24 /**
25 	Resolves the given host name/IP address string.
26 
27 	Setting use_dns to false will only allow IP address strings but also guarantees
28 	that the call will not block.
29 */
30 NetworkAddress resolveHost(string host, AddressFamily address_family = AddressFamily.UNSPEC, bool use_dns = true)
31 {
32 	return resolveHost(host, cast(ushort)address_family, use_dns);
33 }
34 /// ditto
35 NetworkAddress resolveHost(string host, ushort address_family, bool use_dns = true)
36 {
37 	import std.socket : parseAddress;
38 	version (Windows) import core.sys.windows.winsock2 : sockaddr_in, sockaddr_in6;
39 	else import core.sys.posix.netinet.in_ : sockaddr_in, sockaddr_in6;
40 
41 	enforce(host.length > 0, "Host name must not be empty.");
42 	if (isMaybeIPAddress(host)) {
43 		auto addr = parseAddress(host);
44 		enforce(address_family == AddressFamily.UNSPEC || addr.addressFamily == address_family);
45 		NetworkAddress ret;
46 		ret.family = addr.addressFamily;
47 		switch (addr.addressFamily) with(AddressFamily) {
48 			default: throw new Exception("Unsupported address family");
49 			case INET: *ret.sockAddrInet4 = () @trusted { return *cast(sockaddr_in*)addr.name; } (); break;
50 			case INET6: *ret.sockAddrInet6 = () @trusted { return *cast(sockaddr_in6*)addr.name; } (); break;
51 		}
52 		return ret;
53 	} else {
54 		enforce(use_dns, "Malformed IP address string.");
55 		NetworkAddress res;
56 		bool success = false;
57 		alias waitable = Waitable!(DNSLookupCallback,
58 			cb => eventDriver.dns.lookupHost(host, cb),
59 			(cb, id) => eventDriver.dns.cancelLookup(id),
60 			(DNSLookupID, DNSStatus status, scope RefAddress[] addrs) {
61 				if (status == DNSStatus.ok && addrs.length > 0) {
62 					try res = NetworkAddress(addrs[0]);
63 					catch (Exception e) { logDiagnostic("Failed to store address from DNS lookup: %s", e.msg); }
64 					success = true;
65 				}
66 			}
67 		);
68 
69 		asyncAwaitAny!(true, waitable);
70 
71 		enforce(success, "Failed to lookup host '"~host~"'.");
72 		return res;
73 	}
74 }
75 
76 
77 /**
78 	Starts listening on the specified port.
79 
80 	'connection_callback' will be called for each client that connects to the
81 	server socket. Each new connection gets its own fiber. The stream parameter
82 	then allows to perform blocking I/O on the client socket.
83 
84 	The address parameter can be used to specify the network
85 	interface on which the server socket is supposed to listen for connections.
86 	By default, all IPv4 and IPv6 interfaces will be used.
87 */
88 TCPListener[] listenTCP(ushort port, TCPConnectionDelegate connection_callback, TCPListenOptions options = TCPListenOptions.defaults)
89 {
90 	TCPListener[] ret;
91 	try ret ~= listenTCP(port, connection_callback, "::", options);
92 	catch (Exception e) logDiagnostic("Failed to listen on \"::\": %s", e.msg);
93 	try ret ~= listenTCP(port, connection_callback, "0.0.0.0", options);
94 	catch (Exception e) logDiagnostic("Failed to listen on \"0.0.0.0\": %s", e.msg);
95 	enforce(ret.length > 0, format("Failed to listen on all interfaces on port %s", port));
96 	return ret;
97 }
98 /// ditto
99 TCPListener listenTCP(ushort port, TCPConnectionDelegate connection_callback, string address, TCPListenOptions options = TCPListenOptions.defaults)
100 {
101 	auto addr = resolveHost(address);
102 	addr.port = port;
103 	StreamListenOptions sopts = StreamListenOptions.defaults;
104 	if (options & TCPListenOptions.reuseAddress)
105 		sopts |= StreamListenOptions.reuseAddress;
106 	else
107 		sopts &= ~StreamListenOptions.reuseAddress;
108 	if (options & TCPListenOptions.reusePort)
109 		sopts |= StreamListenOptions.reusePort;
110 	else
111 		sopts &= ~StreamListenOptions.reusePort;
112 	scope addrc = new RefAddress(addr.sockAddr, addr.sockAddrLen);
113 	auto sock = eventDriver.sockets.listenStream(addrc, sopts,
114 		(StreamListenSocketFD ls, StreamSocketFD s, scope RefAddress addr) @safe nothrow {
115 			import vibe.core.core : runTask;
116 			auto conn = TCPConnection(s, addr);
117 			runTask(connection_callback, conn);
118 		});
119 	enforce(sock != StreamListenSocketFD.invalid, "Failed to listen on "~addr.toString());
120 	return TCPListener(sock);
121 }
122 
123 /// Compatibility overload - use an `@safe nothrow` callback instead.
124 deprecated("Use a @safe nothrow callback instead.")
125 TCPListener[] listenTCP(ushort port, void delegate(TCPConnection) connection_callback, TCPListenOptions options = TCPListenOptions.defaults)
126 {
127 	TCPListener[] ret;
128 	try ret ~= listenTCP(port, connection_callback, "::", options);
129 	catch (Exception e) logDiagnostic("Failed to listen on \"::\": %s", e.msg);
130 	try ret ~= listenTCP(port, connection_callback, "0.0.0.0", options);
131 	catch (Exception e) logDiagnostic("Failed to listen on \"0.0.0.0\": %s", e.msg);
132 	enforce(ret.length > 0, format("Failed to listen on all interfaces on port %s", port));
133 	return ret;
134 }
135 /// ditto
136 deprecated("Use a @safe nothrow callback instead.")
137 TCPListener listenTCP(ushort port, void delegate(TCPConnection) connection_callback, string address, TCPListenOptions options = TCPListenOptions.defaults)
138 {
139 	return listenTCP(port, (conn) @trusted nothrow {
140 		try connection_callback(conn);
141 		catch (Exception e) {
142 			logError("Handling of connection failed: %s", e.msg);
143 			conn.close();
144 			logDebug("Full error: %s", e);
145 		}
146 	}, address, options);
147 }
148 
149 /**
150 	Starts listening on the specified port.
151 
152 	This function is the same as listenTCP but takes a function callback instead of a delegate.
153 */
154 TCPListener[] listenTCP_s(ushort port, TCPConnectionFunction connection_callback, TCPListenOptions options = TCPListenOptions.defaults) @trusted
155 {
156 	return listenTCP(port, toDelegate(connection_callback), options);
157 }
158 /// ditto
159 TCPListener listenTCP_s(ushort port, TCPConnectionFunction connection_callback, string address, TCPListenOptions options = TCPListenOptions.defaults) @trusted
160 {
161 	return listenTCP(port, toDelegate(connection_callback), address, options);
162 }
163 
164 /**
165 	Establishes a connection to the given host/port.
166 */
167 TCPConnection connectTCP(string host, ushort port, string bind_interface = null,
168 	ushort bind_port = 0, Duration timeout = Duration.max)
169 {
170 	NetworkAddress addr = resolveHost(host);
171 	addr.port = port;
172 	if (addr.family != AddressFamily.UNIX)
173 		addr.port = port;
174 
175 	NetworkAddress bind_address;
176 	if (bind_interface.length) bind_address = resolveHost(bind_interface, addr.family);
177 	else {
178 		bind_address.family = addr.family;
179 		if (bind_address.family == AddressFamily.INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
180 		else if (bind_address.family != AddressFamily.UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
181 	}
182 	if (addr.family != AddressFamily.UNIX)
183 		bind_address.port = bind_port;
184 
185 	return connectTCP(addr, bind_address, timeout);
186 }
187 /// ditto
188 TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyAddress,
189 	Duration timeout = Duration.max)
190 {
191 	import std.conv : to;
192 
193 	if (bind_address.family == AddressFamily.UNSPEC) {
194 		bind_address.family = addr.family;
195 		if (bind_address.family == AddressFamily.INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
196 		else if (bind_address.family != AddressFamily.UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
197 		if (bind_address.family != AddressFamily.UNIX)
198 			bind_address.port = 0;
199 	}
200 	enforce(addr.family == bind_address.family, "Destination address and bind address have different address families.");
201 
202 	return () @trusted { // scope
203 		scope uaddr = new RefAddress(addr.sockAddr, addr.sockAddrLen);
204 		scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen);
205 
206 		bool cancelled;
207 		StreamSocketFD sock;
208 		ConnectStatus status;
209 
210 		alias waiter = Waitable!(ConnectCallback,
211 			cb => eventDriver.sockets.connectStream(uaddr, baddr, cb),
212 			(cb, sock_fd) {
213 				cancelled = true;
214 				if (sock_fd != SocketFD.invalid)
215 					eventDriver.sockets.cancelConnectStream(sock_fd);
216 			},
217 			(fd, st) { sock = fd; status = st; }
218 		);
219 
220 		asyncAwaitAny!(true, waiter)(timeout);
221 
222 		enforce(!cancelled, "Failed to connect to " ~ addr.toString() ~
223 			": timeout");
224 
225 		if (status != ConnectStatus.connected) {
226 			if (sock != SocketFD.invalid)
227 				eventDriver.sockets.releaseRef(sock);
228 
229 			enforce(false, "Failed to connect to "~addr.toString()~": "~status.to!string);
230 			assert(false);
231 		}
232 
233 		return TCPConnection(sock, uaddr);
234 	} ();
235 }
236 
237 
238 /**
239 	Creates a bound UDP socket suitable for sending and receiving packets.
240 */
241 UDPConnection listenUDP(ref NetworkAddress addr)
242 {
243 	return UDPConnection(addr);
244 }
245 /// ditto
246 UDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0")
247 {
248 	auto addr = resolveHost(bind_address, AddressFamily.UNSPEC, false);
249 	addr.port = port;
250 	return UDPConnection(addr);
251 }
252 
253 NetworkAddress anyAddress()
254 {
255 	NetworkAddress ret;
256 	ret.family = AddressFamily.UNSPEC;
257 	return ret;
258 }
259 
260 
261 /// Callback invoked for incoming TCP connections.
262 @safe nothrow alias TCPConnectionDelegate = void delegate(TCPConnection stream);
263 /// ditto
264 @safe nothrow alias TCPConnectionFunction = void function(TCPConnection stream);
265 
266 
267 /**
268 	Represents a network/socket address.
269 */
270 struct NetworkAddress {
271 	import std.algorithm.comparison : max;
272 	import std.socket : Address;
273 
274 	version (Windows) import core.sys.windows.winsock2;
275 	else import core.sys.posix.netinet.in_;
276 
277 	version(Posix) import core.sys.posix.sys.un : sockaddr_un;
278 
279 	@safe:
280 
281 	private union {
282 		sockaddr addr;
283 		version (Posix) sockaddr_un addr_unix;
284 		sockaddr_in addr_ip4;
285 		sockaddr_in6 addr_ip6;
286 	}
287 
288 	enum socklen_t sockAddrMaxLen = max(addr.sizeof, addr_ip6.sizeof);
289 
290 
291 	this(Address addr)
292 		@trusted
293 	{
294 		assert(addr !is null);
295 		switch (addr.addressFamily) {
296 			default: throw new Exception("Unsupported address family.");
297 			case AddressFamily.INET:
298 				this.family = AddressFamily.INET;
299 				assert(addr.nameLen >= sockaddr_in.sizeof);
300 				*this.sockAddrInet4 = *cast(sockaddr_in*)addr.name;
301 				break;
302 			case AddressFamily.INET6:
303 				this.family = AddressFamily.INET6;
304 				assert(addr.nameLen >= sockaddr_in6.sizeof);
305 				*this.sockAddrInet6 = *cast(sockaddr_in6*)addr.name;
306 				break;
307 			version (Posix) {
308 				case AddressFamily.UNIX:
309 					this.family = AddressFamily.UNIX;
310 					assert(addr.nameLen >= sockaddr_un.sizeof);
311 					*this.sockAddrUnix = *cast(sockaddr_un*)addr.name;
312 					break;
313 			}
314 		}
315 	}
316 
317 	/** Family of the socket address.
318 	*/
319 	@property ushort family() const pure nothrow { return addr.sa_family; }
320 	/// ditto
321 	@property void family(AddressFamily val) pure nothrow { addr.sa_family = cast(ubyte)val; }
322 	/// ditto
323 	@property void family(ushort val) pure nothrow { addr.sa_family = cast(ubyte)val; }
324 
325 	/** The port in host byte order.
326 	*/
327 	@property ushort port()
328 	const pure nothrow {
329 		ushort nport;
330 		switch (this.family) {
331 			default: assert(false, "port() called for invalid address family.");
332 			case AF_INET: nport = addr_ip4.sin_port; break;
333 			case AF_INET6: nport = addr_ip6.sin6_port; break;
334 		}
335 		return () @trusted { return ntoh(nport); } ();
336 	}
337 	/// ditto
338 	@property void port(ushort val)
339 	pure nothrow {
340 		auto nport = () @trusted { return hton(val); } ();
341 		switch (this.family) {
342 			default: assert(false, "port() called for invalid address family.");
343 			case AF_INET: addr_ip4.sin_port = nport; break;
344 			case AF_INET6: addr_ip6.sin6_port = nport; break;
345 		}
346 	}
347 
348 	/** A pointer to a sockaddr struct suitable for passing to socket functions.
349 	*/
350 	@property inout(sockaddr)* sockAddr() inout pure nothrow { return &addr; }
351 
352 	/** Size of the sockaddr struct that is returned by sockAddr().
353 	*/
354 	@property socklen_t sockAddrLen()
355 	const pure nothrow {
356 		switch (this.family) {
357 			default: assert(false, "sockAddrLen() called for invalid address family.");
358 			case AF_INET: return addr_ip4.sizeof;
359 			case AF_INET6: return addr_ip6.sizeof;
360 			version (Posix) {
361 				case AF_UNIX: return addr_unix.sizeof;
362 			}
363 		}
364 	}
365 
366 	@property inout(sockaddr_in)* sockAddrInet4() inout pure nothrow
367 		in { assert (family == AF_INET); }
368 		do { return &addr_ip4; }
369 
370 	@property inout(sockaddr_in6)* sockAddrInet6() inout pure nothrow
371 		in { assert (family == AF_INET6); }
372 		do { return &addr_ip6; }
373 
374 	version (Posix) {
375 		@property inout(sockaddr_un)* sockAddrUnix() inout pure nothrow
376 			in { assert (family == AddressFamily.UNIX); }
377 			do { return &addr_unix; }
378 	}
379 
380 	/** Returns a string representation of the IP address
381 	*/
382 	string toAddressString()
383 	const nothrow {
384 		import std.array : appender;
385 		auto ret = appender!string();
386 		ret.reserve(40);
387 		toAddressString(str => ret.put(str));
388 		return ret.data;
389 	}
390 	/// ditto
391 	void toAddressString(scope void delegate(const(char)[]) @safe sink)
392 	const nothrow {
393 		import std.array : appender;
394 		import std.format : formattedWrite;
395 		ubyte[2] _dummy = void; // Workaround for DMD regression in master
396 
397 		scope (failure) assert(false);
398 
399 		switch (this.family) {
400 			default: assert(false, "toAddressString() called for invalid address family.");
401 			case AF_UNSPEC:
402 				sink("<UNSPEC>");
403 				break;
404 			case AF_INET: {
405 				ubyte[4] ip = () @trusted { return (cast(ubyte*)&addr_ip4.sin_addr.s_addr)[0 .. 4]; } ();
406 				sink.formattedWrite("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]);
407 				} break;
408 			case AF_INET6: {
409 				ubyte[16] ip = addr_ip6.sin6_addr.s6_addr;
410 				foreach (i; 0 .. 8) {
411 					if (i > 0) sink(":");
412 					_dummy[] = ip[i*2 .. i*2+2];
413 					sink.formattedWrite("%x", bigEndianToNative!ushort(_dummy));
414 				}
415 				} break;
416 			version (Posix) {
417 				case AddressFamily.UNIX:
418 					import std.traits : hasMember;
419 					import std..string : fromStringz;
420 					static if (hasMember!(sockaddr_un, "sun_len"))
421 						sink(() @trusted { return cast(char[])addr_unix.sun_path[0..addr_unix.sun_len]; } ());
422 					else
423 						sink(() @trusted { return (cast(char*)addr_unix.sun_path.ptr).fromStringz; } ());
424 					break;
425 			}
426 		}
427 	}
428 
429 	/** Returns a full string representation of the address, including the port number.
430 	*/
431 	string toString()
432 	const nothrow {
433 		import std.array : appender;
434 		auto ret = appender!string();
435 		toString(str => ret.put(str));
436 		return ret.data;
437 	}
438 	/// ditto
439 	void toString(scope void delegate(const(char)[]) @safe sink)
440 	const nothrow {
441 		import std.format : formattedWrite;
442 		try {
443 			switch (this.family) {
444 				default: assert(false, "toString() called for invalid address family.");
445 				case AF_UNSPEC:
446 					sink("<UNSPEC>");
447 					break;
448 				case AF_INET:
449 					toAddressString(sink);
450 					sink.formattedWrite(":%s", port);
451 					break;
452 				case AF_INET6:
453 					sink("[");
454 					toAddressString(sink);
455 					sink.formattedWrite("]:%s", port);
456 					break;
457 				case AddressFamily.UNIX:
458 					toAddressString(sink);
459 					break;
460 			}
461 		} catch (Exception e) {
462 			assert(false, "Unexpected exception: "~e.msg);
463 		}
464 	}
465 
466 	version(Have_libev) {}
467 	else {
468 		unittest {
469 			void test(string ip) {
470 				auto res = () @trusted { return resolveHost(ip, AF_UNSPEC, false); } ().toAddressString();
471 				assert(res == ip,
472 					   "IP "~ip~" yielded wrong string representation: "~res);
473 			}
474 			test("1.2.3.4");
475 			test("102:304:506:708:90a:b0c:d0e:f10");
476 		}
477 	}
478 }
479 
480 /**
481 	Represents a single TCP connection.
482 */
483 struct TCPConnection {
484 	@safe:
485 
486 	import core.time : seconds;
487 	import vibe.internal.array : BatchBuffer;
488 	//static assert(isConnectionStream!TCPConnection);
489 
490 	static struct Context {
491 		BatchBuffer!ubyte readBuffer;
492 		bool tcpNoDelay = false;
493 		bool keepAlive = false;
494 		Duration readTimeout = Duration.max;
495 		string remoteAddressString;
496 		shared(NativeEventDriver) driver;
497 	}
498 
499 	private {
500 		StreamSocketFD m_socket;
501 		Context* m_context;
502 	}
503 
504 	private this(StreamSocketFD socket, scope RefAddress remote_address)
505 	nothrow {
506 		import std.exception : enforce;
507 
508 		m_socket = socket;
509 		m_context = () @trusted { return &eventDriver.sockets.userData!Context(socket); } ();
510 		m_context.readBuffer.capacity = 4096;
511 		m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
512 	}
513 
514 	this(this)
515 	nothrow {
516 		if (m_socket != StreamSocketFD.invalid)
517 			eventDriver.sockets.addRef(m_socket);
518 	}
519 
520 	~this()
521 	nothrow {
522 		if (m_socket != StreamSocketFD.invalid)
523 			releaseHandle!"sockets"(m_socket, m_context.driver);
524 	}
525 
526 	bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamSocketFD.invalid; }
527 
528 	@property void tcpNoDelay(bool enabled) nothrow { eventDriver.sockets.setTCPNoDelay(m_socket, enabled); m_context.tcpNoDelay = enabled; }
529 	@property bool tcpNoDelay() const nothrow { return m_context.tcpNoDelay; }
530 	@property void keepAlive(bool enabled) nothrow { eventDriver.sockets.setKeepAlive(m_socket, enabled); m_context.keepAlive = enabled; }
531 	@property bool keepAlive() const nothrow { return m_context.keepAlive; }
532 	@property void readTimeout(Duration duration) { m_context.readTimeout = duration; }
533 	@property Duration readTimeout() const nothrow { return m_context.readTimeout; }
534 	@property string peerAddress() const nothrow { return this.remoteAddress.toString(); }
535 	@property NetworkAddress localAddress() const nothrow {
536 		NetworkAddress naddr;
537 		scope addr = new RefAddress(naddr.sockAddr, naddr.sockAddrMaxLen);
538 		if (!eventDriver.sockets.getLocalAddress(m_socket, addr))
539 			logWarn("Failed to get local address for TCP connection");
540 		return naddr;
541 	}
542 	@property NetworkAddress remoteAddress() const nothrow {
543 		NetworkAddress naddr;
544 		scope addr = new RefAddress(naddr.sockAddr, naddr.sockAddrMaxLen);
545 		if (!eventDriver.sockets.getRemoteAddress(m_socket, addr))
546 			logWarn("Failed to get remote address for TCP connection");
547 		return naddr;
548 	}
549 	@property bool connected()
550 	const nothrow {
551 		if (m_socket == StreamSocketFD.invalid) return false;
552 		auto s = eventDriver.sockets.getConnectionState(m_socket);
553 		return s >= ConnectionState.connected && s < ConnectionState.activeClose;
554 	}
555 	@property bool empty() { return leastSize == 0; }
556 
557 	@property ulong leastSize()
558 	{
559 		if (!m_context) return 0;
560 
561 		auto res = waitForDataEx(m_context.readTimeout);
562 		if (res == WaitForDataStatus.timeout)
563 			throw new ReadTimeoutException("Read operation timed out");
564 
565 		return m_context.readBuffer.length;
566 	}
567 
568 	@property bool dataAvailableForRead() { return waitForData(0.seconds); }
569 
570 	void close()
571 	nothrow {
572 		//logInfo("close %s", cast(int)m_fd);
573 		if (m_socket != StreamSocketFD.invalid) {
574 			eventDriver.sockets.shutdown(m_socket, true, true);
575 			releaseHandle!"sockets"(m_socket, m_context.driver);
576 			m_socket = StreamSocketFD.invalid;
577 			m_context = null;
578 		}
579 	}
580 
581 	bool waitForData(Duration timeout = Duration.max)
582 	{
583 		return waitForDataEx(timeout) == WaitForDataStatus.dataAvailable;
584 	}
585 
586 	WaitForDataStatus waitForDataEx(Duration timeout = Duration.max)
587 	{
588 mixin(tracer);
589 		if (!m_context) return WaitForDataStatus.noMoreData;
590 		if (m_context.readBuffer.length > 0) return WaitForDataStatus.dataAvailable;
591 		auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once;
592 
593 		bool cancelled;
594 		IOStatus status;
595 		size_t nbytes;
596 
597 		alias waiter = Waitable!(IOCallback,
598 			cb => eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(), mode, cb),
599 			(cb) { cancelled = true; eventDriver.sockets.cancelRead(m_socket); },
600 			(sock, st, nb) {
601 				if (m_socket == StreamSocketFD.invalid) {
602 					cancelled = true;
603 					return;
604 				}
605 				assert(sock == m_socket); status = st; nbytes = nb;
606 			}
607 		);
608 
609 		asyncAwaitAny!(true, waiter)(timeout);
610 
611 		if (!m_context) return WaitForDataStatus.noMoreData;
612 		if (cancelled) return WaitForDataStatus.timeout;
613 
614 		logTrace("Socket %s, read %s bytes: %s", m_socket, nbytes, status);
615 
616 		assert(m_context.readBuffer.length == 0);
617 		m_context.readBuffer.putN(nbytes);
618 		switch (status) {
619 			default:
620 				logDebug("Error status when waiting for data: %s", status);
621 				break;
622 			case IOStatus.ok: break;
623 			case IOStatus.wouldBlock: assert(mode == IOMode.immediate); break;
624 			case IOStatus.disconnected: break;
625 		}
626 
627 		return m_context.readBuffer.length > 0 ? WaitForDataStatus.dataAvailable : WaitForDataStatus.noMoreData;
628 	}
629 
630 
631 	/** Waits asynchronously for new data to arrive.
632 
633 		This function can be used to detach the `TCPConnection` from a
634 		running task while waiting for data, so that the associated memory
635 		resources are available for other operations.
636 
637 		Note that `read_ready_callback` may be called from outside of a
638 		task, so no blocking operations may be performed. Instead, an existing
639 		task should be notified, or a new one started with `runTask`.
640 
641 		Params:
642 			read_ready_callback = A callback taking a `bool` parameter that
643 				signals the read-readiness of the connection
644 			timeout = Optional timeout to limit the maximum wait time
645 
646 		Returns:
647 			If the read readiness can be determined immediately, it will be
648 			returned as `WaitForDataAsyncStatus.dataAvailable` or
649 			`WaitForDataAsyncStatus.noModeData` and the callback will not be
650 			invoked. Otherwise `WaitForDataAsyncStatus.waiting` is returned
651 			and the callback will be invoked once the status can be
652 			determined or the specified timeout is reached.
653 	*/
654 	WaitForDataAsyncStatus waitForDataAsync(CALLABLE)(CALLABLE read_ready_callback, Duration timeout = Duration.max)
655 		if (is(typeof(() @safe { read_ready_callback(true); } ())))
656 	{
657 mixin(tracer);
658 		import vibe.core.core : Timer, setTimer;
659 
660 		if (!m_context)
661 			return WaitForDataAsyncStatus.noMoreData;
662 
663 		if (m_context.readBuffer.length > 0)
664 			return WaitForDataAsyncStatus.dataAvailable;
665 
666 		if (timeout <= 0.seconds) {
667 			auto rs = waitForData(0.seconds);
668 			return rs ? WaitForDataAsyncStatus.dataAvailable : WaitForDataAsyncStatus.noMoreData;
669 		}
670 
671 		static final class WaitContext {
672 			import std.algorithm.mutation : move;
673 
674 			CALLABLE callback;
675 			TCPConnection connection;
676 			Timer timer;
677 
678 			this(CALLABLE callback, TCPConnection connection, Duration timeout)
679 			{
680 				this.callback = callback;
681 				this.connection = connection;
682 				if (timeout < Duration.max)
683 					this.timer = setTimer(timeout, &onTimeout);
684 			}
685 
686 			void onTimeout()
687 			{
688 				eventDriver.sockets.cancelRead(connection.m_socket);
689 				invoke(false);
690 			}
691 
692 			void onData(StreamSocketFD, IOStatus st, size_t nb)
693 			{
694 				if (timer) timer.stop();
695 				assert(connection.m_context.readBuffer.length == 0);
696 				connection.m_context.readBuffer.putN(nb);
697 				invoke(connection.m_context.readBuffer.length > 0);
698 			}
699 
700 			void invoke(bool status)
701 			{
702 				auto cb = move(callback);
703 				connection = TCPConnection.init;
704 				timer = Timer.init;
705 				cb(status);
706 			}
707 		}
708 
709 		// FIXME: make this work without a heap allocation!
710 		auto context = new WaitContext(read_ready_callback, this, timeout);
711 
712 		eventDriver.sockets.read(m_socket, m_context.readBuffer.peekDst(),
713 			IOMode.once, &context.onData);
714 
715 		return WaitForDataAsyncStatus.waiting;
716 	}
717 
718 	const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; }
719 
720 	void skip(ulong count)
721 	{
722 		import std.algorithm.comparison : min;
723 
724 		m_context.readTimeout.loopWithTimeout!((remaining) {
725 			waitForData(remaining);
726 			auto n = min(count, m_context.readBuffer.length);
727 			m_context.readBuffer.popFrontN(n);
728 			count -= n;
729 			return count == 0;
730 		}, ReadTimeoutException);
731 	}
732 
733 	size_t read(scope ubyte[] dst, IOMode mode)
734 	{
735 mixin(tracer);
736 		import std.algorithm.comparison : min;
737 		if (!dst.length) return 0;
738 		if (m_context.readBuffer.length >= dst.length) {
739 			m_context.readBuffer.read(dst);
740 			return dst.length;
741 		}
742 		size_t nbytes = 0;
743 		m_context.readTimeout.loopWithTimeout!((remaining) {
744 			if (m_context.readBuffer.length == 0) {
745 				if (mode == IOMode.immediate || mode == IOMode.once && nbytes > 0)
746 					return true;
747 				enforce(waitForData(remaining), "Reached end of stream while reading data.");
748 			}
749 			assert(m_context.readBuffer.length > 0);
750 			auto l = min(dst.length, m_context.readBuffer.length);
751 			m_context.readBuffer.read(dst[0 .. l]);
752 			dst = dst[l .. $];
753 			nbytes += l;
754 			return dst.length == 0;
755 		}, ReadTimeoutException);
756 		return nbytes;
757 	}
758 
759 	void read(scope ubyte[] dst) { auto r = read(dst, IOMode.all); assert(r == dst.length); }
760 
761 	size_t write(in ubyte[] bytes, IOMode mode)
762 	{
763 mixin(tracer);
764 		if (bytes.length == 0) return 0;
765 
766 		auto res = asyncAwait!(IOCallback,
767 			cb => eventDriver.sockets.write(m_socket, bytes, mode, cb),
768 			cb => eventDriver.sockets.cancelWrite(m_socket));
769 
770 		switch (res[1]) {
771 			default:
772 				throw new Exception("Error writing data to socket.");
773 			case IOStatus.ok:
774 				assert(mode != IOMode.all || res[2] == bytes.length);
775 				break;
776 			case IOStatus.disconnected:
777 				if (mode == IOMode.all && res[2] != bytes.length)
778 					throw new Exception("Connection closed while writing data.");
779 				break;
780 		}
781 
782 		return res[2];
783 	}
784 
785 	void write(in ubyte[] bytes) { auto r = write(bytes, IOMode.all); assert(r == bytes.length); }
786 	void write(in char[] bytes) { write(cast(const(ubyte)[])bytes); }
787 	void write(InputStream stream) { write(stream, 0); }
788 
789 	void flush() {
790 mixin(tracer);
791 	}
792 	void finalize() {}
793 	void write(InputStream)(InputStream stream, ulong nbytes = 0) if (isInputStream!InputStream) { writeDefault(stream, nbytes); }
794 
795 	private void writeDefault(InputStream)(InputStream stream, ulong nbytes = 0)
796 		if (isInputStream!InputStream)
797 	{
798 		import std.algorithm.comparison : min;
799 		import vibe.internal.allocator : theAllocator, makeArray, dispose;
800 
801 		scope buffer = () @trusted { return cast(ubyte[]) theAllocator.allocate(64*1024); }();
802 		scope (exit) () @trusted { theAllocator.dispose(buffer); }();
803 
804 		//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
805 		if( nbytes == 0 ){
806 			while( !stream.empty ){
807 				size_t chunk = min(stream.leastSize, buffer.length);
808 				assert(chunk > 0, "leastSize returned zero for non-empty stream.");
809 				//logTrace("read pipe chunk %d", chunk);
810 				stream.read(buffer[0 .. chunk]);
811 				write(buffer[0 .. chunk]);
812 			}
813 		} else {
814 			while( nbytes > 0 ){
815 				size_t chunk = min(nbytes, buffer.length);
816 				//logTrace("read pipe chunk %d", chunk);
817 				stream.read(buffer[0 .. chunk]);
818 				write(buffer[0 .. chunk]);
819 				nbytes -= chunk;
820 			}
821 		}
822 	}
823 }
824 
825 /** Represents possible return values for
826 	TCPConnection.waitForDataAsync.
827 */
828 enum WaitForDataAsyncStatus {
829 	noMoreData,
830 	dataAvailable,
831 	waiting,
832 }
833 
834 enum WaitForDataStatus {
835 	dataAvailable,
836 	noMoreData,
837 	timeout
838 }
839 
840 unittest { // test compilation of callback with scoped destruction
841 	static struct CB {
842 		~this() {}
843 		this(this) {}
844 		void opCall(bool) {}
845 	}
846 
847 	void test() {
848 		TCPConnection c;
849 		CB cb;
850 		c.waitForDataAsync(cb);
851 	}
852 }
853 
854 
855 mixin validateConnectionStream!TCPConnection;
856 
857 private void loopWithTimeout(alias LoopBody, ExceptionType = Exception)(Duration timeout,
858 	immutable string timeoutMsg = "Operation timed out.")
859 {
860 	import core.time : seconds, MonoTime;
861 
862 	MonoTime now;
863 	if (timeout != Duration.max)
864 		now = MonoTime.currTime();
865 
866 	do {
867 		if (LoopBody(timeout))
868 			return;
869 
870 		if (timeout != Duration.max) {
871 			auto prev = now;
872 			now = MonoTime.currTime();
873 			if (now > prev) timeout -= now - prev;
874 		}
875 	} while (timeout > 0.seconds);
876 
877 	throw new ExceptionType(timeoutMsg);
878 }
879 
880 
881 /**
882 	Represents a listening TCP socket.
883 */
884 struct TCPListener {
885 	// FIXME: copying may lead to dangling FDs - this somehow needs to employ reference counting without breaking
886 	//        the previous behavior of keeping the socket alive when the listener isn't stored. At the same time,
887 	//        stopListening() needs to keep working.
888 	private {
889 		static struct Context {
890 			shared(NativeEventDriver) driver;
891 		}
892 
893 		StreamListenSocketFD m_socket;
894 		Context* m_context;
895 	}
896 
897 	this(StreamListenSocketFD socket)
898 	{
899 		m_socket = socket;
900 		m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
901 		m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
902 	}
903 
904 	bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != StreamListenSocketFD.invalid; }
905 
906 	/// The local address at which TCP connections are accepted.
907 	@property NetworkAddress bindAddress()
908 	{
909 		NetworkAddress ret;
910 		scope ra = new RefAddress(ret.sockAddr, ret.sockAddrMaxLen);
911 		enforce(eventDriver.sockets.getLocalAddress(m_socket, ra),
912 			"Failed to query bind address of listening socket.");
913 		return ret;
914 	}
915 
916 	/// Stops listening and closes the socket.
917 	void stopListening()
918 	{
919 		if (m_socket != StreamListenSocketFD.invalid) {
920 			releaseHandle!"sockets"(m_socket, m_context.driver);
921 			m_socket = StreamListenSocketFD.invalid;
922 		}
923 	}
924 }
925 
926 
927 /**
928 	Represents a bound and possibly 'connected' UDP socket.
929 */
930 struct UDPConnection {
931 	static struct Context {
932 		bool canBroadcast;
933 		shared(NativeEventDriver) driver;
934 	}
935 
936 	private {
937 		DatagramSocketFD m_socket;
938 		Context* m_context;
939 	}
940 
941 	private this(ref NetworkAddress bind_address)
942 	{
943 		scope baddr = new RefAddress(bind_address.sockAddr, bind_address.sockAddrLen);
944 		m_socket = eventDriver.sockets.createDatagramSocket(baddr, null);
945 		enforce(m_socket != DatagramSocketFD.invalid, "Failed to create datagram socket.");
946 		m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
947 		m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
948 	}
949 
950 
951 	this(this)
952 	nothrow {
953 		if (m_socket != DatagramSocketFD.invalid)
954 			eventDriver.sockets.addRef(m_socket);
955 	}
956 
957 	~this()
958 	nothrow {
959 		if (m_socket != DatagramSocketFD.invalid)
960 			releaseHandle!"sockets"(m_socket, m_context.driver);
961 	}
962 
963 	bool opCast(T)() const nothrow if (is(T == bool)) { return m_socket != DatagramSocketFD.invalid; }
964 
965 	/** Returns the address to which the UDP socket is bound.
966 	*/
967 	@property string bindAddress() const { return localAddress.toString(); }
968 
969 	/** Determines if the socket is allowed to send to broadcast addresses.
970 	*/
971 	@property bool canBroadcast() const { return m_context.canBroadcast; }
972 	/// ditto
973 	@property void canBroadcast(bool val) { enforce(eventDriver.sockets.setBroadcast(m_socket, val), "Failed to set UDP broadcast flag."); m_context.canBroadcast = val; }
974 
975 	/// The local/bind address of the underlying socket.
976 	@property NetworkAddress localAddress() const nothrow {
977 		NetworkAddress naddr;
978 		scope addr = new RefAddress(naddr.sockAddr, naddr.sockAddrMaxLen);
979 		try {
980 			enforce(eventDriver.sockets.getLocalAddress(m_socket, addr), "Failed to query socket address.");
981 		} catch (Exception e) { logWarn("Failed to get local address for TCP connection: %s", e.msg); }
982 		return naddr;
983 	}
984 
985 	/** Set IP multicast loopback mode.
986 
987 		This is on by default. All packets send will also loopback if enabled.
988 		Useful if more than one application is running on same host and both need each other's packets.
989 	*/
990 	@property void multicastLoopback(bool loop)
991 	{
992 		enforce(eventDriver.sockets.setOption(m_socket, DatagramSocketOption.multicastLoopback, loop),
993 			"Failed to set multicast loopback mode.");
994 	}
995 
996 	/** Become a member of an IP multicast group.
997 
998 		The multiaddr parameter should be in the range 239.0.0.0-239.255.255.255.
999 		See https://www.iana.org/assignments/multicast-addresses/multicast-addresses.xml#multicast-addresses-12
1000 		and https://www.iana.org/assignments/ipv6-multicast-addresses/ipv6-multicast-addresses.xhtml
1001 	*/
1002 	void addMembership(ref NetworkAddress multiaddr, uint interface_index = 0)
1003 	{
1004 		scope addr = new RefAddress(multiaddr.sockAddr, multiaddr.sockAddrMaxLen);
1005 		enforce(eventDriver.sockets.joinMulticastGroup(m_socket, addr, interface_index),
1006 			"Failed to add multicast membership.");
1007 	}
1008 
1009 	/** Stops listening for datagrams and frees all resources.
1010 	*/
1011 	void close()
1012 	{
1013 		if (m_socket != DatagramSocketFD.invalid) {
1014 			releaseHandle!"sockets"(m_socket, m_context.driver);
1015 			m_socket = DatagramSocketFD.init;
1016 			m_context = null;
1017 		}
1018 	}
1019 
1020 	/** Locks the UDP connection to a certain peer.
1021 
1022 		Once connected, the UDPConnection can only communicate with the specified peer.
1023 		Otherwise communication with any reachable peer is possible.
1024 	*/
1025 	void connect(string host, ushort port)
1026 	{
1027 		auto address = resolveHost(host);
1028 		address.port = port;
1029 		connect(address);
1030 	}
1031 	/// ditto
1032 	void connect(NetworkAddress address)
1033 	{
1034 		scope addr = new RefAddress(address.sockAddr, address.sockAddrLen);
1035 		eventDriver.sockets.setTargetAddress(m_socket, addr);
1036 	}
1037 
1038 	/** Sends a single packet.
1039 
1040 		If peer_address is given, the packet is send to that address. Otherwise the packet
1041 		will be sent to the address specified by a call to connect().
1042 	*/
1043 	void send(in ubyte[] data, in NetworkAddress* peer_address = null)
1044 	{
1045 		scope addrc = new RefAddress;
1046 		if (peer_address)
1047 			addrc.set(() @trusted { return (cast(NetworkAddress*)peer_address).sockAddr; } (), peer_address.sockAddrLen);
1048 
1049 		IOStatus status;
1050 		size_t nbytes;
1051 		bool cancelled;
1052 
1053 		alias waitable = Waitable!(DatagramIOCallback,
1054 			cb => eventDriver.sockets.send(m_socket, data, IOMode.once, peer_address ? addrc : null, cb),
1055 			(cb) { cancelled = true; eventDriver.sockets.cancelSend(m_socket); },
1056 			(DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr)
1057 			{
1058 				status = status_;
1059 				nbytes = nbytes_;
1060 			}
1061 		);
1062 
1063 		asyncAwaitAny!(true, waitable);
1064 
1065 		enforce(!cancelled && status == IOStatus.ok, "Failed to send packet.");
1066 		enforce(nbytes == data.length, "Packet was only sent partially.");
1067 	}
1068 
1069 	/** Receives a single packet.
1070 
1071 		If a buffer is given, it must be large enough to hold the full packet.
1072 
1073 		The timeout overload will throw an Exception if no data arrives before the
1074 		specified duration has elapsed.
1075 	*/
1076 	ubyte[] recv(ubyte[] buf = null, NetworkAddress* peer_address = null)
1077 	{
1078 		return recv(Duration.max, buf, peer_address);
1079 	}
1080 	/// ditto
1081 	ubyte[] recv(Duration timeout, ubyte[] buf = null, NetworkAddress* peer_address = null)
1082 	{
1083 		import std.socket : Address;
1084 		if (buf.length == 0) buf = new ubyte[65536];
1085 
1086 		IOStatus status;
1087 		size_t nbytes;
1088 		bool cancelled;
1089 
1090 		alias waitable = Waitable!(DatagramIOCallback,
1091 			cb => eventDriver.sockets.receive(m_socket, buf, IOMode.once, cb),
1092 			(cb) { cancelled = true; eventDriver.sockets.cancelReceive(m_socket); },
1093 			(DatagramSocketFD, IOStatus status_, size_t nbytes_, scope RefAddress addr)
1094 			{
1095 				status = status_;
1096 				nbytes = nbytes_;
1097 				if (status_ == IOStatus.ok && peer_address) {
1098 					try *peer_address = NetworkAddress(addr);
1099 					catch (Exception e) logWarn("Failed to store datagram source address: %s", e.msg);
1100 				}
1101 			}
1102 		);
1103 
1104 		asyncAwaitAny!(true, waitable)(timeout);
1105 		enforce(!cancelled, "Receive timeout.");
1106 		enforce(status == IOStatus.ok, "Failed to receive packet.");
1107 		return buf[0 .. nbytes];
1108 	}
1109 }
1110 
1111 
1112 /**
1113 	Flags to control the behavior of listenTCP.
1114 */
1115 enum TCPListenOptions {
1116 	/// Don't enable any particular option
1117 	none = 0,
1118 	/// Deprecated: causes incoming connections to be distributed across the thread pool
1119 	distribute = 1<<0,
1120 	/// Disables automatic closing of the connection when the connection callback exits
1121 	disableAutoClose = 1<<1,
1122 	/** Enable port reuse on linux kernel version >=3.9, do nothing on other OS
1123 		Does not affect libasync driver because it is always enabled by libasync.
1124 	*/
1125 	reusePort = 1<<2,
1126 	/// Enable address reuse
1127 	reuseAddress = 1<<3,
1128 	///
1129 	defaults = reuseAddress
1130 }
1131 
1132 private pure nothrow {
1133 	import std.bitmanip;
1134 
1135 	ushort ntoh(ushort val)
1136 	{
1137 		version (LittleEndian) return swapEndian(val);
1138 		else version (BigEndian) return val;
1139 		else static assert(false, "Unknown endianness.");
1140 	}
1141 
1142 	ushort hton(ushort val)
1143 	{
1144 		version (LittleEndian) return swapEndian(val);
1145 		else version (BigEndian) return val;
1146 		else static assert(false, "Unknown endianness.");
1147 	}
1148 }
1149 
1150 private enum tracer = "";
1151 
1152 
1153 /// Thrown by TCPConnection read-alike operations when timeout is reached.
1154 class ReadTimeoutException: Exception
1155 {
1156 	@safe pure nothrow this(string message,
1157 							Throwable next,
1158 							string file =__FILE__,
1159 							size_t line = __LINE__)
1160 	{
1161 		super(message, next, file, line);
1162 	}
1163 
1164 	@safe pure nothrow this(string message,
1165 							string file =__FILE__,
1166 							size_t line = __LINE__,
1167 							Throwable next = null)
1168 	{
1169 		super(message, file, line, next);
1170 	}
1171 }
1172 
1173 
1174 // check whether the given name is not a valid host name, but may instead
1175 // be a valid IP address. This is used as a quick check to avoid
1176 // unnecessary address parsing or DNS queries
1177 private bool isMaybeIPAddress(in char[] name)
1178 {
1179 	import std.algorithm.searching : all, canFind;
1180 	import std.ascii : isDigit;
1181 
1182 	// could be an IPv6 address, but ':' is invalid in host names
1183 	if (name.canFind(':')) return true;
1184 
1185 	// an IPv4 address is at least 7 characters (0.0.0.0)
1186 	if (name.length < 7) return false;
1187 
1188 	// no valid TLD consists of only digits
1189 	return name.canFind('.') && name.all!(ch => ch.isDigit || ch == '.');
1190 }
1191 
1192 unittest {
1193 	assert(isMaybeIPAddress("0.0.0.0"));
1194 	assert(isMaybeIPAddress("::1"));
1195 	assert(isMaybeIPAddress("aabb::1f"));
1196 	assert(!isMaybeIPAddress("example.com"));
1197 	assert(!isMaybeIPAddress("12.com"));
1198 	assert(!isMaybeIPAddress("1.1.1.t12"));
1199 }