1 /**
2 	Multi-threaded task pool implementation.
3 
4 	Copyright: © 2012-2020 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.core.taskpool;
9 
10 import vibe.core.concurrency : isWeaklyIsolated;
11 import vibe.core.core : exitEventLoop, isCallable, isMethod, isNothrowCallable,
12 	isNothrowMethod, logicalProcessorCount, runEventLoop, runTask, runTask_internal;
13 import vibe.core.log;
14 import vibe.core.sync : ManualEvent, VibeSyncMonitor = Monitor, createSharedManualEvent, createMonitor;
15 import vibe.core.task : Task, TaskFuncInfo, TaskSettings, callWithMove;
16 import core.sync.mutex : Mutex;
17 import core.thread : Thread;
18 import std.traits : isFunctionPointer;
19 
20 
/** Implements a shared, multi-threaded task pool.

	The constructor creates and starts a fixed number of worker threads. Tasks
	are either placed into a queue that is shared by all workers (`runTask`,
	`runTaskH`) or into the private queue of every registered worker
	(`runTaskDist`, `runTaskDistH`). All mutable pool state lives inside a
	monitor and is only touched while its lock is held.
*/
shared final class TaskPool {
	private {
		// Mutable pool state; only accessed through `m_state.lock`.
		struct State {
			// Workers currently registered with the pool. Entries are removed
			// by `terminate` and by each worker when its task loop ends.
			WorkerThread[] threads;
			// Tasks that may be picked up by any worker.
			TaskQueue queue;
			// Set by `terminate` to make the workers leave their task loop.
			bool term;
		}
		VibeSyncMonitor!(State, shared(Mutex)) m_state;
		// Emitted whenever the workers should re-check their queues or `term`.
		shared(ManualEvent) m_signal;
		// Thread count passed to the constructor; never updated afterwards.
		immutable size_t m_threadCount;
	}

	/** Creates a new task pool with the specified number of threads.

		All worker threads are created, named `vibe-<index>` and started
		before the constructor returns. A failure to set the thread name is
		logged and otherwise ignored.

		Params:
			thread_count = The number of worker threads to create
	*/
	this(size_t thread_count = logicalProcessorCount())
	@safe nothrow {
		import std.format : format;

		m_threadCount = thread_count;
		m_signal = createSharedManualEvent();
		m_state = createMonitor!State(new shared Mutex);

		with (m_state.lock) {
			queue.setup();
			threads.length = thread_count;
			foreach (i; 0 .. thread_count) {
				WorkerThread thr;
				() @trusted nothrow {
					thr = new WorkerThread(this);
					try thr.name = format("vibe-%s", i);
					catch (Exception e) logException(e, "Failed to set worker thread name");
					thr.start();
				} ();
				threads[i] = thr;
			}
		}
	}

	/** Returns the number of worker threads.

		This is the count that was passed to the constructor.
	*/
	@property size_t threadCount() const shared nothrow { return m_threadCount; }

	/** Instructs all worker threads to terminate and waits until all have
		finished.

		Sets the termination flag, wakes up the workers and then joins every
		thread that is still registered. If called from within one of the
		worker threads, that thread is skipped instead of joining itself.
		A warning is logged if tasks are still left in the shared queue.
	*/
	void terminate()
	@safe nothrow {
		m_state.lock.term = true;
		m_signal.emit();

		while (true) {
			// Detach one thread at a time while holding the lock, then join
			// it after the lock has been released again.
			WorkerThread th;
			with (m_state.lock)
				if (threads.length) {
					th = threads[0];
					threads = threads[1 .. $];
				}
			if (!th) break;

			// a thread cannot join itself
			if (th is Thread.getThis())
				continue;

			() @trusted {
				try th.join();
				catch (Exception e) {
					logWarn("Failed to wait for worker thread exit: %s", e.msg);
				}
			} ();
		}

		size_t cnt = m_state.lock.queue.length;
		if (cnt > 0) logWarn("There were still %d worker tasks pending at exit.", cnt);
	}

	/** Instructs all worker threads to terminate as soon as all tasks have
		been processed and waits for them to finish.

		Note: This is not implemented yet. Calling it currently always fails
		with an assertion.
	*/
	void join()
	@safe nothrow {
		assert(false, "TODO!");
	}

	/** Runs a new asynchronous task in a worker thread.

		The task is appended to the shared queue and a single waiting worker
		is signalled. The overloads taking `settings` use the given
		`TaskSettings` instead of `TaskSettings.init`.

		Only function pointers with weakly isolated arguments are allowed to be
		able to guarantee thread-safety.
	*/
	void runTask(FT, ARGS...)(FT func, auto ref ARGS args)
		if (isFunctionPointer!FT)
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		runTask_unsafe(TaskSettings.init, func, args);
	}
	/// ditto
	void runTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
		if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		auto func = &__traits(getMember, object, __traits(identifier, method));
		runTask_unsafe(TaskSettings.init, func, args);
	}
	/// ditto
	void runTask(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
		if (isFunctionPointer!FT)
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		runTask_unsafe(settings, func, args);
	}
	/// ditto
	void runTask(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
		if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		auto func = &__traits(getMember, object, __traits(identifier, method));
		runTask_unsafe(settings, func, args);
	}

	/** Runs a new asynchronous task in a worker thread, returning the task handle.

		This function will yield and wait for the new task to be created and started
		in the worker thread, then resume and return it.

		When called outside of a task, a temporary local task is started to
		perform the wait and the caller blocks until that task has finished.

		Only function pointers with weakly isolated arguments are allowed to be
		able to guarantee thread-safety.
	*/
	Task runTaskH(FT, ARGS...)(FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		// workaround for runWorkerTaskH to work when called outside of a task
		if (Task.getThis() == Task.init) {
			Task ret;
			.runTask((FT func, ARGS args) nothrow { ret = doRunTaskH(TaskSettings.init, func, args); }, func, args).joinUninterruptible();
			return ret;
		} else return doRunTaskH(TaskSettings.init, func, args);
	}
	/// ditto
	Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
		if (isNothrowMethod!(shared(T), method, ARGS))
	{
		// forwards to the function pointer overload via a static wrapper
		static void wrapper()(shared(T) object, ref ARGS args) {
			__traits(getMember, object, __traits(identifier, method))(args);
		}
		return runTaskH(&wrapper!(), object, args);
	}
	/// ditto
	Task runTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		// workaround for runWorkerTaskH to work when called outside of a task
		if (Task.getThis() == Task.init) {
			Task ret;
			.runTask((TaskSettings settings, FT func, ARGS args) nothrow { ret = doRunTaskH(settings, func, args); }, settings, func, args).joinUninterruptible();
			return ret;
		} else return doRunTaskH(settings, func, args);
	}
	/// ditto
	Task runTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
		if (isNothrowMethod!(shared(T), method, ARGS))
	{
		// forwards to the function pointer overload via a static wrapper
		static void wrapper()(shared(T) object, ref ARGS args) {
			__traits(getMember, object, __traits(identifier, method))(args);
		}
		return runTaskH(settings, &wrapper!(), object, args);
	}
	/// ditto
	deprecated("The `func` argument should be `nothrow`.")
	Task runTaskH(FT, ARGS...)(FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		// workaround for runWorkerTaskH to work when called outside of a task
		if (Task.getThis() == Task.init) {
			Task ret;
			.runTask({ ret = doRunTaskH(TaskSettings.init, func, args); }).join();
			return ret;
		} else return doRunTaskH(TaskSettings.init, func, args);
	}
	/// ditto
	deprecated("The `method` argument should be `nothrow`.")
	Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
		if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
	{
		static void wrapper()(shared(T) object, ref ARGS args) {
			__traits(getMember, object, __traits(identifier, method))(args);
		}
		return runTaskH(&wrapper!(), object, args);
	}
	/// ditto
	deprecated("The `func` argument should be `nothrow`.")
	Task runTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		// workaround for runWorkerTaskH to work when called outside of a task
		if (Task.getThis() == Task.init) {
			Task ret;
			.runTask({ ret = doRunTaskH(settings, func, args); }).join();
			return ret;
		} else return doRunTaskH(settings, func, args);
	}
	/// ditto
	deprecated("The `method` argument should be `nothrow`.")
	Task runTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
		if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
	{
		static void wrapper()(shared(T) object, ref ARGS args) {
			__traits(getMember, object, __traits(identifier, method))(args);
		}
		return runTaskH(settings, &wrapper!(), object, args);
	}

	// Queues `func` as a worker task and waits for its handle.
	//
	// The worker task first sends its own `Task` handle through a channel
	// and then invokes `func` with `args`. The caller waits for that handle,
	// closes the channel and returns the handle.
	//
	// NOTE: needs to be a separate function to avoid recursion for the
	//       workaround above, which breaks @safe inference
	private Task doRunTaskH(FT, ARGS...)(TaskSettings settings, FT func, ref ARGS args)
		if (isFunctionPointer!FT)
	{
		import vibe.core.channel : Channel, createChannel;

		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		auto ch = createChannel!Task();

		static void taskFun(Channel!Task ch, FT func, ARGS args) {
			// report the handle before running the user function, so that the
			// caller does not have to wait for `func` to finish
			try ch.put(Task.getThis());
			catch (Exception e) assert(false, e.msg);
			mixin(callWithMove!ARGS("func", "args"));
		}
		runTask_unsafe(settings, &taskFun, ch, func, args);

		Task ret;
		try ret = ch.consumeOne();
		catch (Exception e) assert(false, "Failed to receive task handle: " ~ e.msg);
		ch.close();
		return ret;
	}


	/** Runs a new asynchronous task in all worker threads concurrently.

		This function is mainly useful for long-living tasks that distribute their
		work across all CPU cores. Only function pointers with weakly isolated
		arguments are allowed to be able to guarantee thread-safety.

		The number of tasks started is guaranteed to be equal to
		`threadCount`.
	*/
	void runTaskDist(FT, ARGS...)(FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		runTaskDist_unsafe(TaskSettings.init, func, args);
	}
	/// ditto
	void runTaskDist(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
		if (isNothrowMethod!(shared(T), method, ARGS))
	{
		auto func = &__traits(getMember, object, __traits(identifier, method));
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		runTaskDist_unsafe(TaskSettings.init, func, args);
	}
	/// ditto
	void runTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		runTaskDist_unsafe(settings, func, args);
	}
	/// ditto
	void runTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
		if (isNothrowMethod!(shared(T), method, ARGS))
	{
		auto func = &__traits(getMember, object, __traits(identifier, method));
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		runTaskDist_unsafe(settings, func, args);
	}
	/// ditto
	deprecated("The `func` argument should be `nothrow`.")
	void runTaskDist(FT, ARGS...)(FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		runTaskDist_unsafe(TaskSettings.init, func, args);
	}
	/// ditto
	deprecated("The `method` argument should be `nothrow`.")
	void runTaskDist(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
		if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
	{
		auto func = &__traits(getMember, object, __traits(identifier, method));
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		runTaskDist_unsafe(TaskSettings.init, func, args);
	}
	/// ditto
	deprecated("The `func` argument should be `nothrow`.")
	void runTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
		if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
	{
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");
		runTaskDist_unsafe(settings, func, args);
	}
	/// ditto
	deprecated("The `method` argument should be `nothrow`.")
	void runTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
		if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
	{
		auto func = &__traits(getMember, object, __traits(identifier, method));
		foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads.");

		runTaskDist_unsafe(settings, func, args);
	}

	/** Runs a new asynchronous task in all worker threads and returns the handles.

		`on_handle` is an alias to a callable that takes a `Task` as its only
		argument and is called for every task instance that gets created.
		The function waits until `threadCount` handles have been received.

		See_also: `runTaskDist`
	*/
	void runTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args)
		if (!is(HCB == TaskSettings))
	{
		runTaskDistH(TaskSettings.init, on_handle, func, args);
	}
	/// ditto
	void runTaskDistH(HCB, FT, ARGS...)(TaskSettings settings, scope HCB on_handle, FT func, auto ref ARGS args)
	{
		import vibe.core.channel : Channel, createChannel;

		// TODO: support non-copyable argument types using .move
		auto ch = createChannel!Task;

		// each distributed task reports its own handle before calling `func`
		static void call(Channel!Task ch, FT func, ARGS args) {
			try ch.put(Task.getThis());
			catch (Exception e) assert(false, e.msg);
			func(args);
		}
		runTaskDist(settings, &call, ch, func, args);

		// one handle is expected per worker thread
		foreach (i; 0 .. this.threadCount)
			on_handle(ch.consumeOne());

		ch.close();
	}

	// Appends a task to the shared queue and wakes up a single worker.
	// Performs no isolation checks on the argument types - the public
	// `runTask`/`runTaskH` overloads are responsible for those.
	private void runTask_unsafe(CALLABLE, ARGS...)(TaskSettings settings, CALLABLE callable, ref ARGS args)
	{
		import std.traits : ParameterTypeTuple;
		import vibe.internal.traits : areConvertibleTo;
		import vibe.internal.typetuple;

		alias FARGS = ParameterTypeTuple!CALLABLE;
		static assert(areConvertibleTo!(Group!ARGS, Group!FARGS),
			"Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'.");

		m_state.lock.queue.put(settings, callable, args);
		m_signal.emitSingle();
	}

	// Appends a task to the private queue of every worker that is currently
	// registered in the pool state and then wakes up all workers.
	// Performs no isolation checks on the argument types - the public
	// `runTaskDist` overloads are responsible for those.
	private void runTaskDist_unsafe(CALLABLE, ARGS...)(TaskSettings settings, ref CALLABLE callable, ARGS args) // NOTE: no ref for args, to disallow non-copyable types!
	{
		import std.traits : ParameterTypeTuple;
		import vibe.internal.traits : areConvertibleTo;
		import vibe.internal.typetuple;

		alias FARGS = ParameterTypeTuple!CALLABLE;
		static assert(areConvertibleTo!(Group!ARGS, Group!FARGS),
			"Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'.");

		{
			// keep the lock only for as long as the queues are being filled
			auto st = m_state.lock;
			foreach (thr; st.threads) {
				// create one TFI per thread to properly account for elaborate assignment operators/postblit
				thr.m_queue.put(settings, callable, args);
			}
		}
		m_signal.emit();
	}
}
416 
/** Worker thread that executes the tasks of a `TaskPool`.

	Each worker owns a private queue for tasks that must run on this specific
	thread (filled by the pool's distributed task functions) and additionally
	takes tasks from the queue of the owning pool.
*/
private final class WorkerThread : Thread {
	private {
		shared(TaskPool) m_pool; // the pool this worker belongs to
		TaskQueue m_queue;       // tasks that were queued for this thread only
	}

	/** Creates a worker for `pool` without starting it.

		Params:
			pool = The task pool whose state, signal and queue are used by this worker
	*/
	this(shared(TaskPool) pool)
	nothrow {
		m_pool = pool;
		m_queue.setup();
		super(&main);
	}

	/** Thread entry point.

		Returns right away if the pool has already been flagged for
		termination. Otherwise runs `handleWorkerTasks` within a task and
		waits for it to finish. Any `Throwable` that escapes is logged with
		fatal log level and the process is aborted.
	*/
	private void main()
	nothrow {
		import core.stdc.stdlib : abort;

		try {
			if (m_pool.m_state.lock.term) return;
			logDebug("entering worker thread");

			// There is an issue where a task that periodically calls yield()
			// but otherwise only performs a CPU computation will cause a
			// call to runEventLoopOnce() or yield() called from the global
			// thread context to not return before the task is finished. For
			// this reason we start a task here, which in turn is scheduled
			// properly together with such a task, and also is scheduled
			// according to the task priorities.
			runTask(&handleWorkerTasks).joinUninterruptible();

			logDebug("Worker thread exit.");
		} catch (Throwable th) {
			th.logException!(LogLevel.fatal)("Worker thread terminated due to uncaught error");
			abort();
		}
	}

	/** Task loop of the worker.

		Repeatedly takes a task from the thread specific queue or, if that one
		is empty, from the pool's shared queue and starts it. If no task is
		available, waits on the pool's signal. The loop ends once the pool's
		termination flag is set, after which the worker removes itself from
		the pool's thread list.
	*/
	private void handleWorkerTasks()
	nothrow @safe {
		import std.algorithm.iteration : filter;
		import std.algorithm.mutation : swap;
		import std.array : array;

		logTrace("worker thread enter");
		TaskFuncInfo taskfunc;
		auto emit_count = m_pool.m_signal.emitCount;
		while(true) {
			// the pool lock is only held while checking the queues, not while
			// starting the task or waiting for the signal
			with (m_pool.m_state.lock) {
				logTrace("worker thread check");

				if (term) break;

				// tasks queued for this thread take precedence over shared ones
				if (m_queue.consume(taskfunc)) {
					logTrace("worker thread got specific task");
				} else if (queue.consume(taskfunc)) {
					logTrace("worker thread got unspecific task");
				}
			}

			if (taskfunc.func !is null)
				// swaps the task info passed to the callback with `taskfunc`
				.runTask_internal!((ref tfi) { swap(tfi, taskfunc); });
			else emit_count = m_pool.m_signal.waitUninterruptible(emit_count);
		}

		logTrace("worker thread exit");

		if (!m_queue.empty)
			logWarn("Worker thread shuts down with specific worker tasks left in its queue.");

		with (m_pool.m_state.lock) {
			// unregister this worker from the pool
			threads = threads.filter!(t => t !is this).array;
			if (threads.length > 0 && !queue.empty)
				logWarn("Worker threads shut down with worker tasks still left in the queue.");
		}
	}
}
496 
/** FIFO queue of pending task descriptors (`TaskFuncInfo`).

	The storage is a heap allocated ring buffer that is created by `setup`,
	which has to be called before any other member is used.
*/
private struct TaskQueue {
nothrow @safe:
	// TODO: avoid use of GC

	import vibe.internal.array : FixedRingBuffer;
	FixedRingBuffer!TaskFuncInfo* m_queue;

	/// Allocates the underlying ring buffer.
	void setup()
	{
		m_queue = new FixedRingBuffer!TaskFuncInfo;
	}

	/// Determines whether there are no queued tasks.
	@property bool empty()
	const {
		return m_queue.empty;
	}

	/// The number of queued tasks.
	@property size_t length()
	const {
		return m_queue.length;
	}

	/** Appends a new task to the end of the queue.

		A full buffer is enlarged to one and a half times its capacity, but
		to no less than 16 entries, before the task is stored.

		Params:
			settings = Settings to store along with the task
			c = The callable to execute
			args = Arguments to invoke the callable with
	*/
	void put(CALLABLE, ARGS...)(TaskSettings settings, ref CALLABLE c, ref ARGS args)
	{
		import std.algorithm.comparison : max;

		if (m_queue.full) {
			auto grown = m_queue.capacity * 3 / 2;
			m_queue.capacity = max(16, grown);
		}
		assert(!m_queue.full);

		// fill in the next free slot in-place and then commit it
		m_queue.peekDst[0].settings = settings;
		m_queue.peekDst[0].set(c, args);
		m_queue.putN(1);
	}

	/** Removes the oldest task from the queue.

		The queue entry is swapped with `tfi` before being popped.

		Params:
			tfi = Receives the removed task; left untouched if the queue is empty

		Returns:
			`true` if a task was removed, `false` if the queue was empty
	*/
	bool consume(ref TaskFuncInfo tfi)
	{
		import std.algorithm.mutation : swap;

		immutable has_task = !m_queue.empty;
		if (has_task) {
			swap(tfi, m_queue.front);
			m_queue.popFront();
		}
		return has_task;
	}
}