1 /**
2 	Generic connection pool for reusing persistent connections across fibers.
3 
4 	Copyright: © 2012-2016 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.core.connectionpool;
9 
10 import vibe.core.log;
11 
12 import core.thread;
13 import vibe.core.sync;
14 import vibe.internal.freelistref;
15 
16 /**
17 	Generic connection pool class.
18 
19 	The connection pool is creating connections using the supplied factory
20 	function as needed whenever `lockConnection` is called. Connections are
21 	associated to the calling fiber, as long as any copy of the returned
22 	`LockedConnection` object still exists. Connections that are not associated
23 	to any fiber will be kept in a pool of open connections for later reuse.
24 
25 	Note that, after retrieving a connection with `lockConnection`, the caller
26 	has to make sure that the connection is actually physically open and to
27 	reopen it if necessary. The `ConnectionPool` class has no knowledge of the
28 	internals of the connection objects.
29 */
30 final class ConnectionPool(Connection)
31 {
32 	private {
33 		Connection delegate() @safe m_connectionFactory;
34 		Connection[] m_connections;
35 		int[const(Connection)] m_lockCount;
36 		FreeListRef!LocalTaskSemaphore m_sem;
37 		debug Thread m_thread;
38 	}
39 
40 	this(Connection delegate() @safe connection_factory, uint max_concurrent = uint.max)
41 	{
42 		m_connectionFactory = connection_factory;
43 		() @trusted { m_sem = FreeListRef!LocalTaskSemaphore(max_concurrent); } ();
44 		debug m_thread = () @trusted { return Thread.getThis(); } ();
45 	}
46 
47 	deprecated("Use an @safe callback instead")
48 	this(Connection delegate() connection_factory, uint max_concurrent = uint.max)
49 	@system {
50 		this(cast(Connection delegate() @safe)connection_factory, max_concurrent);
51 	}
52 
53 	/** Determines the maximum number of concurrently open connections.
54 
55 		Attempting to lock more connections that this number will cause the
56 		calling fiber to be blocked until one of the locked connections
57 		becomes available for reuse.
58 	*/
59 	@property void maxConcurrency(uint max_concurrent) {
60 		m_sem.maxLocks = max_concurrent;
61 	}
62 	/// ditto
63 	@property uint maxConcurrency() {
64 		return m_sem.maxLocks;
65 	}
66 
67 	/** Retrieves a connection to temporarily associate with the calling fiber.
68 
69 		The returned `LockedConnection` object uses RAII and reference counting
70 		to determine when to unlock the connection.
71 	*/
72 	LockedConnection!Connection lockConnection()
73 	@safe {
74 		debug assert(m_thread is () @trusted { return Thread.getThis(); } (), "ConnectionPool was called from a foreign thread!");
75 
76 		() @trusted { m_sem.lock(); } ();
77 		scope (failure) () @trusted { m_sem.unlock(); } ();
78 
79 		size_t cidx = size_t.max;
80 		foreach( i, c; m_connections ){
81 			auto plc = c in m_lockCount;
82 			if( !plc || *plc == 0 ){
83 				cidx = i;
84 				break;
85 			}
86 		}
87 
88 		Connection conn;
89 		if( cidx != size_t.max ){
90 			logTrace("returning %s connection %d of %d", Connection.stringof, cidx, m_connections.length);
91 			conn = m_connections[cidx];
92 		} else {
93 			logDebug("creating new %s connection, all %d are in use", Connection.stringof, m_connections.length);
94 			conn = m_connectionFactory(); // NOTE: may block
95 			static if (is(typeof(cast(void*)conn)))
96 				logDebug(" ... %s", () @trusted { return cast(void*)conn; } ());
97 		}
98 		m_lockCount[conn] = 1;
99 		if( cidx == size_t.max ){
100 			m_connections ~= conn;
101 			logDebug("Now got %d connections", m_connections.length);
102 		}
103 		auto ret = LockedConnection!Connection(this, conn);
104 		return ret;
105 	}
106 
107 	/** Removes all currently unlocked connections from the pool.
108 
109 		Params:
110 			disconnect_callback = Gets called for every removed connection to
111 				allow closing connections and freeing associated resources.
112 	*/
113 	void removeUnused(scope void delegate(Connection conn) @safe nothrow disconnect_callback)
114 	{
115 		Connection[] remaining_conns, removed_conns;
116 		foreach (c; m_connections) {
117 			if (m_lockCount.get(c, 0) > 0)
118 				remaining_conns ~= c;
119 			else
120 				removed_conns ~= c;
121 		}
122 
123 		m_connections = remaining_conns;
124 
125 		foreach (c; removed_conns)
126 			disconnect_callback(c);
127 	}
128 }
129 
130 ///
131 unittest {
132 	class Connection {
133 		void write() {}
134 	}
135 
136 	auto pool = new ConnectionPool!Connection({
137 		return new Connection; // perform the connection here
138 	});
139 
140 	// create and lock a first connection
141 	auto c1 = pool.lockConnection();
142 	c1.write();
143 
144 	// create and lock a second connection
145 	auto c2 = pool.lockConnection();
146 	c2.write();
147 
148 	// writing to c1 will still write to the first connection
149 	c1.write();
150 
151 	// free up the reference to the first connection, so that it can be reused
152 	destroy(c1);
153 
154 	// locking a new connection will reuse the first connection now instead of creating a new one
155 	auto c3 = pool.lockConnection();
156 	c3.write();
157 }
158 
159 unittest { // issue vibe-d#2109
160 	import vibe.core.net : TCPConnection, connectTCP;
161 	new ConnectionPool!TCPConnection({ return connectTCP("127.0.0.1", 8080); });
162 }
163 
164 unittest { // removeUnused
165 	class Connection {}
166 
167 	auto pool = new ConnectionPool!Connection({
168 		return new Connection; // perform the connection here
169 	});
170 
171 	auto c1 = pool.lockConnection();
172 	auto c1i = c1.__conn;
173 
174 	auto c2 = pool.lockConnection();
175 	auto c2i = c2.__conn;
176 
177 
178 	assert(pool.m_connections == [c1i, c2i]);
179 
180 	c2 = LockedConnection!Connection.init;
181 	pool.removeUnused((c) { assert(c is c2i); });
182 	assert(pool.m_connections == [c1i]);
183 
184 	c1 = LockedConnection!Connection.init;
185 	pool.removeUnused((c) { assert(c is c1i); });
186 	assert(pool.m_connections == []);
187 }
188 
189 
190 struct LockedConnection(Connection) {
191 	import vibe.core.task : Task;
192 
193 	private {
194 		ConnectionPool!Connection m_pool;
195 		Task m_task;
196 		Connection m_conn;
197 		debug uint m_magic = 0xB1345AC2;
198 	}
199 
200 	@safe:
201 
202 	private this(ConnectionPool!Connection pool, Connection conn)
203 	{
204 		assert(!!conn);
205 		m_pool = pool;
206 		m_conn = conn;
207 		m_task = Task.getThis();
208 	}
209 
210 	this(this)
211 	{
212 		debug assert(m_magic == 0xB1345AC2, "LockedConnection value corrupted.");
213 		if (!!m_conn) {
214 			auto fthis = Task.getThis();
215 			assert(fthis is m_task);
216 			m_pool.m_lockCount[m_conn]++;
217 			static if (is(typeof(cast(void*)conn)))
218 				logTrace("conn %s copy %d", () @trusted { return cast(void*)m_conn; } (), m_pool.m_lockCount[m_conn]);
219 		}
220 	}
221 
222 	~this()
223 	{
224 		debug assert(m_magic == 0xB1345AC2, "LockedConnection value corrupted.");
225 		if (!!m_conn) {
226 			auto fthis = Task.getThis();
227 			assert(fthis is m_task, "Locked connection destroyed in foreign task.");
228 			auto plc = m_conn in m_pool.m_lockCount;
229 			assert(plc !is null);
230 			assert(*plc >= 1);
231 			//logTrace("conn %s destroy %d", cast(void*)m_conn, *plc-1);
232 			if( --*plc == 0 ){
233 				() @trusted { m_pool.m_sem.unlock(); } ();
234 				//logTrace("conn %s release", cast(void*)m_conn);
235 			}
236 			m_conn = Connection.init;
237 		}
238 	}
239 
240 
241 	@property int __refCount() const { return m_pool.m_lockCount.get(m_conn, 0); }
242 	@property inout(Connection) __conn() inout { return m_conn; }
243 
244 	alias __conn this;
245 }