1 /** 2 Generic connection pool for reusing persistent connections across fibers. 3 4 Copyright: © 2012-2016 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.core.connectionpool; 9 10 import vibe.core.log; 11 12 import core.thread; 13 import vibe.core.sync; 14 import vibe.internal.freelistref; 15 16 /** 17 Generic connection pool class. 18 19 The connection pool is creating connections using the supplied factory 20 function as needed whenever `lockConnection` is called. Connections are 21 associated to the calling fiber, as long as any copy of the returned 22 `LockedConnection` object still exists. Connections that are not associated 23 to any fiber will be kept in a pool of open connections for later reuse. 24 25 Note that, after retrieving a connection with `lockConnection`, the caller 26 has to make sure that the connection is actually physically open and to 27 reopen it if necessary. The `ConnectionPool` class has no knowledge of the 28 internals of the connection objects. 29 */ 30 final class ConnectionPool(Connection) 31 { 32 private { 33 Connection delegate() @safe m_connectionFactory; 34 Connection[] m_connections; 35 int[const(Connection)] m_lockCount; 36 FreeListRef!LocalTaskSemaphore m_sem; 37 debug Thread m_thread; 38 } 39 40 this(Connection delegate() @safe connection_factory, uint max_concurrent = uint.max) 41 { 42 m_connectionFactory = connection_factory; 43 () @trusted { m_sem = FreeListRef!LocalTaskSemaphore(max_concurrent); } (); 44 debug m_thread = () @trusted { return Thread.getThis(); } (); 45 } 46 47 deprecated("Use an @safe callback instead") 48 this(Connection delegate() connection_factory, uint max_concurrent = uint.max) 49 @system { 50 this(cast(Connection delegate() @safe)connection_factory, max_concurrent); 51 } 52 53 /** Determines the maximum number of concurrently open connections. 54 55 Attempting to lock more connections that this number will cause the 56 calling fiber to be blocked until one of the locked connections 57 becomes available for reuse. 58 */ 59 @property void maxConcurrency(uint max_concurrent) { 60 m_sem.maxLocks = max_concurrent; 61 } 62 /// ditto 63 @property uint maxConcurrency() nothrow { 64 return m_sem.maxLocks; 65 } 66 67 /** Retrieves a connection to temporarily associate with the calling fiber. 68 69 The returned `LockedConnection` object uses RAII and reference counting 70 to determine when to unlock the connection. 71 */ 72 LockedConnection!Connection lockConnection() 73 @safe { 74 debug assert(m_thread is () @trusted { return Thread.getThis(); } (), "ConnectionPool was called from a foreign thread!"); 75 76 () @trusted { m_sem.lock(); } (); 77 scope (failure) () @trusted { m_sem.unlock(); } (); 78 79 size_t cidx = size_t.max; 80 foreach( i, c; m_connections ){ 81 auto plc = c in m_lockCount; 82 if( !plc || *plc == 0 ){ 83 cidx = i; 84 break; 85 } 86 } 87 88 Connection conn; 89 if( cidx != size_t.max ){ 90 logTrace("returning %s connection %d of %d", Connection.stringof, cidx, m_connections.length); 91 conn = m_connections[cidx]; 92 } else { 93 logDebug("creating new %s connection, all %d are in use", Connection.stringof, m_connections.length); 94 conn = m_connectionFactory(); // NOTE: may block 95 static if (is(typeof(cast(void*)conn))) 96 logDebug(" ... %s", () @trusted { return cast(void*)conn; } ()); 97 } 98 m_lockCount[conn] = 1; 99 if( cidx == size_t.max ){ 100 m_connections ~= conn; 101 logDebug("Now got %d connections", m_connections.length); 102 } 103 auto ret = LockedConnection!Connection(this, conn); 104 return ret; 105 } 106 107 /** Removes all currently unlocked connections from the pool. 108 109 Params: 110 disconnect_callback = Gets called for every removed connection to 111 allow closing connections and freeing associated resources. 112 */ 113 void removeUnused(scope void delegate(Connection conn) @safe nothrow disconnect_callback) 114 { 115 Connection[] remaining_conns, removed_conns; 116 foreach (c; m_connections) { 117 if (m_lockCount.get(c, 0) > 0) 118 remaining_conns ~= c; 119 else 120 removed_conns ~= c; 121 } 122 123 m_connections = remaining_conns; 124 125 foreach (c; removed_conns) 126 disconnect_callback(c); 127 } 128 129 /** Removes an existing connection from the pool 130 It can be called with a locked connection, same connection 131 can be added back to the pool anytime. Any fibers that hold 132 a lock on this connection will keep behaving as expected. 133 134 Params: 135 conn = connection to remove from the pool 136 */ 137 void remove(Connection conn) @safe 138 { 139 foreach (idx, c; m_connections) 140 if (c is conn) 141 { 142 m_connections = m_connections[0 .. idx] ~ m_connections[idx + 1 .. $]; 143 auto plc = conn in m_lockCount; 144 assert(plc !is null); 145 assert(*plc >= 0); 146 if (*plc > 0) 147 *plc *= -1; // invert the plc to signal LockedConnection that this connection is no longer in the pool 148 else 149 m_lockCount.remove(conn); 150 return; 151 } 152 assert(0, "Removing non existing conn"); 153 } 154 155 /** Add a connection to the pool explicitly 156 157 Params: 158 conn = new connection to add to the pool 159 160 Returns: 161 success/failure 162 */ 163 bool add(Connection conn) @safe nothrow 164 { 165 if (m_connections.length < this.maxConcurrency) 166 { 167 auto plc = conn in m_lockCount; 168 if (plc is null) 169 m_lockCount[conn] = 0; 170 else if (*plc < 0) 171 *plc *= -1; // invert the plc back to positive 172 m_connections ~= conn; 173 return true; 174 } 175 return false; 176 } 177 } 178 179 /// 180 unittest { 181 class Connection { 182 void write() {} 183 } 184 185 auto pool = new ConnectionPool!Connection({ 186 return new Connection; // perform the connection here 187 }); 188 189 // create and lock a first connection 190 auto c1 = pool.lockConnection(); 191 c1.write(); 192 193 // create and lock a second connection 194 auto c2 = pool.lockConnection(); 195 c2.write(); 196 197 // writing to c1 will still write to the first connection 198 c1.write(); 199 200 // free up the reference to the first connection, so that it can be reused 201 destroy(c1); 202 203 // locking a new connection will reuse the first connection now instead of creating a new one 204 auto c3 = pool.lockConnection(); 205 c3.write(); 206 } 207 208 unittest { // issue vibe-d#2109 209 import vibe.core.net : TCPConnection, connectTCP; 210 new ConnectionPool!TCPConnection({ return connectTCP("127.0.0.1", 8080); }); 211 } 212 213 unittest { // removeUnused 214 class Connection {} 215 216 auto pool = new ConnectionPool!Connection({ 217 return new Connection; // perform the connection here 218 }); 219 220 auto c1 = pool.lockConnection(); 221 auto c1i = c1.__conn; 222 223 auto c2 = pool.lockConnection(); 224 auto c2i = c2.__conn; 225 226 227 assert(pool.m_connections == [c1i, c2i]); 228 229 c2 = LockedConnection!Connection.init; 230 pool.removeUnused((c) { assert(c is c2i); }); 231 assert(pool.m_connections == [c1i]); 232 233 c1 = LockedConnection!Connection.init; 234 pool.removeUnused((c) { assert(c is c1i); }); 235 assert(pool.m_connections == []); 236 } 237 238 239 struct LockedConnection(Connection) { 240 import vibe.core.task : Task; 241 242 private { 243 ConnectionPool!Connection m_pool; 244 Task m_task; 245 Connection m_conn; 246 debug uint m_magic = 0xB1345AC2; 247 } 248 249 @safe: 250 251 private this(ConnectionPool!Connection pool, Connection conn) 252 { 253 assert(!!conn); 254 m_pool = pool; 255 m_conn = conn; 256 m_task = Task.getThis(); 257 } 258 259 this(this) 260 { 261 debug assert(m_magic == 0xB1345AC2, "LockedConnection value corrupted."); 262 if (!!m_conn) { 263 auto fthis = Task.getThis(); 264 assert(fthis is m_task); 265 m_pool.m_lockCount[m_conn]++; 266 static if (is(typeof(cast(void*)conn))) 267 logTrace("conn %s copy %d", () @trusted { return cast(void*)m_conn; } (), m_pool.m_lockCount[m_conn]); 268 } 269 } 270 271 ~this() 272 { 273 debug assert(m_magic == 0xB1345AC2, "LockedConnection value corrupted."); 274 if (!!m_conn) { 275 auto fthis = Task.getThis(); 276 assert(fthis is m_task, "Locked connection destroyed in foreign task."); 277 auto plc = m_conn in m_pool.m_lockCount; 278 assert(plc !is null); 279 assert(*plc != 0); 280 //logTrace("conn %s destroy %d", cast(void*)m_conn, *plc-1); 281 if( *plc > 0 && --*plc == 0 ){ 282 () @trusted { m_pool.m_sem.unlock(); } (); 283 //logTrace("conn %s release", cast(void*)m_conn); 284 } 285 else if (*plc < 0 && ++*plc == 0) // connection was removed from the pool and no lock remains on it 286 { 287 m_pool.m_lockCount.remove(m_conn); 288 } 289 m_conn = Connection.init; 290 } 291 } 292 293 294 @property int __refCount() const { return m_pool.m_lockCount.get(m_conn, 0); } 295 @property inout(Connection) __conn() inout { return m_conn; } 296 297 alias __conn this; 298 } 299 300 /// 301 unittest { 302 int id = 0; 303 class Connection { 304 public int id; 305 } 306 307 auto pool = new ConnectionPool!Connection({ 308 auto conn = new Connection(); // perform the connection here 309 conn.id = id++; 310 return conn; 311 }); 312 313 // create and lock a first connection 314 auto c1 = pool.lockConnection(); 315 assert(c1.id == 0); 316 pool.remove(c1); 317 destroy(c1); 318 319 auto c2 = pool.lockConnection(); 320 assert(c2.id == 1); // assert that we got a new connection 321 pool.remove(c2); 322 pool.add(c2); 323 destroy(c2); 324 325 auto c3 = pool.lockConnection(); 326 assert(c3.id == 1); // should get the same connection back 327 }