1 /**
2 	Generic connection pool for reusing persistent connections across fibers.
3 
4 	Copyright: © 2012-2016 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.core.connectionpool;
9 
10 import vibe.core.log;
11 
12 import core.thread;
13 import vibe.core.sync;
14 import vibe.internal.freelistref;
15 
16 /**
17 	Generic connection pool class.
18 
19 	The connection pool is creating connections using the supplied factory
20 	function as needed whenever `lockConnection` is called. Connections are
21 	associated to the calling fiber, as long as any copy of the returned
22 	`LockedConnection` object still exists. Connections that are not associated
23 	to any fiber will be kept in a pool of open connections for later reuse.
24 
25 	Note that, after retrieving a connection with `lockConnection`, the caller
26 	has to make sure that the connection is actually physically open and to
27 	reopen it if necessary. The `ConnectionPool` class has no knowledge of the
28 	internals of the connection objects.
29 */
30 final class ConnectionPool(Connection)
31 {
32 	private {
33 		Connection delegate() @safe m_connectionFactory;
34 		Connection[] m_connections;
35 		int[const(Connection)] m_lockCount;
36 		FreeListRef!LocalTaskSemaphore m_sem;
37 		debug Thread m_thread;
38 	}
39 
40 	this(Connection delegate() @safe connection_factory, uint max_concurrent = uint.max)
41 	{
42 		m_connectionFactory = connection_factory;
43 		() @trusted { m_sem = FreeListRef!LocalTaskSemaphore(max_concurrent); } ();
44 		debug m_thread = () @trusted { return Thread.getThis(); } ();
45 	}
46 
47 	deprecated("Use an @safe callback instead")
48 	this(Connection delegate() connection_factory, uint max_concurrent = uint.max)
49 	@system {
50 		this(cast(Connection delegate() @safe)connection_factory, max_concurrent);
51 	}
52 
53 	/** Determines the maximum number of concurrently open connections.
54 
55 		Attempting to lock more connections that this number will cause the
56 		calling fiber to be blocked until one of the locked connections
57 		becomes available for reuse.
58 	*/
59 	@property void maxConcurrency(uint max_concurrent) {
60 		m_sem.maxLocks = max_concurrent;
61 	}
62 	/// ditto
63 	@property uint maxConcurrency() nothrow {
64 		return m_sem.maxLocks;
65 	}
66 
67 	/** Retrieves a connection to temporarily associate with the calling fiber.
68 
69 		The returned `LockedConnection` object uses RAII and reference counting
70 		to determine when to unlock the connection.
71 	*/
72 	LockedConnection!Connection lockConnection()
73 	@safe {
74 		debug assert(m_thread is () @trusted { return Thread.getThis(); } (), "ConnectionPool was called from a foreign thread!");
75 
76 		() @trusted { m_sem.lock(); } ();
77 		scope (failure) () @trusted { m_sem.unlock(); } ();
78 
79 		size_t cidx = size_t.max;
80 		foreach( i, c; m_connections ){
81 			auto plc = c in m_lockCount;
82 			if( !plc || *plc == 0 ){
83 				cidx = i;
84 				break;
85 			}
86 		}
87 
88 		Connection conn;
89 		if( cidx != size_t.max ){
90 			logTrace("returning %s connection %d of %d", Connection.stringof, cidx, m_connections.length);
91 			conn = m_connections[cidx];
92 		} else {
93 			logDebug("creating new %s connection, all %d are in use", Connection.stringof, m_connections.length);
94 			conn = m_connectionFactory(); // NOTE: may block
95 			static if (is(typeof(cast(void*)conn)))
96 				logDebug(" ... %s", () @trusted { return cast(void*)conn; } ());
97 		}
98 		m_lockCount[conn] = 1;
99 		if( cidx == size_t.max ){
100 			m_connections ~= conn;
101 			logDebug("Now got %d connections", m_connections.length);
102 		}
103 		auto ret = LockedConnection!Connection(this, conn);
104 		return ret;
105 	}
106 
107 	/** Removes all currently unlocked connections from the pool.
108 
109 		Params:
110 			disconnect_callback = Gets called for every removed connection to
111 				allow closing connections and freeing associated resources.
112 	*/
113 	void removeUnused(scope void delegate(Connection conn) @safe nothrow disconnect_callback)
114 	{
115 		Connection[] remaining_conns, removed_conns;
116 		foreach (c; m_connections) {
117 			if (m_lockCount.get(c, 0) > 0)
118 				remaining_conns ~= c;
119 			else
120 				removed_conns ~= c;
121 		}
122 
123 		m_connections = remaining_conns;
124 
125 		foreach (c; removed_conns)
126 			disconnect_callback(c);
127 	}
128 
129 	/** Removes an existing connection from the pool
130 		It can be called with a locked connection, same connection
131 		can be added back to the pool anytime. Any fibers that hold
132 		a lock on this connection will keep behaving as expected.
133 
134 		Params:
135 			conn = connection to remove from the pool
136 	*/
137 	void remove(Connection conn) @safe
138 	{
139 		foreach (idx, c; m_connections)
140 			if (c is conn)
141 			{
142 				m_connections = m_connections[0 .. idx] ~ m_connections[idx + 1 .. $];
143 				auto plc = conn in m_lockCount;
144 				assert(plc !is null);
145 				assert(*plc >= 0);
146 				if (*plc > 0)
147 					*plc *= -1; // invert the plc to signal LockedConnection that this connection is no longer in the pool
148 				else
149 					m_lockCount.remove(conn);
150 				return;
151 			}
152 		assert(0, "Removing non existing conn");
153 	}
154 
155 	/** Add a connection to the pool explicitly
156 
157 		Params:
158 			conn = new connection to add to the pool
159 
160 		Returns:
161 			success/failure
162 	*/
163 	bool add(Connection conn) @safe nothrow
164 	{
165 		if (m_connections.length < this.maxConcurrency)
166 		{
167 			auto plc = conn in m_lockCount;
168 			if (plc is null)
169 				m_lockCount[conn] = 0;
170 			else if (*plc < 0)
171 				*plc *= -1; // invert the plc back to positive
172 			m_connections ~= conn;
173 			return true;
174 		}
175 		return false;
176 	}
177 }
178 
179 ///
180 unittest {
181 	class Connection {
182 		void write() {}
183 	}
184 
185 	auto pool = new ConnectionPool!Connection({
186 		return new Connection; // perform the connection here
187 	});
188 
189 	// create and lock a first connection
190 	auto c1 = pool.lockConnection();
191 	c1.write();
192 
193 	// create and lock a second connection
194 	auto c2 = pool.lockConnection();
195 	c2.write();
196 
197 	// writing to c1 will still write to the first connection
198 	c1.write();
199 
200 	// free up the reference to the first connection, so that it can be reused
201 	destroy(c1);
202 
203 	// locking a new connection will reuse the first connection now instead of creating a new one
204 	auto c3 = pool.lockConnection();
205 	c3.write();
206 }
207 
208 unittest { // issue vibe-d#2109
209 	import vibe.core.net : TCPConnection, connectTCP;
210 	new ConnectionPool!TCPConnection({ return connectTCP("127.0.0.1", 8080); });
211 }
212 
213 unittest { // removeUnused
214 	class Connection {}
215 
216 	auto pool = new ConnectionPool!Connection({
217 		return new Connection; // perform the connection here
218 	});
219 
220 	auto c1 = pool.lockConnection();
221 	auto c1i = c1.__conn;
222 
223 	auto c2 = pool.lockConnection();
224 	auto c2i = c2.__conn;
225 
226 
227 	assert(pool.m_connections == [c1i, c2i]);
228 
229 	c2 = LockedConnection!Connection.init;
230 	pool.removeUnused((c) { assert(c is c2i); });
231 	assert(pool.m_connections == [c1i]);
232 
233 	c1 = LockedConnection!Connection.init;
234 	pool.removeUnused((c) { assert(c is c1i); });
235 	assert(pool.m_connections == []);
236 }
237 
238 
239 struct LockedConnection(Connection) {
240 	import vibe.core.task : Task;
241 
242 	private {
243 		ConnectionPool!Connection m_pool;
244 		Task m_task;
245 		Connection m_conn;
246 		debug uint m_magic = 0xB1345AC2;
247 	}
248 
249 	@safe:
250 
251 	private this(ConnectionPool!Connection pool, Connection conn)
252 	{
253 		assert(!!conn);
254 		m_pool = pool;
255 		m_conn = conn;
256 		m_task = Task.getThis();
257 	}
258 
259 	this(this)
260 	{
261 		debug assert(m_magic == 0xB1345AC2, "LockedConnection value corrupted.");
262 		if (!!m_conn) {
263 			auto fthis = Task.getThis();
264 			assert(fthis is m_task);
265 			m_pool.m_lockCount[m_conn]++;
266 			static if (is(typeof(cast(void*)conn)))
267 				logTrace("conn %s copy %d", () @trusted { return cast(void*)m_conn; } (), m_pool.m_lockCount[m_conn]);
268 		}
269 	}
270 
271 	~this()
272 	{
273 		debug assert(m_magic == 0xB1345AC2, "LockedConnection value corrupted.");
274 		if (!!m_conn) {
275 			auto fthis = Task.getThis();
276 			assert(fthis is m_task, "Locked connection destroyed in foreign task.");
277 			auto plc = m_conn in m_pool.m_lockCount;
278 			assert(plc !is null);
279 			assert(*plc != 0);
280 			//logTrace("conn %s destroy %d", cast(void*)m_conn, *plc-1);
281 			if( *plc > 0 && --*plc == 0 ){
282 				() @trusted { m_pool.m_sem.unlock(); } ();
283 				//logTrace("conn %s release", cast(void*)m_conn);
284 			}
285 			else if (*plc < 0 && ++*plc == 0) // connection was removed from the pool and no lock remains on it
286 			{
287 				m_pool.m_lockCount.remove(m_conn);
288 			}
289 			m_conn = Connection.init;
290 		}
291 	}
292 
293 
294 	@property int __refCount() const { return m_pool.m_lockCount.get(m_conn, 0); }
295 	@property inout(Connection) __conn() inout { return m_conn; }
296 
297 	alias __conn this;
298 }
299 
300 ///
301 unittest {
302 	int id = 0;
303 	class Connection {
304 		public int id;
305 	}
306 
307 	auto pool = new ConnectionPool!Connection({
308 		auto conn = new Connection(); // perform the connection here
309 		conn.id = id++;
310 		return conn;
311 	});
312 
313 	// create and lock a first connection
314 	auto c1 = pool.lockConnection();
315 	assert(c1.id == 0);
316 	pool.remove(c1);
317 	destroy(c1);
318 
319 	auto c2 = pool.lockConnection();
320 	assert(c2.id == 1); // assert that we got a new connection
321 	pool.remove(c2);
322 	pool.add(c2);
323 	destroy(c2);
324 
325 	auto c3 = pool.lockConnection();
326 	assert(c3.id == 1); // should get the same connection back
327 }