/**
	Multi-threaded task pool implementation.

	Copyright: © 2012-2020 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.core.taskpool;

import vibe.core.concurrency : isWeaklyIsolated;
import vibe.core.core : exitEventLoop, isCallable, isMethod, isNothrowCallable,
	isNothrowMethod, logicalProcessorCount, runEventLoop, runTask, runTask_internal;
import vibe.core.log;
import vibe.core.sync : ManualEvent, VibeSyncMonitor = Monitor, createSharedManualEvent, createMonitor;
import vibe.core.task : Task, TaskFuncInfo, TaskSettings, callWithMove;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import std.traits : isFunctionPointer;


/** Implements a shared, multi-threaded task pool.

	The pool owns a fixed set of worker threads. Tasks are either placed in a
	pool-wide queue (`runTask`/`runTaskH`), from which any worker may pick them
	up, or into the per-thread queue of every worker (`runTaskDist`/
	`runTaskDistH`).
*/
shared final class TaskPool {
	private {
		/// Mutable pool state; only accessed through the `m_state` monitor.
		struct State {
			/// Worker threads that are currently registered with the pool.
			WorkerThread[] threads;
			/// Pool-wide queue of tasks that any worker may execute.
			TaskQueue queue;
			/// Set by `terminate` to request all workers to exit.
			bool term;
		}
		VibeSyncMonitor!(State, shared(Mutex)) m_state;
		/// Emitted whenever new work is queued or termination is requested.
		shared(ManualEvent) m_signal;
		immutable size_t m_threadCount;
	}

	/** Creates a new task pool with the specified number of threads.

		All worker threads are created and started from within the
		constructor, while the state lock is held.

		Params:
			thread_count = The number of worker threads to create
	*/
	this(size_t thread_count = logicalProcessorCount())
	@safe nothrow {
		import std.format : format;

		m_threadCount = thread_count;
		m_signal = createSharedManualEvent();
		m_state = createMonitor!State(new shared Mutex);

		with (m_state.lock) {
			queue.setup();
			threads.length = thread_count;
			foreach (i; 0 .. thread_count) {
				WorkerThread thr;
				() @trusted nothrow {
					thr = new WorkerThread(this);
					// a failure to assign the name is logged, but does not
					// prevent the thread from being started
					try thr.name = format("vibe-%s", i);
					catch (Exception e) logException(e, "Failed to set worker thread name");
					thr.start();
				} ();
				threads[i] = thr;
			}
		}
	}

	/** Returns the number of worker threads.

		This is the count that was passed to the constructor.
	*/
	@property size_t threadCount() const shared nothrow { return m_threadCount; }

	/** Instructs all worker threads to terminate and waits until all have
		finished.

		Threads are removed from the internal list one by one and joined. If
		this method is called from one of the worker threads, that thread is
		skipped instead of joining itself. A warning is logged if tasks are
		still left in the shared queue afterwards.
	*/
	void terminate()
	@safe nothrow {
		m_state.lock.term = true;
		m_signal.emit();

		while (true) {
			WorkerThread th;
			with (m_state.lock)
				if (threads.length) {
					th = threads[0];
					threads = threads[1 .. $];
				}
			if (!th) break;

			// never attempt to join the calling thread
			if (th is Thread.getThis())
				continue;

			() @trusted {
				try th.join();
				catch (Exception e) {
					logWarn("Failed to wait for worker thread exit: %s", e.msg);
				}
			} ();
		}

		size_t cnt = m_state.lock.queue.length;
		if (cnt > 0) logWarn("There were still %d worker tasks pending at exit.", cnt);
	}

	/** Instructs all worker threads to terminate as soon as all tasks have
		been processed and waits for them to finish.

		Note: This is not implemented yet - calling it currently always fails
		with an assertion.
	*/
	void join()
	@safe nothrow {
		assert(false, "TODO!");
	}

	/** Runs a new asynchronous task in a worker thread.

		Only function pointers with weakly isolated arguments are allowed to be
		able to guarantee thread-safety.

		Params:
			settings = Optional task settings to use for the new task
			func = Function pointer to execute
			object = Shared object on which `method` gets invoked
			args = Arguments to pass to the function/method
	*/
	void runTask(FT, ARGS...)(FT func, auto ref ARGS args)
		if (isFunctionPointer!FT)
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		runTask_unsafe(TaskSettings.init, func, args);
	}
	/// ditto
	void runTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
		if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		auto func = &__traits(getMember, object, __traits(identifier, method));
		runTask_unsafe(TaskSettings.init, func, args);
	}
	/// ditto
	void runTask(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
		if (isFunctionPointer!FT)
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		runTask_unsafe(settings, func, args);
	}
	/// ditto
	void runTask(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
		if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		auto func = &__traits(getMember, object, __traits(identifier, method));
		runTask_unsafe(settings, func, args);
	}

	/** Runs a new asynchronous task in a worker thread, returning the task handle.

		This function will yield and wait for the new task to be created and started
		in the worker thread, then resume and return it.

		Only function pointers with weakly isolated arguments are allowed to be
		able to guarantee thread-safety.

		Params:
			settings = Optional task settings to use for the new task
			func = Function pointer to execute
			object = Shared object on which `method` gets invoked
			args = Arguments to pass to the function/method

		Returns:
			The handle of the task that was started in the worker thread.
	*/
	Task runTaskH(FT, ARGS...)(FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		// workaround for runWorkerTaskH to work when called outside of a task
		if (Task.getThis() == Task.init) {
			Task ret;
			.runTask((FT func, ARGS args) nothrow { ret = doRunTaskH(TaskSettings.init, func, args); }, func, args).joinUninterruptible();
			return ret;
		} else return doRunTaskH(TaskSettings.init, func, args);
	}
	/// ditto
	Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
		if (isNothrowMethod!(shared(T), method, ARGS))
	{
		// forwards to the function pointer overload using a static wrapper
		static void wrapper()(shared(T) object, ref ARGS args) {
			__traits(getMember, object, __traits(identifier, method))(args);
		}
		return runTaskH(&wrapper!(), object, args);
	}
	/// ditto
	Task runTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		// workaround for runWorkerTaskH to work when called outside of a task
		if (Task.getThis() == Task.init) {
			Task ret;
			.runTask((TaskSettings settings, FT func, ARGS args) nothrow { ret = doRunTaskH(settings, func, args); }, settings, func, args).joinUninterruptible();
			return ret;
		} else return doRunTaskH(settings, func, args);
	}
	/// ditto
	Task runTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
		if (isNothrowMethod!(shared(T), method, ARGS))
	{
		// forwards to the function pointer overload using a static wrapper
		static void wrapper()(shared(T) object, ref ARGS args) {
			__traits(getMember, object, __traits(identifier, method))(args);
		}
		return runTaskH(settings, &wrapper!(), object, args);
	}
	/// ditto
	deprecated("The `func` argument should be `nothrow`.")
	Task runTaskH(FT, ARGS...)(FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		// workaround for runWorkerTaskH to work when called outside of a task
		if (Task.getThis() == Task.init) {
			Task ret;
			.runTask({ ret = doRunTaskH(TaskSettings.init, func, args); }).join();
			return ret;
		} else return doRunTaskH(TaskSettings.init, func, args);
	}
	/// ditto
	deprecated("The `method` argument should be `nothrow`.")
	Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
		if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
	{
		static void wrapper()(shared(T) object, ref ARGS args) {
			__traits(getMember, object, __traits(identifier, method))(args);
		}
		return runTaskH(&wrapper!(), object, args);
	}
	/// ditto
	deprecated("The `func` argument should be `nothrow`.")
	Task runTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		// workaround for runWorkerTaskH to work when called outside of a task
		if (Task.getThis() == Task.init) {
			Task ret;
			.runTask({ ret = doRunTaskH(settings, func, args); }).join();
			return ret;
		} else return doRunTaskH(settings, func, args);
	}
	/// ditto
	deprecated("The `method` argument should be `nothrow`.")
	Task runTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
		if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
	{
		static void wrapper()(shared(T) object, ref ARGS args) {
			__traits(getMember, object, __traits(identifier, method))(args);
		}
		return runTaskH(settings, &wrapper!(), object, args);
	}

	// NOTE: needs to be a separate function to avoid recursion for the
	//       workaround above, which breaks @safe inference
	//
	// Queues a wrapper task that first sends its own task handle through a
	// channel and then invokes `func`. The caller blocks on the channel until
	// the handle has arrived and returns it.
	private Task doRunTaskH(FT, ARGS...)(TaskSettings settings, FT func, ref ARGS args)
		if (isFunctionPointer!FT)
	{
		import vibe.core.channel : Channel, createChannel;

		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		auto ch = createChannel!Task();

		static void taskFun(Channel!Task ch, FT func, ARGS args) {
			// report the handle before running the actual task function
			try ch.put(Task.getThis());
			catch (Exception e) assert(false, e.msg);
			mixin(callWithMove!ARGS("func", "args"));
		}
		runTask_unsafe(settings, &taskFun, ch, func, args);

		Task ret;
		try ret = ch.consumeOne();
		catch (Exception e) assert(false, "Failed to receive task handle: " ~ e.msg);
		ch.close();
		return ret;
	}


	/** Runs a new asynchronous task in all worker threads concurrently.

		This function is mainly useful for long-living tasks that distribute their
		work across all CPU cores. Only function pointers with weakly isolated
		arguments are allowed to be able to guarantee thread-safety.

		The number of tasks started is guaranteed to be equal to
		`threadCount`.

		Params:
			settings = Optional task settings to use for the new tasks
			func = Function pointer to execute
			object = Shared object on which `method` gets invoked
			args = Arguments to pass to the function/method
	*/
	void runTaskDist(FT, ARGS...)(FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		runTaskDist_unsafe(TaskSettings.init, func, args);
	}
	/// ditto
	void runTaskDist(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
		if (isNothrowMethod!(shared(T), method, ARGS))
	{
		auto func = &__traits(getMember, object, __traits(identifier, method));
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		runTaskDist_unsafe(TaskSettings.init, func, args);
	}
	/// ditto
	void runTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		runTaskDist_unsafe(settings, func, args);
	}
	/// ditto
	void runTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
		if (isNothrowMethod!(shared(T), method, ARGS))
	{
		auto func = &__traits(getMember, object, __traits(identifier, method));
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		runTaskDist_unsafe(settings, func, args);
	}
	/// ditto
	deprecated("The `func` argument should be `nothrow`.")
	void runTaskDist(FT, ARGS...)(FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		runTaskDist_unsafe(TaskSettings.init, func, args);
	}
	/// ditto
	deprecated("The `method` argument should be `nothrow`.")
	void runTaskDist(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
		if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
	{
		auto func = &__traits(getMember, object, __traits(identifier, method));
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		runTaskDist_unsafe(TaskSettings.init, func, args);
	}
	/// ditto
	deprecated("The `func` argument should be `nothrow`.")
	void runTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		runTaskDist_unsafe(settings, func, args);
	}
	/// ditto
	deprecated("The `method` argument should be `nothrow`.")
	void runTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
		if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
	{
		auto func = &__traits(getMember, object, __traits(identifier, method));
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		runTaskDist_unsafe(settings, func, args);
	}

	/** Runs a new asynchronous task in all worker threads and returns the handles.

		`on_handle` is a callable that takes a `Task` as its only
		argument and is called for every task instance that gets created.

		Params:
			settings = Optional task settings to use for the new tasks
			on_handle = Callback invoked once per created task handle
			func = Function to execute in every worker thread
			args = Arguments to pass to `func`

		See_also: `runTaskDist`
	*/
	void runTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args)
		if (!is(HCB == TaskSettings))
	{
		runTaskDistH(TaskSettings.init, on_handle, func, args);
	}
	/// ditto
	void runTaskDistH(HCB, FT, ARGS...)(TaskSettings settings, scope HCB on_handle, FT func, auto ref ARGS args)
	{
		import vibe.core.channel : Channel, createChannel;

		// TODO: support non-copyable argument types using .move
		auto ch = createChannel!Task;

		static void call(Channel!Task ch, FT func, ARGS args) {
			// report the handle before running the actual task function
			try ch.put(Task.getThis());
			catch (Exception e) assert(false, e.msg);
			func(args);
		}
		runTaskDist(settings, &call, ch, func, args);

		// one handle is expected per worker thread
		// NOTE(review): this assumes that all `threadCount` workers are still
		// registered with the pool - confirm behavior after `terminate`
		foreach (i; 0 .. this.threadCount)
			on_handle(ch.consumeOne());

		ch.close();
	}

	// Appends a task to the pool-wide queue and wakes up a single waiter.
	// "unsafe" because no isolation checks are performed on the arguments -
	// that is the responsibility of the public overloads.
	private void runTask_unsafe(CALLABLE, ARGS...)(TaskSettings settings, CALLABLE callable, ref ARGS args)
	{
		import std.traits : ParameterTypeTuple;
		import vibe.internal.traits : areConvertibleTo;
		import vibe.internal.typetuple;

		alias FARGS = ParameterTypeTuple!CALLABLE;
		static assert(areConvertibleTo!(Group!ARGS, Group!FARGS),
			"Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'.");

		m_state.lock.queue.put(settings, callable, args);
		m_signal.emitSingle();
	}

	// Appends a task to the private queue of every registered worker thread
	// and wakes up all waiters. No isolation checks are performed here.
	private void runTaskDist_unsafe(CALLABLE, ARGS...)(TaskSettings settings, ref CALLABLE callable, ARGS args) // NOTE: no ref for args, to disallow non-copyable types!
	{
		import std.traits : ParameterTypeTuple;
		import vibe.internal.traits : areConvertibleTo;
		import vibe.internal.typetuple;

		alias FARGS = ParameterTypeTuple!CALLABLE;
		static assert(areConvertibleTo!(Group!ARGS, Group!FARGS),
			"Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'.");

		{
			auto st = m_state.lock;
			foreach (thr; st.threads) {
				// create one TFI per thread to properly account for elaborate assignment operators/postblit
				thr.m_queue.put(settings, callable, args);
			}
		}
		m_signal.emit();
	}
}

/** Worker thread of a `TaskPool`.

	Each worker has a private task queue in addition to the queue that is
	shared by the whole pool.
*/
private final class WorkerThread : Thread {
	private {
		shared(TaskPool) m_pool;
		/// Tasks that must be executed by this particular thread.
		TaskQueue m_queue;
	}

	/** Creates a worker that belongs to `pool`.

		The thread is not started by the constructor.
	*/
	this(shared(TaskPool) pool)
	nothrow {
		m_pool = pool;
		m_queue.setup();
		super(&main);
	}

	/** Thread entry point.

		Returns immediately if the pool has already been asked to terminate.
		Any `Throwable` that escapes is logged as fatal and aborts the process.
	*/
	private void main()
	nothrow {
		import core.stdc.stdlib : abort;

		try {
			if (m_pool.m_state.lock.term) return;
			logDebug("entering worker thread");

			// There is an issue where a task that periodically calls yield()
			// but otherwise only performs a CPU computation will cause a
			// call to runEventLoopOnce() or yield() called from the global
			// thread context to not return before the task is finished. For
			// this reason we start a task here, which in turn is scheduled
			// properly together with such a task, and also is scheduled
			// according to the task priorities.
			runTask(&handleWorkerTasks).joinUninterruptible();

			logDebug("Worker thread exit.");
		} catch (Throwable th) {
			th.logException!(LogLevel.fatal)("Worker thread terminated due to uncaught error");
			abort();
		}
	}

	/** Main loop of the worker.

		Repeatedly dequeues a task - the thread specific queue takes
		precedence over the shared queue - and starts it. If no task is
		available, waits for the pool's signal. The loop is left once the
		pool's `term` flag is set, after which the thread removes itself from
		the pool's thread list.
	*/
	private void handleWorkerTasks()
	nothrow @safe {
		import std.algorithm.iteration : filter;
		import std.algorithm.mutation : swap;
		import std.array : array;

		logTrace("worker thread enter");
		TaskFuncInfo taskfunc;
		auto emit_count = m_pool.m_signal.emitCount;
		while(true) {
			with (m_pool.m_state.lock) {
				logTrace("worker thread check");

				if (term) break;

				if (m_queue.consume(taskfunc)) {
					logTrace("worker thread got specific task");
				} else if (queue.consume(taskfunc)) {
					logTrace("worker thread got unspecific task");
				}
			}

			// the task is started outside of the lock; the function info is
			// handed over by swapping it into the slot provided by
			// runTask_internal (presumably leaving `taskfunc` empty again -
			// verify against runTask_internal)
			if (taskfunc.func !is null)
				.runTask_internal!((ref tfi) { swap(tfi, taskfunc); });
			else emit_count = m_pool.m_signal.waitUninterruptible(emit_count);
		}

		logTrace("worker thread exit");

		if (!m_queue.empty)
			logWarn("Worker thread shuts down with specific worker tasks left in its queue.");

		with (m_pool.m_state.lock) {
			// unregister this thread from the pool
			threads = threads.filter!(t => t !is this).array;
			// NOTE(review): this warns while other threads are still
			// registered - confirm whether the intent was to warn only when
			// the last thread exits
			if (threads.length > 0 && !queue.empty)
				logWarn("Worker threads shut down with worker tasks still left in the queue.");
		}
	}
}

/** FIFO queue of `TaskFuncInfo` entries, backed by a growable ring buffer.

	`setup` must be called before any other member is used, as the buffer is
	only allocated there. The queue performs no synchronization on its own.
*/
private struct TaskQueue {
nothrow @safe:
	// TODO: avoid use of GC

	import vibe.internal.array : FixedRingBuffer;
	FixedRingBuffer!TaskFuncInfo* m_queue;

	/// Allocates the underlying ring buffer.
	void setup()
	{
		m_queue = new FixedRingBuffer!TaskFuncInfo;
	}

	/// Determines whether the queue contains no entries.
	@property bool empty() const { return m_queue.empty; }

	/// Number of entries currently in the queue.
	@property size_t length() const { return m_queue.length; }

	/** Appends a new entry to the end of the queue.

		If the buffer is full, its capacity is grown by a factor of 1.5, with
		a minimum of 16 entries.

		Params:
			settings = Settings to store along with the task function
			c = The callable to store
			args = Arguments to store along with the callable
	*/
	void put(CALLABLE, ARGS...)(TaskSettings settings, ref CALLABLE c, ref ARGS args)
	{
		import std.algorithm.comparison : max;
		if (m_queue.full) m_queue.capacity = max(16, m_queue.capacity * 3 / 2);
		assert(!m_queue.full);

		// fill the next free slot in-place and then commit it
		m_queue.peekDst[0].settings = settings;
		m_queue.peekDst[0].set(c, args);
		m_queue.putN(1);
	}

	/** Removes the first entry of the queue, if any.

		Params:
			tfi = Receives the dequeued entry; its previous contents are
				swapped into the queue slot that is then discarded

		Returns:
			`true` if an entry was dequeued, `false` if the queue was empty.
	*/
	bool consume(ref TaskFuncInfo tfi)
	{
		import std.algorithm.mutation : swap;

		if (m_queue.empty) return false;
		swap(tfi, m_queue.front);
		m_queue.popFront();
		return true;
	}
}