1 /**
2 	Multi-threaded task pool implementation.
4 	Copyright: © 2012-2020 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.core.taskpool;
10 import vibe.core.concurrency : isWeaklyIsolated;
11 import vibe.core.core : exitEventLoop, isCallable, isMethod, isNothrowCallable,
12 	isNothrowMethod, logicalProcessorCount, runEventLoop, runTask, runTask_internal;
13 import vibe.core.log;
14 import vibe.core.sync : ManualEvent, VibeSyncMonitor = Monitor, createSharedManualEvent, createMonitor;
15 import vibe.core.task : Task, TaskFuncInfo, TaskSettings, callWithMove;
16 import core.sync.mutex : Mutex;
17 import core.thread : Thread;
18 import std.traits : isFunctionPointer;
21 /** Implements a shared, multi-threaded task pool.
22 */
23 shared final class TaskPool {
	private {
		// Mutable pool state; accessed exclusively through the `m_state` monitor.
		struct State {
			// Worker threads that are currently registered with the pool.
			WorkerThread[] threads;
			// Queue of tasks that are not bound to a specific worker thread.
			TaskQueue queue;
			// Set by `terminate` to tell the worker threads to exit.
			bool term;
		}
		// Mutex protected container holding the mutable pool state.
		VibeSyncMonitor!(State, shared(Mutex)) m_state;
		// Emitted whenever tasks have been queued or termination was requested.
		shared(ManualEvent) m_signal;
		// Number of worker threads that was requested at construction time.
		immutable size_t m_threadCount;
	}
35 	/** Creates a new task pool with the specified number of threads.
37 		Params:
38 			thread_count = The number of worker threads to create
39 	*/
40 	this(size_t thread_count = logicalProcessorCount())
41 	@safe nothrow {
42 		import std.format : format;
44 		m_threadCount = thread_count;
45 		m_signal = createSharedManualEvent();
46 		m_state = createMonitor!State(new shared Mutex);
48 		with (m_state.lock) {
49 			queue.setup();
50 			threads.length = thread_count;
51 			foreach (i; 0 .. thread_count) {
52 				WorkerThread thr;
53 				() @trusted nothrow {
54 					thr = new WorkerThread(this);
55 					try thr.name = format("vibe-%s", i);
56 					catch (Exception e) logException(e, "Failed to set worker thread name");
57 					thr.start();
58 				} ();
59 				threads[i] = thr;
60 			}
61 		}
62 	}
64 	/** Returns the number of worker threads.
65 	*/
66 	@property size_t threadCount() const shared nothrow { return m_threadCount; }
68 	/** Instructs all worker threads to terminate and waits until all have
69 		finished.
70 	*/
71 	void terminate()
72 	@safe nothrow {
73 		m_state.lock.term = true;
74 		m_signal.emit();
76 		while (true) {
77 			WorkerThread th;
78 			with (m_state.lock)
79 				if (threads.length) {
80 					th = threads[0];
81 					threads = threads[1 .. $];
82 				}
83 			if (!th) break;
85 			if (th is Thread.getThis())
86 				continue;
88 			() @trusted {
89 				try th.join();
90 				catch (Exception e) {
91 					logWarn("Failed to wait for worker thread exit: %s", e.msg);
92 				}
93 			} ();
94 		}
96 		size_t cnt = m_state.lock.queue.length;
97 		if (cnt > 0) logWarn("There were still %d worker tasks pending at exit.", cnt);
98 	}
	/** Instructs all worker threads to terminate as soon as all tasks have
		been processed and waits for them to finish.

		Note: This function is not implemented yet. The body consists of a
		single `assert(false)`, so every call fails.
	*/
	void join()
	@safe nothrow {
		// placeholder until the draining logic gets implemented
		assert(false, "TODO!");
	}
108 	/** Runs a new asynchronous task in a worker thread.
110 		Only function pointers with weakly isolated arguments are allowed to be
111 		able to guarantee thread-safety.
112 	*/
113 	void runTask(FT, ARGS...)(FT func, auto ref ARGS args)
114 		if (isFunctionPointer!FT)
115 	{
116 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
117 		runTask_unsafe(TaskSettings.init, func, args);
118 	}
119 	/// ditto
120 	void runTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
121 		if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
122 	{
123 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
124 		auto func = &__traits(getMember, object, __traits(identifier, method));
125 		runTask_unsafe(TaskSettings.init, func, args);
126 	}
127 	/// ditto
128 	void runTask(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
129 		if (isFunctionPointer!FT)
130 	{
131 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
132 		runTask_unsafe(settings, func, args);
133 	}
134 	/// ditto
135 	void runTask(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
136 		if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
137 	{
138 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
139 		auto func = &__traits(getMember, object, __traits(identifier, method));
140 		runTask_unsafe(settings, func, args);
141 	}
143 	/** Runs a new asynchronous task in a worker thread, returning the task handle.
145 		This function will yield and wait for the new task to be created and started
146 		in the worker thread, then resume and return it.
148 		Only function pointers with weakly isolated arguments are allowed to be
149 		able to guarantee thread-safety.
150 	*/
151 	Task runTaskH(FT, ARGS...)(FT func, auto ref ARGS args)
152 		if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
153 	{
154 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
156 		// workaround for runWorkerTaskH to work when called outside of a task
157 		if (Task.getThis() == Task.init) {
158 			Task ret;
159 			.runTask((FT func, ARGS args) nothrow { ret = doRunTaskH(TaskSettings.init, func, args); }, func, args).joinUninterruptible();
160 			return ret;
161 		} else return doRunTaskH(TaskSettings.init, func, args);
162 	}
163 	/// ditto
164 	Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
165 		if (isNothrowMethod!(shared(T), method, ARGS))
166 	{
167 		static void wrapper()(shared(T) object, ref ARGS args) {
168 			__traits(getMember, object, __traits(identifier, method))(args);
169 		}
170 		return runTaskH(&wrapper!(), object, args);
171 	}
172 	/// ditto
173 	Task runTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
174 		if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
175 	{
176 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
178 		// workaround for runWorkerTaskH to work when called outside of a task
179 		if (Task.getThis() == Task.init) {
180 			Task ret;
181 			.runTask((TaskSettings settings, FT func, ARGS args) nothrow { ret = doRunTaskH(settings, func, args); }, settings, func, args).joinUninterruptible();
182 			return ret;
183 		} else return doRunTaskH(settings, func, args);
184 	}
185 	/// ditto
186 	Task runTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
187 		if (isNothrowMethod!(shared(T), method, ARGS))
188 	{
189 		static void wrapper()(shared(T) object, ref ARGS args) {
190 			__traits(getMember, object, __traits(identifier, method))(args);
191 		}
192 		return runTaskH(settings, &wrapper!(), object, args);
193 	}
194 	/// ditto
195 	deprecated("The `func` argument should be `nothrow`.")
196 	Task runTaskH(FT, ARGS...)(FT func, auto ref ARGS args)
197 		if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
198 	{
199 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
201 		// workaround for runWorkerTaskH to work when called outside of a task
202 		if (Task.getThis() == Task.init) {
203 			Task ret;
204 			.runTask({ ret = doRunTaskH(TaskSettings.init, func, args); }).join();
205 			return ret;
206 		} else return doRunTaskH(TaskSettings.init, func, args);
207 	}
208 	/// ditto
209 	deprecated("The `method` argument should be `nothrow`.")
210 	Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
211 		if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
212 	{
213 		static void wrapper()(shared(T) object, ref ARGS args) {
214 			__traits(getMember, object, __traits(identifier, method))(args);
215 		}
216 		return runTaskH(&wrapper!(), object, args);
217 	}
218 	/// ditto
219 	deprecated("The `func` argument should be `nothrow`.")
220 	Task runTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
221 		if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
222 	{
223 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
225 		// workaround for runWorkerTaskH to work when called outside of a task
226 		if (Task.getThis() == Task.init) {
227 			Task ret;
228 			.runTask({ ret = doRunTaskH(settings, func, args); }).join();
229 			return ret;
230 		} else return doRunTaskH(settings, func, args);
231 	}
232 	/// ditto
233 	deprecated("The `method` argument should be `nothrow`.")
234 	Task runTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
235 		if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
236 	{
237 		static void wrapper()(shared(T) object, ref ARGS args) {
238 			__traits(getMember, object, __traits(identifier, method))(args);
239 		}
240 		return runTaskH(settings, &wrapper!(), object, args);
241 	}
243 	// NOTE: needs to be a separate function to avoid recursion for the
244 	//       workaround above, which breaks @safe inference
245 	private Task doRunTaskH(FT, ARGS...)(TaskSettings settings, FT func, ref ARGS args)
246 		if (isFunctionPointer!FT)
247 	{
248 		import std.typecons : Typedef;
249 		import vibe.core.channel : Channel, createChannel;
251 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
253 		alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__);
255 		auto ch = createChannel!Task();
257 		static void taskFun(Channel!Task ch, FT func, ARGS args) {
258 			try ch.put(Task.getThis());
259 			catch (Exception e) assert(false, e.msg);
260 			mixin(callWithMove!ARGS("func", "args"));
261 		}
262 		runTask_unsafe(settings, &taskFun, ch, func, args);
264 		Task ret;
265 		try ret = ch.consumeOne();
266 		catch (Exception e) assert(false, "Failed to reveice task handle: " ~ e.msg);
267 		ch.close();
268 		return ret;
269 	}
272 	/** Runs a new asynchronous task in all worker threads concurrently.
274 		This function is mainly useful for long-living tasks that distribute their
275 		work across all CPU cores. Only function pointers with weakly isolated
276 		arguments are allowed to be able to guarantee thread-safety.
278 		The number of tasks started is guaranteed to be equal to
279 		`threadCount`.
280 	*/
281 	void runTaskDist(FT, ARGS...)(FT func, auto ref ARGS args)
282 		if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
283 	{
284 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
285 		runTaskDist_unsafe(TaskSettings.init, func, args);
286 	}
287 	/// ditto
288 	void runTaskDist(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
289 		if (isNothrowMethod!(shared(T), method, ARGS))
290 	{
291 		auto func = &__traits(getMember, object, __traits(identifier, method));
292 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
294 		runTaskDist_unsafe(TaskSettings.init, func, args);
295 	}
296 	/// ditto
297 	void runTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
298 		if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
299 	{
300 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
301 		runTaskDist_unsafe(settings, func, args);
302 	}
303 	/// ditto
304 	void runTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
305 		if (isNothrowMethod!(shared(T), method, ARGS))
306 	{
307 		auto func = &__traits(getMember, object, __traits(identifier, method));
308 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
310 		runTaskDist_unsafe(settings, func, args);
311 	}
312 	/// ditto
313 	deprecated("The `func` argument should be `nothrow`.")
314 	void runTaskDist(FT, ARGS...)(FT func, auto ref ARGS args)
315 		if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
316 	{
317 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
318 		runTaskDist_unsafe(TaskSettings.init, func, args);
319 	}
320 	/// ditto
321 	deprecated("The `method` argument should be `nothrow`.")
322 	void runTaskDist(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
323 		if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
324 	{
325 		auto func = &__traits(getMember, object, __traits(identifier, method));
326 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
328 		runTaskDist_unsafe(TaskSettings.init, func, args);
329 	}
330 	/// ditto
331 	deprecated("The `func` argument should be `nothrow`.")
332 	void runTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
333 		if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
334 	{
335 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
336 		runTaskDist_unsafe(settings, func, args);
337 	}
338 	/// ditto
339 	deprecated("The `method` argument should be `nothrow`.")
340 	void runTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
341 		if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
342 	{
343 		auto func = &__traits(getMember, object, __traits(identifier, method));
344 		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
346 		runTaskDist_unsafe(settings, func, args);
347 	}
349 	/** Runs a new asynchronous task in all worker threads and returns the handles.
351 		`on_handle` is an alias to a callble that takes a `Task` as its only
352 		argument and is called for every task instance that gets created.
354 		See_also: `runTaskDist`
355 	*/
356 	void runTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args)
357 		if (!is(HCB == TaskSettings))
358 	{
359 		runTaskDistH(TaskSettings.init, on_handle, func, args);
360 	}
361 	/// ditto
362 	void runTaskDistH(HCB, FT, ARGS...)(TaskSettings settings, scope HCB on_handle, FT func, auto ref ARGS args)
363 	{
364 		import vibe.core.channel : Channel, createChannel;
366 		// TODO: support non-copyable argument types using .move
367 		auto ch = createChannel!Task;
369 		static void call(Channel!Task ch, FT func, ARGS args) {
370 			try ch.put(Task.getThis());
371 			catch (Exception e) assert(false, e.msg);
372 			func(args);
373 		}
374 		runTaskDist(settings, &call, ch, func, args);
376 		foreach (i; 0 .. this.threadCount)
377 			on_handle(ch.consumeOne());
379 		ch.close();
380 	}
382 	private void runTask_unsafe(CALLABLE, ARGS...)(TaskSettings settings, CALLABLE callable, ref ARGS args)
383 	{
384 		import std.traits : ParameterTypeTuple;
385 		import vibe.internal.traits : areConvertibleTo;
386 		import vibe.internal.typetuple;
388 		alias FARGS = ParameterTypeTuple!CALLABLE;
389 		static assert(areConvertibleTo!(Group!ARGS, Group!FARGS),
390 			"Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'.");
392 		m_state.lock.queue.put(settings, callable, args);
393 		m_signal.emitSingle();
394 	}
396 	private void runTaskDist_unsafe(CALLABLE, ARGS...)(TaskSettings settings, ref CALLABLE callable, ARGS args) // NOTE: no ref for args, to disallow non-copyable types!
397 	{
398 		import std.traits : ParameterTypeTuple;
399 		import vibe.internal.traits : areConvertibleTo;
400 		import vibe.internal.typetuple;
402 		alias FARGS = ParameterTypeTuple!CALLABLE;
403 		static assert(areConvertibleTo!(Group!ARGS, Group!FARGS),
404 			"Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'.");
406 		{
407 			auto st = m_state.lock;
408 			foreach (thr; st.threads) {
409 				// create one TFI per thread to properly account for elaborate assignment operators/postblit
410 				thr.m_queue.put(settings, callable, args);
411 			}
412 		}
413 		m_signal.emit();
414 	}
415 }
/** Worker thread of a `TaskPool`.

	The thread body starts a task that repeatedly takes work either from this
	thread's own queue (filled by `TaskPool.runTaskDist_unsafe`) or from the
	pool's shared queue and runs it, until the pool's termination flag is set.
*/
private final class WorkerThread : Thread {
	private {
		// the pool that created this thread
		shared(TaskPool) m_pool;
		// tasks that have been queued for this specific thread
		TaskQueue m_queue;
	}

	/** Creates the thread object without starting it.

		Params:
			pool = The pool whose state and signal this thread operates on
	*/
	this(shared(TaskPool) pool)
	nothrow {
		m_pool = pool;
		m_queue.setup();
		super(&main);
	}

	/* Thread entry point.

	   Returns immediately if the pool is already terminating. Otherwise the
	   worker task is started and waited for. Any `Throwable` that escapes is
	   logged as fatal and the process is aborted.
	*/
	private void main()
	nothrow {
		import core.stdc.stdlib : abort;

		try {
			if (m_pool.m_state.lock.term) return;
			logDebug("entering worker thread");

			// There is an issue where a task that periodically calls yield()
			// but otherwise only performs a CPU computation will cause a
			// call to runEventLoopOnce() or yield() called from the global
			// thread context to not return before the task is finished. For
			// this reason we start a task here, which in turn is scheduled
			// properly together with such a task, and also is scheduled
			// according to the task priorities.
			runTask(&handleWorkerTasks).joinUninterruptible();

			logDebug("Worker thread exit.");
		} catch (Throwable th) {
			th.logException!(LogLevel.fatal)("Worker thread terminated due to uncaught error");
			abort();
		}
	}

	/* Main loop of the worker.

	   Under the pool lock, the termination flag is checked and a task is
	   taken from the thread specific queue or, if that one is empty, from
	   the shared queue. A task that was found gets started outside of the
	   lock. If there is none, the worker waits for the pool signal. After
	   the loop has ended, the thread removes itself from the pool's thread
	   list.
	*/
	private void handleWorkerTasks()
	nothrow @safe {
		import std.algorithm.iteration : filter;
		import std.algorithm.mutation : swap;
		import std.array : array;

		logTrace("worker thread enter");
		TaskFuncInfo taskfunc;
		// remember the emit count before checking the queues, so that the
		// wait below is passed the count from before the check
		auto emit_count = m_pool.m_signal.emitCount;
		while(true) {
			with (m_pool.m_state.lock) {
				logTrace("worker thread check");

				if (term) break;

				// tasks queued for this specific thread take precedence
				if (m_queue.consume(taskfunc)) {
					logTrace("worker thread got specific task");
				} else if (queue.consume(taskfunc)) {
					logTrace("worker thread got unspecific task");
				}
			}

			// the swap hands the task info over to the new task and leaves
			// whatever `tfi` held before in `taskfunc`
			if (taskfunc.func !is null)
				.runTask_internal!((ref tfi) { swap(tfi, taskfunc); });
			else emit_count = m_pool.m_signal.waitUninterruptible(emit_count);
		}

		logTrace("worker thread exit");

		if (!m_queue.empty)
			logWarn("Worker thread shuts down with specific worker tasks left in its queue.");

		with (m_pool.m_state.lock) {
			// unregister this thread from the pool
			threads = threads.filter!(t => t !is this).array;
			// NOTE(review): this warns only while other workers are still
			// registered (`threads.length > 0`) - confirm whether the last
			// exiting worker (`threads.length == 0`) was intended instead
			if (threads.length > 0 && !queue.empty)
				logWarn("Worker threads shut down with worker tasks still left in the queue.");
		}
	}
}
/** Growable FIFO queue of `TaskFuncInfo` entries, backed by a heap allocated
	`FixedRingBuffer`.

	`setup` must be called before any other member is used. The type contains
	no locking of its own.
*/
private struct TaskQueue {
nothrow @safe:
	// TODO: avoid use of GC

	import vibe.internal.array : FixedRingBuffer;
	FixedRingBuffer!TaskFuncInfo* m_queue;

	/// Allocates the underlying ring buffer.
	void setup()
	{
		m_queue = new FixedRingBuffer!TaskFuncInfo;
	}

	/// Determines whether no entries are currently stored.
	@property bool empty()
	const {
		return m_queue.empty;
	}

	/// The number of entries that are currently stored.
	@property size_t length()
	const {
		return m_queue.length;
	}

	/** Appends a new entry made up of `settings`, the callable `c` and its
		arguments.

		A full buffer is grown to one and a half times its capacity, but to
		at least 16 entries.
	*/
	void put(CALLABLE, ARGS...)(TaskSettings settings, ref CALLABLE c, ref ARGS args)
	{
		if (m_queue.full) {
			immutable grown = m_queue.capacity * 3 / 2;
			m_queue.capacity = grown > 16 ? grown : 16;
		}
		assert(!m_queue.full);

		// fill the next free slot in place and commit it afterwards
		m_queue.peekDst[0].settings = settings;
		m_queue.peekDst[0].set(c, args);
		m_queue.putN(1);
	}

	/** Removes the oldest entry by swapping it into `tfi`.

		Returns: `true` if an entry was taken, `false` if the queue was empty
			(in which case `tfi` is left untouched)
	*/
	bool consume(ref TaskFuncInfo tfi)
	{
		import std.algorithm.mutation : swap;

		if (!m_queue.empty) {
			swap(tfi, m_queue.front);
			m_queue.popFront();
			return true;
		}

		return false;
	}
}