1 /** 2 Generic connection pool for reusing persistent connections across fibers. 3 4 Copyright: © 2012-2016 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.core.connectionpool; 9 10 import vibe.core.log; 11 12 import core.thread; 13 import vibe.core.sync; 14 import vibe.internal.freelistref; 15 16 /** 17 Generic connection pool class. 18 19 The connection pool is creating connections using the supplied factory 20 function as needed whenever `lockConnection` is called. Connections are 21 associated to the calling fiber, as long as any copy of the returned 22 `LockedConnection` object still exists. Connections that are not associated 23 to any fiber will be kept in a pool of open connections for later reuse. 24 25 Note that, after retrieving a connection with `lockConnection`, the caller 26 has to make sure that the connection is actually physically open and to 27 reopen it if necessary. The `ConnectionPool` class has no knowledge of the 28 internals of the connection objects. 29 */ 30 final class ConnectionPool(Connection) 31 { 32 private { 33 Connection delegate() @safe m_connectionFactory; 34 Connection[] m_connections; 35 int[const(Connection)] m_lockCount; 36 FreeListRef!LocalTaskSemaphore m_sem; 37 debug Thread m_thread; 38 } 39 40 this(Connection delegate() @safe connection_factory, uint max_concurrent = uint.max) 41 { 42 m_connectionFactory = connection_factory; 43 () @trusted { m_sem = FreeListRef!LocalTaskSemaphore(max_concurrent); } (); 44 debug m_thread = () @trusted { return Thread.getThis(); } (); 45 } 46 47 deprecated("Use an @safe callback instead") 48 this(Connection delegate() connection_factory, uint max_concurrent = uint.max) 49 @system { 50 this(cast(Connection delegate() @safe)connection_factory, max_concurrent); 51 } 52 53 /** Determines the maximum number of concurrently open connections. 54 55 Attempting to lock more connections that this number will cause the 56 calling fiber to be blocked until one of the locked connections 57 becomes available for reuse. 58 */ 59 @property void maxConcurrency(uint max_concurrent) { 60 m_sem.maxLocks = max_concurrent; 61 } 62 /// ditto 63 @property uint maxConcurrency() { 64 return m_sem.maxLocks; 65 } 66 67 /** Retrieves a connection to temporarily associate with the calling fiber. 68 69 The returned `LockedConnection` object uses RAII and reference counting 70 to determine when to unlock the connection. 71 */ 72 LockedConnection!Connection lockConnection() 73 @safe { 74 debug assert(m_thread is () @trusted { return Thread.getThis(); } (), "ConnectionPool was called from a foreign thread!"); 75 76 () @trusted { m_sem.lock(); } (); 77 scope (failure) () @trusted { m_sem.unlock(); } (); 78 79 size_t cidx = size_t.max; 80 foreach( i, c; m_connections ){ 81 auto plc = c in m_lockCount; 82 if( !plc || *plc == 0 ){ 83 cidx = i; 84 break; 85 } 86 } 87 88 Connection conn; 89 if( cidx != size_t.max ){ 90 logTrace("returning %s connection %d of %d", Connection.stringof, cidx, m_connections.length); 91 conn = m_connections[cidx]; 92 } else { 93 logDebug("creating new %s connection, all %d are in use", Connection.stringof, m_connections.length); 94 conn = m_connectionFactory(); // NOTE: may block 95 static if (is(typeof(cast(void*)conn))) 96 logDebug(" ... %s", () @trusted { return cast(void*)conn; } ()); 97 } 98 m_lockCount[conn] = 1; 99 if( cidx == size_t.max ){ 100 m_connections ~= conn; 101 logDebug("Now got %d connections", m_connections.length); 102 } 103 auto ret = LockedConnection!Connection(this, conn); 104 return ret; 105 } 106 107 /** Removes all currently unlocked connections from the pool. 108 109 Params: 110 disconnect_callback = Gets called for every removed connection to 111 allow closing connections and freeing associated resources. 112 */ 113 void removeUnused(scope void delegate(Connection conn) @safe nothrow disconnect_callback) 114 { 115 Connection[] remaining_conns, removed_conns; 116 foreach (c; m_connections) { 117 if (m_lockCount.get(c, 0) > 0) 118 remaining_conns ~= c; 119 else 120 removed_conns ~= c; 121 } 122 123 m_connections = remaining_conns; 124 125 foreach (c; removed_conns) 126 disconnect_callback(c); 127 } 128 } 129 130 /// 131 unittest { 132 class Connection { 133 void write() {} 134 } 135 136 auto pool = new ConnectionPool!Connection({ 137 return new Connection; // perform the connection here 138 }); 139 140 // create and lock a first connection 141 auto c1 = pool.lockConnection(); 142 c1.write(); 143 144 // create and lock a second connection 145 auto c2 = pool.lockConnection(); 146 c2.write(); 147 148 // writing to c1 will still write to the first connection 149 c1.write(); 150 151 // free up the reference to the first connection, so that it can be reused 152 destroy(c1); 153 154 // locking a new connection will reuse the first connection now instead of creating a new one 155 auto c3 = pool.lockConnection(); 156 c3.write(); 157 } 158 159 unittest { // issue vibe-d#2109 160 import vibe.core.net : TCPConnection, connectTCP; 161 new ConnectionPool!TCPConnection({ return connectTCP("127.0.0.1", 8080); }); 162 } 163 164 unittest { // removeUnused 165 class Connection {} 166 167 auto pool = new ConnectionPool!Connection({ 168 return new Connection; // perform the connection here 169 }); 170 171 auto c1 = pool.lockConnection(); 172 auto c1i = c1.__conn; 173 174 auto c2 = pool.lockConnection(); 175 auto c2i = c2.__conn; 176 177 178 assert(pool.m_connections == [c1i, c2i]); 179 180 c2 = LockedConnection!Connection.init; 181 pool.removeUnused((c) { assert(c is c2i); }); 182 assert(pool.m_connections == [c1i]); 183 184 c1 = LockedConnection!Connection.init; 185 pool.removeUnused((c) { assert(c is c1i); }); 186 assert(pool.m_connections == []); 187 } 188 189 190 struct LockedConnection(Connection) { 191 import vibe.core.task : Task; 192 193 private { 194 ConnectionPool!Connection m_pool; 195 Task m_task; 196 Connection m_conn; 197 debug uint m_magic = 0xB1345AC2; 198 } 199 200 @safe: 201 202 private this(ConnectionPool!Connection pool, Connection conn) 203 { 204 assert(!!conn); 205 m_pool = pool; 206 m_conn = conn; 207 m_task = Task.getThis(); 208 } 209 210 this(this) 211 { 212 debug assert(m_magic == 0xB1345AC2, "LockedConnection value corrupted."); 213 if (!!m_conn) { 214 auto fthis = Task.getThis(); 215 assert(fthis is m_task); 216 m_pool.m_lockCount[m_conn]++; 217 static if (is(typeof(cast(void*)conn))) 218 logTrace("conn %s copy %d", () @trusted { return cast(void*)m_conn; } (), m_pool.m_lockCount[m_conn]); 219 } 220 } 221 222 ~this() 223 { 224 debug assert(m_magic == 0xB1345AC2, "LockedConnection value corrupted."); 225 if (!!m_conn) { 226 auto fthis = Task.getThis(); 227 assert(fthis is m_task, "Locked connection destroyed in foreign task."); 228 auto plc = m_conn in m_pool.m_lockCount; 229 assert(plc !is null); 230 assert(*plc >= 1); 231 //logTrace("conn %s destroy %d", cast(void*)m_conn, *plc-1); 232 if( --*plc == 0 ){ 233 () @trusted { m_pool.m_sem.unlock(); } (); 234 //logTrace("conn %s release", cast(void*)m_conn); 235 } 236 m_conn = Connection.init; 237 } 238 } 239 240 241 @property int __refCount() const { return m_pool.m_lockCount.get(m_conn, 0); } 242 @property inout(Connection) __conn() inout { return m_conn; } 243 244 alias __conn this; 245 }