1 /**
	Contains the Task handle type, task local storage and task scheduling primitives.
3 
4 	Copyright: © 2012-2020 Sönke Ludwig
5 	Authors: Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 */
8 module vibe.core.task;
9 
10 import vibe.core.log;
11 import vibe.core.sync;
12 
13 import core.atomic : atomicOp, atomicLoad, cas;
14 import core.thread;
15 import std.exception;
16 import std.traits;
17 import std.typecons;
18 
19 
20 /** Represents a single task as started using vibe.core.runTask.
21 
22 	Note that the Task type is considered weakly isolated and thus can be
23 	passed between threads using vibe.core.concurrency.send or by passing
24 	it as a parameter to vibe.core.core.runWorkerTask.
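
	Examples:
		A minimal usage sketch, assuming `runTask` and `sleep` from
		`vibe.core.core` and `msecs` from `core.time`:
		---
		auto t = runTask({
			sleep(100.msecs);
		});
		assert(t.running);
		t.join(); // blocks until the task has finished
		---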
25 */
26 struct Task {
27 	private {
28 		shared(TaskFiber) m_fiber;
29 		size_t m_taskCounter;
30 		import std.concurrency : ThreadInfo, Tid;
31 		static ThreadInfo s_tidInfo;
32 	}
33 
34 	enum basePriority = 0x00010000;
35 
36 	private this(TaskFiber fiber, size_t task_counter)
37 	@safe nothrow {
38 		() @trusted { m_fiber = cast(shared)fiber; } ();
39 		m_taskCounter = task_counter;
40 	}
41 
42 	this(in Task other)
43 	@safe nothrow {
44 		m_fiber = () @trusted { return cast(shared(TaskFiber))other.m_fiber; } ();
45 		m_taskCounter = other.m_taskCounter;
46 	}
47 
48 	/** Returns the Task instance belonging to the calling task.
49 	*/
50 	static Task getThis() @safe nothrow
51 	{
		// In DMD 2.067, synchronized statements were annotated nothrow.
53 		// DMD#4115, Druntime#1013, Druntime#1021, Phobos#2704
54 		// However, they were "logically" nothrow before.
55 		static if (__VERSION__ <= 2066)
56 			scope (failure) assert(0, "Internal error: function should be nothrow");
57 
58 		auto fiber = () @trusted { return Fiber.getThis(); } ();
59 		if (!fiber) return Task.init;
60 		auto tfiber = cast(TaskFiber)fiber;
61 		if (!tfiber) return Task.init;
62 		// FIXME: returning a non-.init handle for a finished task might break some layered logic
63 		return Task(tfiber, tfiber.getTaskStatusFromOwnerThread().counter);
64 	}
65 
66 	nothrow {
67 		package @property inout(TaskFiber) taskFiber() inout @system { return cast(inout(TaskFiber))m_fiber; }
68 		@property inout(Fiber) fiber() inout @system { return this.taskFiber; }
69 		@property size_t taskCounter() const @safe { return m_taskCounter; }
70 		@property inout(Thread) thread() inout @trusted { if (m_fiber) return this.taskFiber.thread; return null; }
71 
72 		/** Determines if the task is still running or scheduled to be run.
73 		*/
74 		@property bool running()
75 		const @trusted {
76 			assert(m_fiber !is null, "Invalid task handle");
77 			auto tf = this.taskFiber;
78 			try if (tf.state == Fiber.State.TERM) return false; catch (Throwable) {}
79 			auto st = m_fiber.getTaskStatus();
80 			if (st.counter != m_taskCounter)
81 				return false;
82 			return st.initialized;
83 		}
84 
85 		package @property ref ThreadInfo tidInfo() @system { return m_fiber ? taskFiber.tidInfo : s_tidInfo; } // FIXME: this is not thread safe!
86 		package @property ref const(ThreadInfo) tidInfo() const @system { return m_fiber ? taskFiber.tidInfo : s_tidInfo; } // FIXME: this is not thread safe!
87 
88 		/** Gets the `Tid` associated with this task for use with
89 			`std.concurrency`.
90 		*/
91 		@property Tid tid() @trusted { return tidInfo.ident; }
92 		/// ditto
93 		@property const(Tid) tid() const @trusted { return tidInfo.ident; }
94 	}
95 
96 	T opCast(T)() const @safe nothrow if (is(T == bool)) { return m_fiber !is null; }
97 
98 	void join() @trusted { if (m_fiber) m_fiber.join!true(m_taskCounter); }
99 	void joinUninterruptible() @trusted nothrow { if (m_fiber) m_fiber.join!false(m_taskCounter); }
100 	void interrupt() @trusted nothrow { if (m_fiber) m_fiber.interrupt(m_taskCounter); }
101 
	string toString() const @safe { import std.string; return format("%s:%s", () @trusted { return cast(void*)m_fiber; } (), m_taskCounter); }
103 
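	/** Writes a short, human-readable identifier for this task to `dst`.

		The identifier is derived from the fiber address and the task counter
		and gets a "-fin" suffix once the task is no longer running. It is
		meant for debug and log output only.
	*/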
104 	void getDebugID(R)(ref R dst)
105 	{
106 		import std.digest.md : MD5;
107 		import std.bitmanip : nativeToLittleEndian;
108 		import std.base64 : Base64;
109 
110 		if (!m_fiber) {
111 			dst.put("----");
112 			return;
113 		}
114 
115 		MD5 md;
116 		md.start();
117 		md.put(nativeToLittleEndian(() @trusted { return cast(size_t)cast(void*)m_fiber; } ()));
118 		md.put(nativeToLittleEndian(m_taskCounter));
119 		Base64.encode(md.finish()[0 .. 3], dst);
120 		if (!this.running) dst.put("-fin");
121 	}
122 	string getDebugID()
123 	@trusted {
124 		import std.array : appender;
125 		auto app = appender!string;
126 		getDebugID(app);
127 		return app.data;
128 	}
129 
130 	bool opEquals(in ref Task other) const @safe nothrow { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
131 	bool opEquals(in Task other) const @safe nothrow { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; }
132 }
133 
134 
135 /** Settings to control the behavior of newly started tasks.
136 */
137 struct TaskSettings {
138 	/** Scheduling priority of the task
139 
		The priority of a task is roughly proportional to the number of
		times it gets scheduled in comparison to competing tasks. For
		example, a task with priority 100 will be scheduled once every 10
		rounds when competing against a task with priority 1000.
144 
145 		The default priority is defined by `basePriority` and has a value
146 		of 65536. Priorities should be computed relative to `basePriority`.
147 
		A task with a priority of zero will only be executed if no other
		task with a non-zero priority is competing.
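
		Example (sketch, assuming a `runTask` overload in `vibe.core.core`
		that accepts a `TaskSettings` argument):
		---
		TaskSettings ts;
		ts.priority = Task.basePriority / 16; // scheduled roughly 16x less often
		runTask(ts, {
			performBackgroundWork(); // hypothetical low-priority workload
		});
		---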
150 	*/
151 	uint priority = Task.basePriority;
152 }
153 
154 
155 /**
156 	Implements a task local storage variable.
157 
158 	Task local variables, similar to thread local variables, exist separately
159 	in each task. Consequently, they do not need any form of synchronization
160 	when accessing them.
161 
	Note, however, that each TaskLocal variable will increase the memory footprint
	of any task that uses task local storage. Accessing a TaskLocal variable is
	also more expensive than accessing a thread local variable, but still
	generally O(1). Since the actual storage is acquired lazily, the first access
	may require a memory allocation of unknown computational cost.
167 
168 	Notice:
		TaskLocal instances MUST be declared as static/global thread-local
		variables. Defining them as temporary/stack variables will cause
		crashes or data corruption!
172 
173 	Examples:
174 		---
175 		TaskLocal!string s_myString = "world";
176 
177 		void taskFunc()
178 		{
179 			assert(s_myString == "world");
180 			s_myString = "hello";
181 			assert(s_myString == "hello");
182 		}
183 
184 		shared static this()
185 		{
186 			// both tasks will get independent storage for s_myString
187 			runTask(&taskFunc);
188 			runTask(&taskFunc);
189 		}
190 		---
191 */
192 struct TaskLocal(T)
193 {
194 	private {
195 		size_t m_offset = size_t.max;
196 		size_t m_id;
197 		T m_initValue;
198 		bool m_hasInitValue = false;
199 	}
200 
201 	this(T init_val) { m_initValue = init_val; m_hasInitValue = true; }
202 
203 	@disable this(this);
204 
205 	void opAssign(T value) { this.storage = value; }
206 
207 	@property ref T storage()
208 	@safe {
209 		import std.conv : emplace;
210 
211 		auto fiber = TaskFiber.getThis();
212 
213 		// lazily register in FLS storage
214 		if (m_offset == size_t.max) {
215 			static assert(T.alignof <= 8, "Unsupported alignment for type "~T.stringof);
216 			assert(TaskFiber.ms_flsFill % 8 == 0, "Misaligned fiber local storage pool.");
			m_offset = TaskFiber.ms_flsFill;
			m_id = TaskFiber.ms_flsCounter++;

			// reserve T.sizeof bytes in the thread's FLS pool and round the
			// fill pointer up to the next 8-byte boundary for the next variable
			TaskFiber.ms_flsFill += T.sizeof;
			while (TaskFiber.ms_flsFill % 8 != 0)
				TaskFiber.ms_flsFill++;
224 		}
225 
226 		// make sure the current fiber has enough FLS storage
227 		if (fiber.m_fls.length < TaskFiber.ms_flsFill) {
228 			fiber.m_fls.length = TaskFiber.ms_flsFill + 128;
229 			() @trusted { fiber.m_flsInit.length = TaskFiber.ms_flsCounter + 64; } ();
230 		}
231 
232 		// return (possibly default initialized) value
233 		auto data = () @trusted { return fiber.m_fls.ptr[m_offset .. m_offset+T.sizeof]; } ();
234 		if (!() @trusted { return fiber.m_flsInit[m_id]; } ()) {
235 			() @trusted { fiber.m_flsInit[m_id] = true; } ();
236 			import std.traits : hasElaborateDestructor, hasAliasing;
237 			static if (hasElaborateDestructor!T || hasAliasing!T) {
238 				void function(void[], size_t) destructor = (void[] fls, size_t offset){
239 					static if (hasElaborateDestructor!T) {
240 						auto obj = cast(T*)&fls[offset];
						// call the destructor on the object if a custom one is declared
242 						obj.destroy();
243 					}
244 					else static if (hasAliasing!T) {
245 						// zero the memory to avoid false pointers
246 						foreach (size_t i; offset .. offset + T.sizeof) {
247 							ubyte* u = cast(ubyte*)&fls[i];
248 							*u = 0;
249 						}
250 					}
251 				};
252 				FLSInfo fls_info;
253 				fls_info.fct = destructor;
254 				fls_info.offset = m_offset;
255 
256 				// make sure flsInfo has enough space
257 				if (TaskFiber.ms_flsInfo.length <= m_id)
258 					TaskFiber.ms_flsInfo.length = m_id + 64;
259 
260 				TaskFiber.ms_flsInfo[m_id] = fls_info;
261 			}
262 
263 			if (m_hasInitValue) {
264 				static if (__traits(compiles, () @trusted { emplace!T(data, m_initValue); } ()))
265 					() @trusted { emplace!T(data, m_initValue); } ();
266 				else assert(false, "Cannot emplace initialization value for type "~T.stringof);
267 			} else () @trusted { emplace!T(data); } ();
268 		}
269 		return *() @trusted { return cast(T*)data.ptr; } ();
270 	}
271 
272 	alias storage this;
273 }
274 
275 
276 /** Exception that is thrown by Task.interrupt.
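
	Examples:
		A sketch of handling an interruption inside a task, assuming
		`vibe.core.core.sleep` as the interruptible operation:
		---
		try sleep(10.seconds);
		catch (InterruptException e) {
			// the task was interrupted via Task.interrupt
		}
		---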
277 */
278 class InterruptException : Exception {
279 	this()
280 	@safe nothrow {
281 		super("Task interrupted.");
282 	}
283 }
284 
285 /**
286 	High level state change events for a Task
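
	Examples:
		A sketch of a callback matching `TaskEventCallback` that logs task
		state transitions; how such a callback gets registered via the debug
		hooks in `vibe.core.core` is not shown here:
		---
		void onTaskEvent(TaskEvent ev, Task t)
		nothrow {
			try logInfo("task %s: %s", t, ev);
			catch (Exception e) {}
		}
		---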
287 */
288 enum TaskEvent {
289 	preStart,  /// Just about to invoke the fiber which starts execution
290 	postStart, /// After the fiber has returned for the first time (by yield or exit)
291 	start,     /// Just about to start execution
292 	yield,     /// Temporarily paused
293 	resume,    /// Resumed from a prior yield
294 	end,       /// Ended normally
295 	fail       /// Ended with an exception
296 }
297 
298 struct TaskCreationInfo {
299 	Task handle;
300 	const(void)* functionPointer;
301 }
302 
303 alias TaskEventCallback = void function(TaskEvent, Task) nothrow;
304 alias TaskCreationCallback = void function(ref TaskCreationInfo) nothrow @safe;
305 
306 /**
307 	The maximum combined size of all parameters passed to a task delegate
308 
309 	See_Also: runTask
310 */
311 enum maxTaskParameterSize = 128;
312 
313 
/** The fiber type used to execute tasks.

	Each instance executes a single task at a time and may be recycled to run
	further tasks once the current task has finished. Each task is owned by a
	single thread.
318 */
319 final package class TaskFiber : Fiber {
320 	static if ((void*).sizeof >= 8) enum defaultTaskStackSize = 16*1024*1024;
321 	else enum defaultTaskStackSize = 512*1024;
322 
323 	private enum Flags {
324 		running = 1UL << 0,
325 		initialized = 1UL << 1,
326 		interrupt = 1UL << 2,
327 
328 		shiftAmount = 3,
329 		flagsMask = (1<<shiftAmount) - 1
330 	}
331 
332 	private {
333 		import std.concurrency : ThreadInfo;
334 		import std.bitmanip : BitArray;
335 
336 		// task queue management (TaskScheduler.m_taskQueue)
337 		TaskFiber m_prev, m_next;
338 		TaskFiberQueue* m_queue;
339 
340 		Thread m_thread;
341 		ThreadInfo m_tidInfo;
342 		uint m_staticPriority, m_dynamicPriority;
		shared ulong m_taskCounterAndFlags = 0; // the low Flags.shiftAmount bits hold the flags, the remaining bits hold the task counter
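		// Example: the value ((1 << Flags.shiftAmount) | Flags.running | Flags.initialized)
		// encodes task counter 1 with the running and initialized flags set;
		// see getTaskStatus for the decoding logic.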
344 
345 		bool m_shutdown = false;
346 
347 		shared(ManualEvent) m_onExit;
348 
349 		// task local storage
350 		BitArray m_flsInit;
351 		void[] m_fls;
352 
353 		package int m_yieldLockCount;
354 
355 		static TaskFiber ms_globalDummyFiber;
356 		static FLSInfo[] ms_flsInfo;
357 		static size_t ms_flsFill = 0; // thread-local
358 		static size_t ms_flsCounter = 0;
359 	}
360 
361 
362 	package TaskFuncInfo m_taskFunc;
363 	package __gshared size_t ms_taskStackSize = defaultTaskStackSize;
364 	package __gshared debug TaskEventCallback ms_taskEventCallback;
365 	package __gshared debug TaskCreationCallback ms_taskCreationCallback;
366 
367 	this()
368 	@trusted nothrow {
369 		super(&run, ms_taskStackSize);
370 		m_onExit = createSharedManualEvent();
371 		m_thread = Thread.getThis();
372 	}
373 
374 	static TaskFiber getThis()
375 	@safe nothrow {
376 		auto f = () @trusted nothrow { return Fiber.getThis(); } ();
377 		if (auto tf = cast(TaskFiber)f) return tf;
378 		if (!ms_globalDummyFiber) ms_globalDummyFiber = new TaskFiber;
379 		return ms_globalDummyFiber;
380 	}
381 
382 	// expose Fiber.state as @safe on older DMD versions
383 	static if (!__traits(compiles, () @safe { return Fiber.init.state; } ()))
384 		@property State state() @trusted const nothrow { return super.state; }
385 
386 	private void run()
387 	nothrow {
388 		import std.algorithm.mutation : swap;
389 		import std.concurrency : Tid, thisTid;
390 		import std.encoding : sanitize;
391 		import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield, yieldLock;
392 
393 		version (VibeDebugCatchAll) alias UncaughtException = Throwable;
394 		else alias UncaughtException = Exception;
395 		try {
396 			while (true) {
397 				while (!m_taskFunc.func) {
398 					try {
399 						debug (VibeTaskLog) logTrace("putting fiber to sleep waiting for new task...");
400 						Fiber.yield();
401 					} catch (Exception e) {
402 						logWarn("CoreTaskFiber was resumed with exception but without active task!");
403 						logDiagnostic("Full error: %s", e.toString().sanitize());
404 					}
405 					if (m_shutdown) return;
406 				}
407 
408 				debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");
409 
410 				TaskFuncInfo task;
411 				swap(task, m_taskFunc);
412 				m_dynamicPriority = m_staticPriority = task.settings.priority;
413 				Task handle = this.task;
414 				try {
415 					atomicOp!"|="(m_taskCounterAndFlags, Flags.running); // set running
416 					scope(exit) atomicOp!"&="(m_taskCounterAndFlags, ~Flags.flagsMask); // clear running/initialized
417 
418 					thisTid; // force creation of a message box
419 
420 					debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle);
421 					if (!isEventLoopRunning) {
422 						debug (VibeTaskLog) logTrace("Event loop not running at task start - yielding.");
423 						taskScheduler.yieldUninterruptible();
424 						debug (VibeTaskLog) logTrace("Initial resume of task.");
425 					}
426 					task.call();
427 					debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle);
428 
429 					debug if (() @trusted { return (cast(shared)this); } ().getTaskStatus().interrupt)
430 						logDebug("Task exited while an interrupt was in flight.");
431 				} catch (Exception e) {
432 					debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle);
433 					import std.encoding;
434 					logCritical("Task terminated with uncaught exception: %s", e.msg);
435 					logDebug("Full error: %s", e.toString().sanitize());
436 				}
437 
438 				debug assert(Thread.getThis() is m_thread, "Fiber moved?");
439 
440 				this.tidInfo.ident = Tid.init; // clear message box
441 
442 				debug (VibeTaskLog) logTrace("Notifying joining tasks.");
443 
444 				// Issue #161: This fiber won't be resumed before the next task
445 				// is assigned, because it is already marked as de-initialized.
446 				// Since ManualEvent.emit() will need to switch tasks, this
447 				// would mean that only the first waiter is notified before
448 				// this fiber gets a new task assigned.
449 				// Using a yield lock forces all corresponding tasks to be
450 				// enqueued into the schedule queue and resumed in sequence
451 				// at the end of the scope.
452 				auto l = yieldLock();
453 
454 				m_onExit.emit();
455 
456 				// make sure that the task does not get left behind in the yielder queue if terminated during yield()
457 				if (m_queue) m_queue.remove(this);
458 
				// destroy any initialized FLS slots and reset the FLS initialization BitArray for memory safety
460 				foreach (size_t i, ref bool b; m_flsInit) {
461 					if (b) {
						if (ms_flsInfo !is null && ms_flsInfo.length > i && ms_flsInfo[i] != FLSInfo.init)
463 							ms_flsInfo[i].destroy(m_fls);
464 						b = false;
465 					}
466 				}
467 
468 				assert(!m_queue, "Fiber done but still scheduled to be resumed!?");
469 
470 				debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");
471 
472 				// make the fiber available for the next task
473 				recycleFiber(this);
474 			}
475 		} catch(UncaughtException th) {
476 			logCritical("CoreTaskFiber was terminated unexpectedly: %s", th.msg);
477 			logDiagnostic("Full error: %s", th.toString().sanitize());
478 		} catch (Throwable th) {
479 			import std.stdio : stderr, writeln;
480 			import core.stdc.stdlib : abort;
481 			try stderr.writeln(th);
482 			catch (Exception e) {
483 				try stderr.writeln(th.msg);
484 				catch (Exception e) {}
485 			}
486 			abort();
487 		}
488 	}
489 
490 
491 	/** Returns the thread that owns this task.
492 	*/
493 	@property inout(Thread) thread() inout @safe nothrow { return m_thread; }
494 
495 	/** Returns the handle of the current Task running on this fiber.
496 	*/
497 	@property Task task()
498 	@safe nothrow {
499 		auto ts = getTaskStatusFromOwnerThread();
500 		if (!ts.initialized) return Task.init;
501 		return Task(this, ts.counter);
502 	}
503 
504 	@property ref inout(ThreadInfo) tidInfo() inout @safe nothrow { return m_tidInfo; }
505 
506 	/** Shuts down the task handler loop.
507 	*/
508 	void shutdown()
509 	@safe nothrow {
510 		debug assert(Thread.getThis() is m_thread);
511 
512 		assert(!() @trusted { return cast(shared)this; } ().getTaskStatus().initialized);
513 
514 		m_shutdown = true;
515 		while (state != Fiber.State.TERM)
516 			() @trusted {
517 				try call(Fiber.Rethrow.no);
518 				catch (Exception e) assert(false, e.msg);
519 			} ();
520 		}
521 
522 	/** Blocks until the task has ended.
523 	*/
	void join(bool interruptible)(size_t task_counter)
525 	shared @trusted {
526 		auto cnt = m_onExit.emitCount;
527 		while (true) {
528 			auto st = getTaskStatus();
529 			if (!st.initialized || st.counter != task_counter)
530 				break;
			static if (interruptible)
532 				cnt = m_onExit.wait(cnt);
533 			else
534 				cnt = m_onExit.waitUninterruptible(cnt);
535 		}
536 	}
537 
	/** Throws an InterruptException within the task as soon as it calls an interruptible function.
539 	*/
540 	void interrupt(size_t task_counter)
541 	shared @safe nothrow {
542 		import vibe.core.core : taskScheduler;
543 
544 		auto caller = () @trusted { return cast(shared)TaskFiber.getThis(); } ();
545 
546 		assert(caller !is this, "A task cannot interrupt itself.");
547 
548 		while (true) {
549 			auto tcf = atomicLoad(m_taskCounterAndFlags);
550 			auto st = getTaskStatus(tcf);
551 			if (!st.initialized || st.interrupt || st.counter != task_counter)
552 				return;
553 			auto tcf_int = tcf | Flags.interrupt;
554 			if (cas(&m_taskCounterAndFlags, tcf, tcf_int))
555 				break;
556 		}
557 
558 		if (caller.m_thread is m_thread) {
559 			auto thisus = () @trusted { return cast()this; } ();
560 			debug (VibeTaskLog) logTrace("Resuming task with interrupt flag.");
561 			auto defer = caller.m_yieldLockCount > 0;
562 			taskScheduler.switchTo(thisus.task, defer ? TaskSwitchPriority.prioritized : TaskSwitchPriority.immediate);
563 		} else {
564 			debug (VibeTaskLog) logTrace("Set interrupt flag on task without resuming.");
565 		}
566 	}
567 
568 	/** Sets the fiber to initialized state and increments the task counter.
569 
570 		Note that the task information needs to be set up first.
571 	*/
572 	void bumpTaskCounter()
573 	@safe nothrow {
574 		debug {
575 			auto ts = atomicLoad(m_taskCounterAndFlags);
576 			assert((ts & Flags.flagsMask) == 0, "bumpTaskCounter() called on fiber with non-zero flags");
577 			assert(m_taskFunc.func !is null, "bumpTaskCounter() called without initializing the task function");
578 		}
579 
		// A single atomic add increments the counter part (one unit equals
		// 1 << Flags.shiftAmount) and sets the initialized flag at the same time;
		// this is valid because all flag bits are zero at this point.
		() @trusted { atomicOp!"+="(m_taskCounterAndFlags, (1 << Flags.shiftAmount) + Flags.initialized); } ();
581 	}
582 
583 	private auto getTaskStatus()
584 	shared const @safe nothrow {
585 		return getTaskStatus(atomicLoad(m_taskCounterAndFlags));
586 	}
587 
588 	private auto getTaskStatusFromOwnerThread()
589 	const @safe nothrow {
590 		debug assert(Thread.getThis() is m_thread);
591 		return getTaskStatus(atomicLoad(m_taskCounterAndFlags));
592 	}
593 
594 	private static auto getTaskStatus(ulong counter_and_flags)
595 	@safe nothrow {
596 		static struct S {
597 			size_t counter;
598 			bool running;
599 			bool initialized;
600 			bool interrupt;
601 		}
602 		S ret;
603 		ret.counter = cast(size_t)(counter_and_flags >> Flags.shiftAmount);
604 		ret.running = (counter_and_flags & Flags.running) != 0;
605 		ret.initialized = (counter_and_flags & Flags.initialized) != 0;
606 		ret.interrupt = (counter_and_flags & Flags.interrupt) != 0;
607 		return ret;
608 	}
609 
610 	package void handleInterrupt(scope void delegate() @safe nothrow on_interrupt)
611 	@safe nothrow {
612 		assert(() @trusted { return Task.getThis().fiber; } () is this,
613 			"Handling interrupt outside of the corresponding fiber.");
614 		if (getTaskStatusFromOwnerThread().interrupt && on_interrupt) {
615 			debug (VibeTaskLog) logTrace("Handling interrupt flag.");
616 			clearInterruptFlag();
617 			on_interrupt();
618 		}
619 	}
620 
621 	package void handleInterrupt()
622 	@safe {
623 		assert(() @trusted { return Task.getThis().fiber; } () is this,
624 			"Handling interrupt outside of the corresponding fiber.");
625 		if (getTaskStatusFromOwnerThread().interrupt) {
626 			clearInterruptFlag();
627 			throw new InterruptException;
628 		}
629 	}
630 
631 	private void clearInterruptFlag()
632 	@safe nothrow {
633 		auto tcf = atomicLoad(m_taskCounterAndFlags);
634 		auto st = getTaskStatus(tcf);
635 		while (true) {
636 			assert(st.initialized);
637 			if (!st.interrupt) break;
638 			auto tcf_int = tcf & ~Flags.interrupt;
639 			if (cas(&m_taskCounterAndFlags, tcf, tcf_int))
640 				break;
641 		}
642 	}
643 }
644 
645 
646 /** Controls the priority to use for switching execution to a task.
647 */
648 enum TaskSwitchPriority {
	/** Rescheduled according to the task's priority.
650 	*/
651 	normal,
652 
653 	/** Rescheduled with maximum priority.
654 
655 		The task will resume as soon as the current task yields.
656 	*/
657 	prioritized,
658 
659 	/** Switch to the task immediately.
660 	*/
661 	immediate
662 }
663 
664 package struct TaskFuncInfo {
665 	void function(ref TaskFuncInfo) func;
666 	void[2*size_t.sizeof] callable;
667 	void[maxTaskParameterSize] args;
668 	debug ulong functionPointer;
669 	TaskSettings settings;
670 
671 	void set(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args)
672 	{
673 		assert(!func, "Setting TaskFuncInfo that is already set.");
674 
675 		import std.algorithm : move;
676 		import std.traits : hasElaborateAssign;
677 		import std.conv : to;
678 
679 		static struct TARGS { ARGS expand; }
680 
681 		static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length,
682 			"Storage required for task callable is too large ("~CALLABLE.sizeof~" vs max "~callable.length~"): "~CALLABLE.stringof);
683 		static assert(TARGS.sizeof <= maxTaskParameterSize,
684 			"The arguments passed to run(Worker)Task must not exceed "~
685 			maxTaskParameterSize.to!string~" bytes in total size: "~TARGS.sizeof.stringof~" bytes");
686 
687 		debug functionPointer = callPointer(callable);
688 
689 		static void callDelegate(ref TaskFuncInfo tfi) {
690 			assert(tfi.func is &callDelegate, "Wrong callDelegate called!?");
691 
692 			// copy original call data to stack
693 			CALLABLE c;
694 			TARGS args;
695 			move(*(cast(CALLABLE*)tfi.callable.ptr), c);
696 			move(*(cast(TARGS*)tfi.args.ptr), args);
697 
698 			// reset the info
699 			tfi.func = null;
700 
701 			// make the call
702 			mixin(callWithMove!ARGS("c", "args.expand"));
703 		}
704 
705 		func = &callDelegate;
706 
707 		() @trusted {
708 			static if (hasElaborateAssign!CALLABLE) initCallable!CALLABLE();
709 			static if (hasElaborateAssign!TARGS) initArgs!TARGS();
710 			typedCallable!CALLABLE = callable;
711 			foreach (i, A; ARGS) {
712 				static if (needsMove!A) args[i].move(typedArgs!TARGS.expand[i]);
713 				else typedArgs!TARGS.expand[i] = args[i];
714 			}
715 		} ();
716 	}
717 
718 	void call()
719 	{
720 		this.func(this);
721 	}
722 
723 	@property ref C typedCallable(C)()
724 	{
725 		static assert(C.sizeof <= callable.sizeof);
726 		return *cast(C*)callable.ptr;
727 	}
728 
729 	@property ref A typedArgs(A)()
730 	{
731 		static assert(A.sizeof <= args.sizeof);
732 		return *cast(A*)args.ptr;
733 	}
734 
735 	void initCallable(C)()
736 	nothrow {
737 		static const C cinit;
738 		this.callable[0 .. C.sizeof] = cast(void[])(&cinit)[0 .. 1];
739 	}
740 
741 	void initArgs(A)()
742 	nothrow {
743 		static const A ainit;
744 		this.args[0 .. A.sizeof] = cast(void[])(&ainit)[0 .. 1];
745 	}
746 }
747 
748 private ulong callPointer(C)(ref C callable)
749 @trusted nothrow @nogc {
750 	alias IP = ulong;
751 	static if (is(C == function)) return cast(IP)cast(void*)callable;
752 	else static if (is(C == delegate)) return cast(IP)callable.funcptr;
753 	else static if (is(typeof(&callable.opCall) == function)) return cast(IP)cast(void*)&callable.opCall;
754 	else static if (is(typeof(&callable.opCall) == delegate)) return cast(IP)(&callable.opCall).funcptr;
755 	else return cast(IP)&callable;
756 }
757 
758 package struct TaskScheduler {
759 	import eventcore.driver : ExitReason;
760 	import eventcore.core : eventDriver;
761 
762 	private {
763 		TaskFiberQueue m_taskQueue;
764 	}
765 
766 	@safe:
767 
768 	@disable this(this);
769 
770 	@property size_t scheduledTaskCount() const nothrow { return m_taskQueue.length; }
771 
772 	/** Lets other pending tasks execute before continuing execution.
773 
774 		This will give other tasks or events a chance to be processed. If
775 		multiple tasks call this function, they will be processed in a
		first-in-first-out manner.
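
		Example (sketch; application code would normally call the public
		`vibe.core.core.yield`, which forwards to this scheduler):
		---
		foreach (item; workItems) {
			processItem(item); // hypothetical CPU-heavy helper
			yield(); // give other tasks and pending events a chance to run
		}
		---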
777 	*/
778 	void yield()
779 	{
780 		auto t = Task.getThis();
781 		if (t == Task.init) return; // not really a task -> no-op
782 		auto tf = () @trusted { return t.taskFiber; } ();
783 		debug (VibeTaskLog) logTrace("Yielding (interrupt=%s)", () @trusted { return (cast(shared)tf).getTaskStatus().interrupt; } ());
784 		tf.handleInterrupt();
785 		if (tf.m_queue !is null) return; // already scheduled to be resumed
786 		doYieldAndReschedule(t);
787 		tf.handleInterrupt();
788 	}
789 
790 	nothrow:
791 
792 	/** Performs a single round of scheduling without blocking.
793 
794 		This will execute scheduled tasks and process events from the
795 		event queue, as long as possible without having to wait.
796 
797 		Returns:
			One of the following reasons is returned:
			$(UL
				$(LI `ExitReason.exited`: The event loop was exited due to a manual request)
801 				$(LI `ExitReason.outOfWaiters`: There are no more scheduled
802 					tasks or events, so the application would do nothing from
803 					now on)
804 				$(LI `ExitReason.idle`: All scheduled tasks and pending events
805 					have been processed normally)
806 				$(LI `ExitReason.timeout`: Scheduled tasks have been processed,
807 					but there were no pending events present.)
808 			)
809 	*/
810 	ExitReason process()
811 	{
812 		assert(TaskFiber.getThis().m_yieldLockCount == 0, "May not process events within an active yieldLock()!");
813 
814 		bool any_events = false;
815 		while (true) {
816 			// process pending tasks
817 			bool any_tasks_processed = schedule() != ScheduleStatus.idle;
818 
819 			debug (VibeTaskLog) logTrace("Processing pending events...");
820 			ExitReason er = eventDriver.core.processEvents(0.seconds);
821 			debug (VibeTaskLog) logTrace("Done.");
822 
823 			final switch (er) {
824 				case ExitReason.exited: return ExitReason.exited;
825 				case ExitReason.outOfWaiters:
826 					if (!scheduledTaskCount)
827 						return ExitReason.outOfWaiters;
828 					break;
829 				case ExitReason.timeout:
830 					if (!scheduledTaskCount)
831 						return any_events || any_tasks_processed ? ExitReason.idle : ExitReason.timeout;
832 					break;
833 				case ExitReason.idle:
834 					any_events = true;
835 					if (!scheduledTaskCount)
836 						return ExitReason.idle;
837 					break;
838 			}
839 		}
840 	}
841 
842 	/** Performs a single round of scheduling, blocking if necessary.
843 
844 		Returns:
			One of the following reasons is returned:
			$(UL
				$(LI `ExitReason.exited`: The event loop was exited due to a manual request)
848 				$(LI `ExitReason.outOfWaiters`: There are no more scheduled
849 					tasks or events, so the application would do nothing from
850 					now on)
851 				$(LI `ExitReason.idle`: All scheduled tasks and pending events
852 					have been processed normally)
853 			)
854 	*/
855 	ExitReason waitAndProcess()
856 	{
857 		// first, process tasks without blocking
858 		auto er = process();
859 
860 		final switch (er) {
861 			case ExitReason.exited, ExitReason.outOfWaiters: return er;
862 			case ExitReason.idle: return ExitReason.idle;
863 			case ExitReason.timeout: break;
864 		}
865 
866 		// if the first run didn't process any events, block and
867 		// process one chunk
868 		debug (VibeTaskLog) logTrace("Wait for new events to process...");
869 		er = eventDriver.core.processEvents(Duration.max);
870 		debug (VibeTaskLog) logTrace("Done.");
871 		final switch (er) {
872 			case ExitReason.exited: return ExitReason.exited;
873 			case ExitReason.outOfWaiters:
874 				if (!scheduledTaskCount)
875 					return ExitReason.outOfWaiters;
876 				break;
877 			case ExitReason.timeout: assert(false, "Unexpected return code");
878 			case ExitReason.idle: break;
879 		}
880 
881 		// finally, make sure that all scheduled tasks are run
882 		er = process();
883 		if (er == ExitReason.timeout) return ExitReason.idle;
884 		else return er;
885 	}
886 
887 	void yieldUninterruptible()
888 	{
889 		auto t = Task.getThis();
890 		if (t == Task.init) return; // not really a task -> no-op
891 		auto tf = () @trusted { return t.taskFiber; } ();
892 		if (tf.m_queue !is null) return; // already scheduled to be resumed
893 		doYieldAndReschedule(t);
894 	}
895 
896 	/** Holds execution until the task gets explicitly resumed.
897 	*/
898 	void hibernate()
899 	{
900 		import vibe.core.core : isEventLoopRunning;
901 		auto thist = Task.getThis();
902 		if (thist == Task.init) {
903 			assert(!isEventLoopRunning, "Event processing outside of a fiber should only happen before the event loop is running!?");
904 			static import vibe.core.core;
905 			vibe.core.core.runEventLoopOnce();
906 		} else {
907 			doYield(thist);
908 		}
909 	}
910 
911 	/** Immediately switches execution to the specified task without giving up execution privilege.
912 
		This forces immediate execution of the specified task. After the task finishes or yields,
914 		the calling task will continue execution.
915 	*/
916 	void switchTo(Task t, TaskSwitchPriority priority)
917 	{
918 		auto thist = Task.getThis();
919 
920 		if (t == thist) return;
921 
922 		auto thisthr = thist ? thist.thread : () @trusted { return Thread.getThis(); } ();
923 		assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread.");
924 
925 		auto tf = () @trusted { return t.taskFiber; } ();
926 		if (tf.m_queue) {
927 			// don't reset the position of already scheduled tasks
928 			if (priority == TaskSwitchPriority.normal) return;
929 
930 			debug (VibeTaskLog) logTrace("Task to switch to is already scheduled. Moving to front of queue.");
931 			assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue.");
932 			m_taskQueue.remove(tf);
933 			assert(!tf.m_queue, "Task removed from queue, but still has one set!?");
934 		}
935 
936 		if (thist == Task.init && priority == TaskSwitchPriority.immediate) {
937 			assert(TaskFiber.getThis().m_yieldLockCount == 0, "Cannot yield within an active yieldLock()!");
938 			debug (VibeTaskLog) logTrace("switch to task from global context");
939 			resumeTask(t);
940 			debug (VibeTaskLog) logTrace("task yielded control back to global context");
941 		} else {
942 			auto thistf = () @trusted { return thist.taskFiber; } ();
943 			assert(!thistf || !thistf.m_queue, "Calling task is running, but scheduled to be resumed!?");
944 
945 			debug (VibeTaskLog) logDebugV("Switching tasks (%s already in queue)", m_taskQueue.length);
946 			final switch (priority) {
947 				case TaskSwitchPriority.normal:
948 					reschedule(tf);
949 					break;
950 				case TaskSwitchPriority.prioritized:
951 					tf.m_dynamicPriority = uint.max;
952 					reschedule(tf);
953 					break;
954 				case TaskSwitchPriority.immediate:
955 					tf.m_dynamicPriority = uint.max;
956 					m_taskQueue.insertFront(thistf);
957 					m_taskQueue.insertFront(tf);
958 					doYield(thist);
959 					break;
960 			}
961 		}
962 	}
963 
964 	/** Runs any pending tasks.
965 
		A pending task is a task that is scheduled to be resumed by either `yield` or
		`switchTo`.

		Returns:
			A `ScheduleStatus` value indicating whether any tasks were processed and
			whether more tasks are left in the queue.
971 	*/
972 	ScheduleStatus schedule()
973 	nothrow {
		if (m_taskQueue.empty)
			return ScheduleStatus.idle;

		assert(Task.getThis() == Task.init, "TaskScheduler.schedule() may not be called from a task!");
981 
982 		foreach (i; 0 .. m_taskQueue.length) {
983 			auto t = m_taskQueue.front;
984 			m_taskQueue.popFront();
985 
986 			// reset priority
987 			t.m_dynamicPriority = t.m_staticPriority;
988 
989 			debug (VibeTaskLog) logTrace("resuming task");
990 			auto task = t.task;
991 			if (task != Task.init)
992 				resumeTask(t.task);
993 			debug (VibeTaskLog) logTrace("task out");
994 
995 			if (m_taskQueue.empty) break;
996 		}
997 
998 		debug (VibeTaskLog) logDebugV("schedule finished - %s tasks left in queue", m_taskQueue.length);
999 
1000 		return m_taskQueue.empty ? ScheduleStatus.allProcessed : ScheduleStatus.busy;
1001 	}
1002 
1003 	/// Resumes execution of a yielded task.
1004 	private void resumeTask(Task t)
1005 	nothrow {
1006 		import std.encoding : sanitize;
1007 
1008 		assert(t != Task.init, "Resuming null task");
1009 
1010 		debug (VibeTaskLog) logTrace("task fiber resume");
1011 		auto uncaught_exception = () @trusted nothrow { return t.fiber.call!(Fiber.Rethrow.no)(); } ();
1012 		debug (VibeTaskLog) logTrace("task fiber yielded");
1013 
1014 		if (uncaught_exception) {
1015 			auto th = cast(Throwable)uncaught_exception;
1016 			assert(th, "Fiber returned exception object that is not a Throwable!?");
1017 
1018 			assert(() @trusted nothrow { return t.fiber.state; } () == Fiber.State.TERM);
1019 			logError("Task terminated with unhandled exception: %s", th.msg);
1020 			logDebug("Full error: %s", () @trusted { return th.toString().sanitize; } ());
1021 
1022 			// always pass Errors on
1023 			if (auto err = cast(Error)th) throw err;
1024 		}
1025 	}
1026 
1027 	private void reschedule(TaskFiber tf)
1028 	{
1029 		import std.algorithm.comparison : min;
1030 
1031 		// insert according to priority, limited to a priority
1032 		// factor of 1:10 in case of heavy concurrency
1033 		m_taskQueue.insertBackPred(tf, 10, (t) {
1034 			if (t.m_dynamicPriority >= tf.m_dynamicPriority)
1035 				return true;
1036 
1037 			// increase dynamic priority each time a task gets overtaken to
1038 			// ensure a fair schedule
1039 			t.m_dynamicPriority += min(t.m_staticPriority, uint.max - t.m_dynamicPriority);
1040 			return false;
1041 		});
1042 	}
1043 
1044 	private void doYieldAndReschedule(Task task)
1045 	{
1046 		auto tf = () @trusted { return task.taskFiber; } ();
1047 
1048 		reschedule(tf);
1049 
1050 		doYield(task);
1051 	}
1052 
1053 	private void doYield(Task task)
1054 	{
1055 		assert(() @trusted { return task.taskFiber; } ().m_yieldLockCount == 0, "May not yield while in an active yieldLock()!");
1056 		debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.yield, task); } ();
1057 		() @trusted { Fiber.yield(); } ();
1058 		debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.resume, task); } ();
1059 		assert(!task.m_fiber.m_queue, "Task is still scheduled after resumption.");
1060 	}
1061 }
1062 
1063 package enum ScheduleStatus {
1064 	idle,
1065 	allProcessed,
1066 	busy
1067 }
1068 
1069 private struct TaskFiberQueue {
1070 	@safe nothrow:
1071 
1072 	TaskFiber first, last;
1073 	size_t length;
1074 
1075 	@disable this(this);
1076 
1077 	@property bool empty() const { return first is null; }
1078 
1079 	@property TaskFiber front() { return first; }
1080 
1081 	void insertFront(TaskFiber task)
1082 	{
1083 		assert(task.m_queue is null, "Task is already scheduled to be resumed!");
1084 		assert(task.m_prev is null, "Task has m_prev set without being in a queue!?");
1085 		assert(task.m_next is null, "Task has m_next set without being in a queue!?");
1086 		task.m_queue = &this;
1087 		if (empty) {
1088 			first = task;
1089 			last = task;
1090 		} else {
1091 			first.m_prev = task;
1092 			task.m_next = first;
1093 			first = task;
1094 		}
1095 		length++;
1096 	}
1097 
1098 	void insertBack(TaskFiber task)
1099 	{
1100 		assert(task.m_queue is null, "Task is already scheduled to be resumed!");
1101 		assert(task.m_prev is null, "Task has m_prev set without being in a queue!?");
1102 		assert(task.m_next is null, "Task has m_next set without being in a queue!?");
1103 		task.m_queue = &this;
1104 		if (empty) {
1105 			first = task;
1106 			last = task;
1107 		} else {
1108 			last.m_next = task;
1109 			task.m_prev = last;
1110 			last = task;
1111 		}
1112 		length++;
1113 	}
1114 
	// Inserts a task after the first task for which the predicate yields `true`,
	// searching from the back. At most max_skip tasks will be skipped before
	// the task is inserted regardless of the predicate.
1118 	void insertBackPred(TaskFiber task, size_t max_skip,
1119 		scope bool delegate(TaskFiber) @safe nothrow pred)
1120 	{
1121 		assert(task.m_queue is null, "Task is already scheduled to be resumed!");
1122 		assert(task.m_prev is null, "Task has m_prev set without being in a queue!?");
1123 		assert(task.m_next is null, "Task has m_next set without being in a queue!?");
1124 
1125 		for (auto t = last; t; t = t.m_prev) {
1126 			if (!max_skip-- || pred(t)) {
1127 				task.m_queue = &this;
1128 				task.m_next = t.m_next;
1129 				if (task.m_next) task.m_next.m_prev = task;
1130 				t.m_next = task;
1131 				task.m_prev = t;
1132 				if (!task.m_next) last = task;
1133 				length++;
1134 				return;
1135 			}
1136 		}
1137 
1138 		insertFront(task);
1139 	}
1140 
1141 	void popFront()
1142 	{
1143 		if (first is last) last = null;
1144 		assert(first && first.m_queue == &this, "Popping from empty or mismatching queue");
1145 		auto next = first.m_next;
1146 		if (next) next.m_prev = null;
1147 		first.m_next = null;
1148 		first.m_queue = null;
1149 		first = next;
1150 		length--;
1151 	}
1152 
1153 	void remove(TaskFiber task)
1154 	{
1155 		assert(task.m_queue is &this, "Task is not contained in task queue.");
1156 		if (task.m_prev) task.m_prev.m_next = task.m_next;
1157 		else first = task.m_next;
1158 		if (task.m_next) task.m_next.m_prev = task.m_prev;
1159 		else last = task.m_prev;
1160 		task.m_queue = null;
1161 		task.m_prev = null;
1162 		task.m_next = null;
1163 		length--;
1164 	}
1165 }
1166 
1167 unittest {
1168 	auto f1 = new TaskFiber;
1169 	auto f2 = new TaskFiber;
1170 
1171 	TaskFiberQueue q;
1172 	assert(q.empty && q.length == 0);
1173 	q.insertFront(f1);
1174 	assert(!q.empty && q.length == 1);
1175 	q.insertFront(f2);
1176 	assert(!q.empty && q.length == 2);
1177 	q.popFront();
1178 	assert(!q.empty && q.length == 1);
1179 	q.popFront();
1180 	assert(q.empty && q.length == 0);
1181 	q.insertFront(f1);
1182 	q.remove(f1);
1183 	assert(q.empty && q.length == 0);
1184 }
1185 
1186 unittest {
1187 	auto f1 = new TaskFiber;
1188 	auto f2 = new TaskFiber;
1189 	auto f3 = new TaskFiber;
1190 	auto f4 = new TaskFiber;
1191 	auto f5 = new TaskFiber;
1192 	auto f6 = new TaskFiber;
1193 	TaskFiberQueue q;
1194 
1195 	void checkQueue()
1196 	{
1197 		TaskFiber p;
1198 		for (auto t = q.front; t; t = t.m_next) {
1199 			assert(t.m_prev is p);
1200 			assert(t.m_next || t is q.last);
1201 			p = t;
1202 		}
1203 
1204 		TaskFiber n;
1205 		for (auto t = q.last; t; t = t.m_prev) {
1206 			assert(t.m_next is n);
1207 			assert(t.m_prev || t is q.first);
1208 			n = t;
1209 		}
1210 	}
1211 
1212 	q.insertBackPred(f1, 0, delegate bool(tf) { assert(false); });
1213 	assert(q.first is f1 && q.last is f1);
1214 	checkQueue();
1215 
1216 	q.insertBackPred(f2, 0, delegate bool(tf) { assert(false); });
1217 	assert(q.first is f1 && q.last is f2);
1218 	checkQueue();
1219 
1220 	q.insertBackPred(f3, 1, (tf) => false);
1221 	assert(q.first is f1 && q.last is f2);
1222 	assert(f1.m_next is f3);
1223 	assert(f3.m_prev is f1);
1224 	checkQueue();
1225 
1226 	q.insertBackPred(f4, 10, (tf) => false);
1227 	assert(q.first is f4 && q.last is f2);
1228 	checkQueue();
1229 
1230 	q.insertBackPred(f5, 10, (tf) => true);
1231 	assert(q.first is f4 && q.last is f5);
1232 	checkQueue();
1233 
1234 	q.insertBackPred(f6, 10, (tf) => tf is f4);
1235 	assert(q.first is f4 && q.last is f5);
1236 	assert(f4.m_next is f6);
1237 	checkQueue();
1238 }
1239 
1240 private struct FLSInfo {
1241 	void function(void[], size_t) fct;
1242 	size_t offset;
1243 	void destroy(void[] fls) {
1244 		fct(fls, offset);
1245 	}
1246 }
1247 
1248 // mixin string helper to call a function with arguments that potentially have
1249 // to be moved
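//
// Example: for ARGS = (int, NC), where NC is a hypothetical non-copyable type
// (needsMove!NC == true), callWithMove!ARGS("c", "args") yields the mixin
// string `c(args[0], args[1].move);`.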
1250 package string callWithMove(ARGS...)(string func, string args)
1251 {
	import std.string;
1253 	string ret = func ~ "(";
1254 	foreach (i, T; ARGS) {
1255 		if (i > 0) ret ~= ", ";
1256 		ret ~= format("%s[%s]", args, i);
1257 		static if (needsMove!T) ret ~= ".move";
1258 	}
1259 	return ret ~ ");";
1260 }
1261 
1262 private template needsMove(T)
1263 {
1264 	template isCopyable(T)
1265 	{
1266 		enum isCopyable = __traits(compiles, (T a) { return a; });
1267 	}
1268 
1269 	template isMoveable(T)
1270 	{
1271 		enum isMoveable = __traits(compiles, (T a) { return a.move; });
1272 	}
1273 
1274 	enum needsMove = !isCopyable!T;
1275 
1276 	static assert(isCopyable!T || isMoveable!T,
1277 				  "Non-copyable type "~T.stringof~" must be movable with a .move property.");
1278 }
1279 
1280 unittest {
1281 	enum E { a, move }
1282 	static struct S {
1283 		@disable this(this);
1284 		@property S move() { return S.init; }
1285 	}
1286 	static struct T { @property T move() { return T.init; } }
1287 	static struct U { }
1288 	static struct V {
1289 		@disable this();
1290 		@disable this(this);
1291 		@property V move() { return V.init; }
1292 	}
1293 	static struct W { @disable this(); }
1294 
1295 	static assert(needsMove!S);
1296 	static assert(!needsMove!int);
1297 	static assert(!needsMove!string);
1298 	static assert(!needsMove!E);
1299 	static assert(!needsMove!T);
1300 	static assert(!needsMove!U);
1301 	static assert(needsMove!V);
1302 	static assert(!needsMove!W);
1303 }