1 /**
	Contains the `Task` handle type, task local storage and the fiber and scheduler primitives used to run tasks.
4 	Copyright: © 2012-2020 Sönke Ludwig
5 	Authors: Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 */
8 module vibe.core.task;
10 import vibe.core.log;
11 import vibe.core.sync;
13 import core.atomic : atomicOp, atomicLoad, atomicStore, cas;
14 import core.thread;
15 import std.exception;
16 import std.traits;
17 import std.typecons;
/** Handle to a single task as started using vibe.core.runTask.

	A handle consists of a reference to the fiber that executes the task and
	of the value the fiber's task counter had when the handle was created.
	Two handles are equal only if both parts agree.

	Note that the Task type is considered weakly isolated and thus can be
	passed between threads using vibe.core.concurrency.send or by passing
	it as a parameter to vibe.core.core.runWorkerTask.
*/
struct Task {
	private {
		import std.concurrency : ThreadInfo, Tid;

		shared(TaskFiber) m_fiber;
		size_t m_taskCounter;
		// fallback thread info used by handles that have no fiber attached
		static ThreadInfo s_tidInfo;
	}

	/// Default value of `TaskSettings.priority`
	enum basePriority = 0x00010000;

	/// Creates a handle for the given fiber/counter combination.
	private this(TaskFiber fiber, size_t task_counter)
	@safe nothrow {
		m_fiber = () @trusted { return cast(shared)fiber; } ();
		m_taskCounter = task_counter;
	}

	/// Creates a handle that refers to the same task as `other`.
	this(in Task other)
	@safe nothrow {
		m_fiber = () @trusted { return cast(shared(TaskFiber))other.m_fiber; } ();
		m_taskCounter = other.m_taskCounter;
	}

	/** Returns the Task instance belonging to the calling task.

		`Task.init` is returned when the caller is not running inside of a
		`TaskFiber`.
	*/
	static Task getThis() @safe nothrow
	{
		// Synchronized statements were only annotated nothrow starting with
		// 2.067 (DMD#4115, Druntime#1013, Druntime#1021, Phobos#2704).
		// However, they were "logically" nothrow before.
		static if (__VERSION__ <= 2066)
			scope (failure) assert(0, "Internal error: function should be nothrow");

		auto current = () @trusted { return Fiber.getThis(); } ();
		if (current is null) return Task.init;

		if (auto owner = cast(TaskFiber)current) {
			// FIXME: returning a non-.init handle for a finished task might break some layered logic
			return Task(owner, owner.getTaskStatusFromOwnerThread().counter);
		}

		return Task.init;
	}

	nothrow {
		/// The fiber behind this handle, with the `shared` qualifier cast away.
		package @property inout(TaskFiber) taskFiber()
		inout @system {
			return cast(inout(TaskFiber))m_fiber;
		}

		/// The same fiber as `taskFiber`, typed as a plain `Fiber`.
		@property inout(Fiber) fiber()
		inout @system {
			return this.taskFiber;
		}

		/// The task counter value stored in this handle.
		@property size_t taskCounter()
		const @safe {
			return m_taskCounter;
		}

		/// The thread reported by the fiber, or `null` for a handle without fiber.
		@property inout(Thread) thread()
		inout @trusted {
			return m_fiber ? this.taskFiber.thread : null;
		}

		/** Determines if the task is still running or scheduled to be run.
		*/
		@property bool running()
		const @trusted {
			assert(m_fiber !is null, "Invalid task handle");

			// a terminated fiber cannot run anything anymore
			auto fib = this.taskFiber;
			try {
				if (fib.state == Fiber.State.TERM) return false;
			} catch (Throwable) {}

			// the handle is only live as long as the fiber still works on
			// the task run that the handle was created for
			auto status = m_fiber.getTaskStatus();
			return status.counter == m_taskCounter && status.initialized;
		}

		// FIXME: this is not thread safe!
		package @property ref ThreadInfo tidInfo()
		@system {
			if (m_fiber) return taskFiber.tidInfo;
			return s_tidInfo;
		}

		// FIXME: this is not thread safe!
		package @property ref const(ThreadInfo) tidInfo()
		const @system {
			if (m_fiber) return taskFiber.tidInfo;
			return s_tidInfo;
		}

		/** Gets the `Tid` associated with this task for use with
			`std.concurrency`.
		*/
		@property Tid tid() @trusted { return tidInfo.ident; }
		/// ditto
		@property const(Tid) tid() const @trusted { return tidInfo.ident; }
	}

	/// A handle converts to `true` if it has a fiber attached.
	T opCast(T)() const @safe nothrow if (is(T == bool)) { return m_fiber !is null; }

	/// Waits for the task to end using the interruptible join of the fiber; no-op without fiber.
	void join()
	@trusted {
		if (m_fiber) m_fiber.join!true(m_taskCounter);
	}

	/// Waits for the task to end using the uninterruptible join of the fiber; no-op without fiber.
	void joinUninterruptible()
	@trusted nothrow {
		if (m_fiber) m_fiber.join!false(m_taskCounter);
	}

	/// Requests an interrupt of the task, provided that it is still running.
	void interrupt()
	@trusted nothrow {
		if (m_fiber && this.running) m_fiber.interrupt(m_taskCounter);
	}

	unittest { // regression test for bogus "task cannot interrupt itself"
		import vibe.core.core : runTask;
		auto t = runTask({});
		t.join();
		runTask({ t.interrupt(); }).join();
	}

	/// Formats the handle as "<fiber address>:<task counter>".
	string toString()
	const @safe {
		import std.string;
		auto address = () @trusted { return cast(void*)m_fiber; } ();
		return format("%s:%s", address, m_taskCounter);
	}

	/** Writes a short identifier for this handle to `dst`.

		The identifier is the Base64 encoding of the first three bytes of an
		MD5 digest over the fiber address and the task counter. "-fin" is
		appended if the task is not running anymore. Handles without a fiber
		yield "----".
	*/
	void getDebugID(R)(ref R dst)
	{
		import std.digest.md : MD5;
		import std.bitmanip : nativeToLittleEndian;
		import std.base64 : Base64;

		if (!m_fiber) {
			dst.put("----");
			return;
		}

		auto fiber_address = () @trusted { return cast(size_t)cast(void*)m_fiber; } ();

		MD5 hash;
		hash.start();
		hash.put(nativeToLittleEndian(fiber_address));
		hash.put(nativeToLittleEndian(m_taskCounter));
		auto digest = hash.finish();
		Base64.encode(digest[0 .. 3], dst);

		if (!this.running) dst.put("-fin");
	}

	/// ditto
	string getDebugID()
	@trusted {
		import std.array : appender;
		auto sink = appender!string;
		getDebugID(sink);
		return sink.data;
	}

	// Remove me when `-preview=in` becomes the default
	static if (is(typeof(mixin(q{(in ref int a) => a}))))
		mixin(q{
			bool opEquals(in ref Task other) const @safe nothrow {
				return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter;
			}});

	/// Two handles are equal if they share the fiber and the task counter.
	bool opEquals(in Task other)
	const @safe nothrow {
		return m_taskCounter == other.m_taskCounter && m_fiber is other.m_fiber;
	}
}
/** Options that control the behavior of newly started tasks.
*/
struct TaskSettings {
	/** Scheduling priority of the task

		How often a task gets scheduled in comparison to competing tasks is
		roughly proportional to its priority. A task with priority 100, for
		example, will be scheduled every 10 rounds when competing against a
		task with priority 1000.

		The default is `Task.basePriority`, which has a value of 65536.
		Priorities should be computed relative to `Task.basePriority`.

		A task with a priority of zero will only be executed if no other
		task with a non-zero priority is competing.
	*/
	uint priority = Task.basePriority;
}
/**
	Implements a task local storage variable.

	Task local variables, similar to thread local variables, exist separately
	in each task. Consequently, they do not need any form of synchronization
	when accessing them.

	Note, however, that each TaskLocal variable will increase the memory footprint
	of any task that uses task local storage. There is also an overhead to access
	TaskLocal variables, higher than for thread local variables, but generally
	still O(1) (since actual storage acquisition is done lazily, the first access
	can require a memory allocation with unknown computational costs).

	Notice:
		TaskLocal instances MUST be declared as static/global thread-local
		variables. Defining them as a temporary/stack variable will cause
		crashes or data corruption!

	Examples:
		---
		TaskLocal!string s_myString = "world";

		void taskFunc()
		{
			assert(s_myString == "world");
			s_myString = "hello";
			assert(s_myString == "hello");
		}

		shared static this()
		{
			// both tasks will get independent storage for s_myString
			runTask(&taskFunc);
			runTask(&taskFunc);
		}
		---
*/
struct TaskLocal(T)
{
	private {
		// byte offset of the value within the FLS buffer; size_t.max = not registered yet
		size_t m_offset = size_t.max;
		// index of the variable within the per-fiber initialization flags
		size_t m_id;
		T m_initValue;
		bool m_hasInitValue = false;
	}

	/// Uses `init_val` as the initial value of the variable in every task.
	this(T init_val) { m_initValue = init_val; m_hasInitValue = true; }

	@disable this(this);

	/// Assigns to the value stored for the calling task.
	void opAssign(T value) { this.storage = value; }

	/** Returns a reference to the value stored for the calling task.

		The variable gets registered on first use and the value is
		constructed lazily for each fiber that accesses it.
	*/
	@property ref T storage()
	@safe {
		import std.conv : emplace;

		auto owner = TaskFiber.getThis();

		// first access of this variable: reserve a slot in the FLS layout
		if (m_offset == size_t.max) {
			static assert(T.alignof <= 8, "Unsupported alignment for type "~T.stringof);
			assert(TaskFiber.ms_flsFill % 8 == 0, "Misaligned fiber local storage pool.");
			m_offset = TaskFiber.ms_flsFill;
			m_id = TaskFiber.ms_flsCounter++;

			// advance the fill level past the value, rounded up to the
			// next multiple of 8 bytes
			TaskFiber.ms_flsFill = (TaskFiber.ms_flsFill + T.sizeof + 7) & ~cast(size_t)7;
		}

		// grow the buffers of the current fiber if they are too small
		if (owner.m_fls.length < TaskFiber.ms_flsFill) {
			owner.m_fls.length = TaskFiber.ms_flsFill + 128;
			() @trusted { owner.m_flsInit.length = TaskFiber.ms_flsCounter + 64; } ();
		}

		auto slot = () @trusted { return owner.m_fls.ptr[m_offset .. m_offset+T.sizeof]; } ();
		immutable already_initialized = () @trusted { return owner.m_flsInit[m_id]; } ();

		// construct the value on the first access within this fiber
		if (!already_initialized) {
			() @trusted { owner.m_flsInit[m_id] = true; } ();

			import std.traits : hasElaborateDestructor, hasAliasing;
			static if (hasElaborateDestructor!T || hasAliasing!T) {
				// cleanup routine that gets registered in TaskFiber.ms_flsInfo
				void function(void[], size_t) destructor = (void[] fls, size_t offset){
					static if (hasElaborateDestructor!T) {
						auto obj = cast(T*)&fls[offset];
						// run the custom destructor of the stored object
						obj.destroy();
					}
					else static if (hasAliasing!T) {
						// zero the memory to avoid false pointers
						foreach (size_t i; offset .. offset + T.sizeof) {
							ubyte* u = cast(ubyte*)&fls[i];
							*u = 0;
						}
					}
				};

				FLSInfo cleanup_info;
				cleanup_info.fct = destructor;
				cleanup_info.offset = m_offset;

				// make sure flsInfo has enough space
				if (TaskFiber.ms_flsInfo.length <= m_id)
					TaskFiber.ms_flsInfo.length = m_id + 64;

				TaskFiber.ms_flsInfo[m_id] = cleanup_info;
			}

			if (!m_hasInitValue) {
				() @trusted { emplace!T(slot); } ();
			} else {
				static if (__traits(compiles, () @trusted { emplace!T(slot, m_initValue); } ()))
					() @trusted { emplace!T(slot, m_initValue); } ();
				else assert(false, "Cannot emplace initialization value for type "~T.stringof);
			}
		}

		return *() @trusted { return cast(T*)slot.ptr; } ();
	}

	alias storage this;
}
/** Exception that is thrown by Task.interrupt.

	The message of the exception is always "Task interrupted.".
*/
class InterruptException : Exception {
	this()
	@safe nothrow {
		enum message = "Task interrupted.";
		super(message);
	}
}
/**
	High level state change events for a Task
*/
enum TaskEvent {
	/// Just about to invoke the fiber which starts execution
	preStart,
	/// After the fiber has returned for the first time (by yield or exit)
	postStart,
	/// Just about to start execution
	start,
	/// Temporarily paused
	yield,
	/// Resumed from a prior yield
	resume,
	/// Ended normally
	end,
	/// Ended with an exception
	fail
}
/** Information that is passed to a `TaskCreationCallback`.
*/
struct TaskCreationInfo {
	/// Handle of the task
	Task handle;
	/// Untyped pointer associated with the function of the task
	const(void)* functionPointer;
}

/// Signature of a callback that receives a `TaskEvent` together with the affected task.
alias TaskEventCallback = void function(TaskEvent, Task) nothrow;

/// Signature of a callback that receives a `TaskCreationInfo` by reference.
alias TaskCreationCallback = void function(ref TaskCreationInfo) nothrow @safe;
/**
	The maximum combined size, in bytes, of all parameters passed to a task
	delegate

	See_Also: runTask
*/
enum maxTaskParameterSize = 128;
/** The base class for a task aka Fiber.

	This class represents a single task that is executed concurrently
	with other tasks. Each task is owned by a single thread.

	The fiber keeps a combined counter/flags word (`m_taskCounterAndFlags`):
	the lowest `Flags.shiftAmount` bits hold the `Flags` values, the
	remaining bits hold a counter that is incremented by `bumpTaskCounter`.
*/
final package class TaskFiber : Fiber {
	import std.concurrency : Tid, thisTid;

	static if ((void*).sizeof >= 8) enum defaultTaskStackSize = 16*1024*1024;
	else enum defaultTaskStackSize = 512*1024;

	private enum Flags {
		running = 1UL << 0,
		initialized = 1UL << 1,
		interrupt = 1UL << 2,

		shiftAmount = 3,
		flagsMask = (1<<shiftAmount) - 1
	}

	private {
		import std.concurrency : ThreadInfo;
		import std.bitmanip : BitArray;

		// task queue management (TaskScheduler.m_taskQueue)
		TaskFiber m_prev, m_next;
		TaskFiberQueue* m_queue;

		Thread m_thread;
		ThreadInfo m_tidInfo;
		uint m_staticPriority, m_dynamicPriority;
		shared ulong m_taskCounterAndFlags = 0; // bits 0-Flags.shiftAmount are flags

		bool m_shutdown = false;
		shared(bool) m_tidUsed = false;

		shared(ManualEvent) m_onExit;

		// task local storage
		BitArray m_flsInit;
		void[] m_fls;

		package int m_yieldLockCount;

		static TaskFiber ms_globalDummyFiber;
		static FLSInfo[] ms_flsInfo;
		static size_t ms_flsFill = 0; // thread-local
		static size_t ms_flsCounter = 0;
	}

	package {
		TaskFuncInfo m_taskFunc;
		__gshared size_t ms_taskStackSize = defaultTaskStackSize;
		__gshared debug TaskEventCallback ms_taskEventCallback;
		__gshared debug TaskCreationCallback ms_taskCreationCallback;

		debug (VibeRunningTasks)
			static string[TaskFiber] s_runningTasks;
	}

	/// Creates a fiber that executes `run` and records the creating thread.
	this()
	@trusted nothrow {
		super(&run, ms_taskStackSize);
		m_onExit = createSharedManualEvent();
		m_thread = Thread.getThis();
	}

	/** Returns the `TaskFiber` of the caller.

		If the caller does not run inside of a `TaskFiber`, a lazily created
		dummy fiber is returned instead.
	*/
	static TaskFiber getThis()
	@safe nothrow {
		auto f = () @trusted nothrow { return Fiber.getThis(); } ();
		if (auto tf = cast(TaskFiber)f) return tf;
		if (!ms_globalDummyFiber) ms_globalDummyFiber = new TaskFiber;
		return ms_globalDummyFiber;
	}

	// expose Fiber.state as @safe on older DMD versions
	static if (!__traits(compiles, () @safe { return Fiber.init.state; } ()))
		@property State state() @trusted const nothrow { return super.state; }

	/** Fiber entry point.

		Executes the tasks assigned through `m_taskFunc` one after another
		until `m_shutdown` is set while the fiber is waiting for a new task.
	*/
	private void run()
	nothrow {
		import std.algorithm.mutation : swap;
		import std.encoding : sanitize;
		import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield, yieldLock;

		version (VibeDebugCatchAll) alias UncaughtException = Throwable;
		else alias UncaughtException = Exception;
		try {
			// force creation of a message box/Tid
			try thisTid;
			catch (Exception e) assert(false, e.msg);

			while (true) {
				// sleep until a task function has been assigned
				while (!m_taskFunc.func) {
					try {
						debug (VibeTaskLog) logTrace("putting fiber to sleep waiting for new task...");
						static if (__VERSION__ >= 2090) {
							import core.memory : GC;
							assert(!GC.inFinalizer, "Yiedling within finalizer - this would most likely result in a dead-lock!");
						}
						Fiber.yield();
					} catch (Exception e) {
						e.logException!(LogLevel.warn)(
							"CoreTaskFiber was resumed with exception but without active task");
					}
					if (m_shutdown) return;
				}

				debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");

				// take over the task function, leaving m_taskFunc empty
				TaskFuncInfo task;
				swap(task, m_taskFunc);
				m_dynamicPriority = m_staticPriority = task.settings.priority;
				Task handle = this.task;
				try {
					atomicOp!"|="(m_taskCounterAndFlags, Flags.running); // set running
					scope(exit) atomicOp!"&="(m_taskCounterAndFlags, ~Flags.flagsMask); // clear running/initialized

					debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle);
					if (!isEventLoopRunning) {
						debug (VibeTaskLog) logTrace("Event loop not running at task start - yielding.");
						taskScheduler.yieldUninterruptible();
						debug (VibeTaskLog) logTrace("Initial resume of task.");
					}

					debug (VibeRunningTasks) s_runningTasks[this] = "initial";

					task.call();
					debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle);

					debug if (() @trusted { return (cast(shared)this); } ().getTaskStatus().interrupt)
						logDebugV("Task exited while an interrupt was in flight.");
				} catch (Exception e) {
					debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle);
					e.logException!(LogLevel.critical)("Task terminated with uncaught exception");
				}

				debug (VibeRunningTasks) s_runningTasks.remove(this);

				debug assert(Thread.getThis() is m_thread, "Fiber moved?");

				// re-create message box if it has been used to guarantee a
				// unique Tid per task
				if (atomicLoad(m_tidUsed)) {
					m_tidInfo.cleanup();
					m_tidInfo.ident = Tid.init;
					try thisTid;
					catch (Exception e) assert(false, e.msg);
					atomicStore(m_tidUsed, false);
				}

				debug (VibeTaskLog) logTrace("Notifying joining tasks.");

				// Issue #161: This fiber won't be resumed before the next task
				// is assigned, because it is already marked as de-initialized.
				// Since ManualEvent.emit() will need to switch tasks, this
				// would mean that only the first waiter is notified before
				// this fiber gets a new task assigned.
				// Using a yield lock forces all corresponding tasks to be
				// enqueued into the schedule queue and resumed in sequence
				// at the end of the scope.
				auto l = yieldLock();

				m_onExit.emit();

				// make sure that the task does not get left behind in the yielder queue if terminated during yield()
				if (m_queue) m_queue.remove(this);

				// reset the FLS initialization BitArray for memory safety and
				// run the registered cleanup routine of every initialized slot
				foreach (size_t i, ref bool b; m_flsInit) {
					if (b) {
						// only slots below ms_flsInfo.length have an entry;
						// indexing at i == length would be out of bounds
						if (i < ms_flsInfo.length && ms_flsInfo[i] != FLSInfo.init)
							ms_flsInfo[i].destroy(m_fls);
						b = false;
					}
				}

				assert(!m_queue, "Fiber done but still scheduled to be resumed!?");

				debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");

				// make the fiber available for the next task
				recycleFiber(this);
			}
		} catch (UncaughtException th) {
			th.logException("CoreTaskFiber was terminated unexpectedly");
		} catch (Throwable th) {
			import std.stdio : stderr, writeln;
			import core.stdc.stdlib : abort;
			// best effort: print the full error, fall back to just the message
			try stderr.writeln(th);
			catch (Exception) {
				try stderr.writeln(th.msg);
				catch (Exception) {}
			}
			abort();
		}
	}


	/** Returns the thread that owns this task.
	*/
	@property inout(Thread) thread() inout @safe nothrow { return m_thread; }

	/** Returns the handle of the current Task running on this fiber.

		`Task.init` is returned if the fiber is not in initialized state.
	*/
	@property Task task()
	@safe nothrow {
		auto ts = getTaskStatusFromOwnerThread();
		if (!ts.initialized) return Task.init;
		return Task(this, ts.counter);
	}

	/// Returns the thread info of the fiber and marks it as used, so that `run` re-creates it after the task has ended.
	@property ref inout(ThreadInfo) tidInfo() inout @trusted nothrow { atomicStore((cast()this).m_tidUsed, true); return m_tidInfo; }

	/** Shuts down the task handler loop.

		Must be called from the owner thread while no task is assigned. The
		fiber is resumed until it has terminated.
	*/
	void shutdown()
	@safe nothrow {
		debug assert(Thread.getThis() is m_thread);

		assert(!() @trusted { return cast(shared)this; } ().getTaskStatus().initialized);

		m_shutdown = true;
		while (state != Fiber.State.TERM)
			() @trusted {
				try call(Fiber.Rethrow.no);
				catch (Exception e) assert(false, e.msg);
			} ();
	}

	/** Blocks until the task has ended.

		Returns as soon as the fiber is not initialized anymore or works on a
		task with a counter different from `task_counter`.
	*/
	void join(bool interruptiple)(size_t task_counter)
	shared @trusted {
		auto cnt = m_onExit.emitCount;
		while (true) {
			auto st = getTaskStatus();
			if (!st.initialized || st.counter != task_counter)
				break;
			static if (interruptiple)
				cnt = m_onExit.wait(cnt);
			else
				cnt = m_onExit.waitUninterruptible(cnt);
		}
	}

	/** Throws an InterruptException within the task as soon as it calls an interruptible function.

		Nothing happens if the fiber is not initialized, already has the
		interrupt flag set, or works on a task other than `task_counter`.
	*/
	void interrupt(size_t task_counter)
	shared @safe nothrow {
		import vibe.core.core : taskScheduler;

		auto caller = () @trusted { return cast(shared)TaskFiber.getThis(); } ();

		assert(caller !is this, "A task cannot interrupt itself.");

		// atomically set the interrupt flag, as long as the task is still the same
		while (true) {
			auto tcf = atomicLoad(m_taskCounterAndFlags);
			auto st = getTaskStatus(tcf);
			if (!st.initialized || st.interrupt || st.counter != task_counter)
				return;
			auto tcf_int = tcf | Flags.interrupt;
			if (cas(&m_taskCounterAndFlags, tcf, tcf_int))
				break;
		}

		// the task can only be resumed directly from within the same thread
		if (caller.m_thread is m_thread) {
			auto thisus = () @trusted { return cast()this; } ();
			debug (VibeTaskLog) logTrace("Resuming task with interrupt flag.");
			auto defer = caller.m_yieldLockCount > 0;
			taskScheduler.switchTo(thisus.task, defer ? TaskSwitchPriority.prioritized : TaskSwitchPriority.immediate);
		} else {
			debug (VibeTaskLog) logTrace("Set interrupt flag on task without resuming.");
		}
	}

	/** Sets the fiber to initialized state and increments the task counter.

		Note that the task information needs to be set up first.
	*/
	void bumpTaskCounter()
	@safe nothrow {
		debug {
			auto ts = atomicLoad(m_taskCounterAndFlags);
			assert((ts & Flags.flagsMask) == 0, "bumpTaskCounter() called on fiber with non-zero flags");
			assert(m_taskFunc.func !is null, "bumpTaskCounter() called without initializing the task function");
		}

		() @trusted { atomicOp!"+="(m_taskCounterAndFlags, (1 << Flags.shiftAmount) + Flags.initialized); } ();
	}

	/// Decodes an atomically loaded snapshot of the counter/flags word.
	private auto getTaskStatus()
	shared const @safe nothrow {
		return getTaskStatus(atomicLoad(m_taskCounterAndFlags));
	}

	/// Same as `getTaskStatus`, for use on the non-shared fiber; debug builds assert that the caller is the owner thread.
	private auto getTaskStatusFromOwnerThread()
	const @safe nothrow {
		debug assert(Thread.getThis() is m_thread);
		return getTaskStatus(atomicLoad(m_taskCounterAndFlags));
	}

	/// Splits a counter/flags word into the counter and the individual flags.
	private static auto getTaskStatus(ulong counter_and_flags)
	@safe nothrow {
		static struct S {
			size_t counter;
			bool running;
			bool initialized;
			bool interrupt;
		}
		S ret;
		ret.counter = cast(size_t)(counter_and_flags >> Flags.shiftAmount);
		ret.running = (counter_and_flags & Flags.running) != 0;
		ret.initialized = (counter_and_flags & Flags.initialized) != 0;
		ret.interrupt = (counter_and_flags & Flags.interrupt) != 0;
		return ret;
	}

	/** Clears a pending interrupt flag and invokes `on_interrupt`.

		Nothing happens if no interrupt is pending or if `on_interrupt` is
		`null`.
	*/
	package void handleInterrupt(scope void delegate() @safe nothrow on_interrupt)
	@safe nothrow {
		assert(() @trusted { return Task.getThis().fiber; } () is this,
			"Handling interrupt outside of the corresponding fiber.");
		if (getTaskStatusFromOwnerThread().interrupt && on_interrupt) {
			debug (VibeTaskLog) logTrace("Handling interrupt flag.");
			clearInterruptFlag();
			on_interrupt();
		}
	}

	/** Clears a pending interrupt flag and throws an `InterruptException`.

		Nothing happens if no interrupt is pending.
	*/
	package void handleInterrupt()
	@safe {
		assert(() @trusted { return Task.getThis().fiber; } () is this,
			"Handling interrupt outside of the corresponding fiber.");
		if (getTaskStatusFromOwnerThread().interrupt) {
			clearInterruptFlag();
			throw new InterruptException;
		}
	}

	/// Atomically removes the interrupt flag from the counter/flags word.
	private void clearInterruptFlag()
	@safe nothrow {
		while (true) {
			// reload on every iteration, so that a failed CAS is retried
			// against the current value instead of a stale snapshot
			auto tcf = atomicLoad(m_taskCounterAndFlags);
			auto st = getTaskStatus(tcf);
			assert(st.initialized);
			if (!st.interrupt) break;
			auto tcf_int = tcf & ~Flags.interrupt;
			if (cas(&m_taskCounterAndFlags, tcf, tcf_int))
				break;
		}
	}
}
/** Determines with which priority execution gets switched to a task.
*/
enum TaskSwitchPriority {
	/// Rescheduled according to the task's priority
	normal,

	/** Rescheduled with maximum priority.

		The task will resume as soon as the current task yields.
	*/
	prioritized,

	/// Switch to the task immediately.
	immediate
}
/** Type erased storage for a task callable together with its arguments.

	`set` copies/moves the callable and the arguments into the fixed size
	buffers and stores a trampoline in `func`; `call` invokes that trampoline,
	which moves the data back out and performs the actual call.
*/
package struct TaskFuncInfo {
	/// Trampoline that performs the call; `null` while nothing is set
	void function(ref TaskFuncInfo) func;
	/// Raw storage for the callable
	void[2*size_t.sizeof] callable;
	/// Raw storage for the arguments
	void[maxTaskParameterSize] args;
	debug ulong functionPointer;
	TaskSettings settings;

	/** Stores `callable` and `args` for a later invocation through `call`.

		The sizes of the callable and of the combined arguments are checked
		at compile time against the available storage.
	*/
	void set(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args)
	{
		assert(!func, "Setting TaskFuncInfo that is already set.");

		import std.algorithm : move;
		import std.traits : hasElaborateAssign;
		import std.conv : to;

		static struct TARGS { ARGS expand; }

		// Note: the sizes have to be converted to strings explicitly and the
		// storage member has to be qualified, because the `callable`
		// parameter shadows it within this function.
		static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length,
			"Storage required for task callable is too large ("~to!string(CALLABLE.sizeof)
			~" vs max "~to!string(TaskFuncInfo.callable.length)~"): "~CALLABLE.stringof);
		static assert(TARGS.sizeof <= maxTaskParameterSize,
			"The arguments passed to run(Worker)Task must not exceed "~
			maxTaskParameterSize.to!string~" bytes in total size: "~to!string(TARGS.sizeof)~" bytes");

		debug functionPointer = callPointer(callable);

		static void callDelegate(ref TaskFuncInfo tfi) {
			assert(tfi.func is &callDelegate, "Wrong callDelegate called!?");

			// copy original call data to stack
			CALLABLE c;
			TARGS args;
			move(*(cast(CALLABLE*)tfi.callable.ptr), c);
			move(*(cast(TARGS*)tfi.args.ptr), args);

			// reset the info
			tfi.func = null;

			// make the call
			mixin(callWithMove!ARGS("c", "args.expand"));
		}

		func = &callDelegate;

		() @trusted {
			// bring the raw storage into a valid state before assigning to it
			static if (hasElaborateAssign!CALLABLE) initCallable!CALLABLE();
			static if (hasElaborateAssign!TARGS) initArgs!TARGS();
			typedCallable!CALLABLE = callable;
			foreach (i, A; ARGS) {
				static if (needsMove!A) args[i].move(typedArgs!TARGS.expand[i]);
				else typedArgs!TARGS.expand[i] = args[i];
			}
		} ();
	}

	/// Invokes the stored trampoline.
	void call()
	{
		this.func(this);
	}

	/// Accesses the callable storage as a value of type `C`.
	@property ref C typedCallable(C)()
	{
		static assert(C.sizeof <= callable.sizeof);
		return *cast(C*)callable.ptr;
	}

	/// Accesses the argument storage as a value of type `A`.
	@property ref A typedArgs(A)()
	{
		static assert(A.sizeof <= args.sizeof);
		return *cast(A*)args.ptr;
	}

	/// Overwrites the callable storage with the bytes of a default initialized `C`.
	void initCallable(C)()
	nothrow {
		static const C cinit;
		this.callable[0 .. C.sizeof] = cast(void[])(&cinit)[0 .. 1];
	}

	/// Overwrites the argument storage with the bytes of a default initialized `A`.
	void initArgs(A)()
	nothrow {
		static const A ainit;
		this.args[0 .. A.sizeof] = cast(void[])(&ainit)[0 .. 1];
	}
}
/** Determines an address that identifies the code behind `callable`.

	Returns:
		- for a function pointer: the address of the function
		- for a delegate: the address of its function (`funcptr`)
		- for a type with an addressable `opCall`: the address of that `opCall`
		- for anything else: the address of `callable` itself
*/
private ulong callPointer(C)(ref C callable)
@trusted nothrow @nogc {
	import std.traits : isFunctionPointer;

	alias IP = ulong;

	// Note: `is(C == function)` only matches function types, never function
	// pointer types, so function pointers have to be detected separately.
	static if (isFunctionPointer!C) return cast(IP)cast(void*)callable;
	else static if (is(C == delegate)) return cast(IP)callable.funcptr;
	else static if (__traits(compiles, &callable.opCall)) {
		alias OP = typeof(&callable.opCall);
		static if (isFunctionPointer!OP) return cast(IP)cast(void*)&callable.opCall;
		else static if (is(OP == delegate)) return cast(IP)(&callable.opCall).funcptr;
		else return cast(IP)&callable;
	}
	else return cast(IP)&callable;
}
793 package struct TaskScheduler {
794 	import eventcore.driver : ExitReason;
795 	import eventcore.core : eventDriver;
797 	private {
798 		TaskFiberQueue m_taskQueue;
799 	}
801 	@safe:
803 	@disable this(this);
805 	@property size_t scheduledTaskCount() const nothrow { return m_taskQueue.length; }
807 	/** Lets other pending tasks execute before continuing execution.
809 		This will give other tasks or events a chance to be processed. If
810 		multiple tasks call this function, they will be processed in a
811 		fírst-in-first-out manner.
812 	*/
813 	void yield()
814 	{
815 		auto t = Task.getThis();
816 		if (t == Task.init) return; // not really a task -> no-op
817 		auto tf = () @trusted { return t.taskFiber; } ();
818 		debug (VibeTaskLog) logTrace("Yielding (interrupt=%s)", () @trusted { return (cast(shared)tf).getTaskStatus().interrupt; } ());
819 		tf.handleInterrupt();
820 		if (tf.m_queue !is null) return; // already scheduled to be resumed
821 		doYieldAndReschedule(t);
822 		tf.handleInterrupt();
823 	}
825 	nothrow:
827 	/** Performs a single round of scheduling without blocking.
829 		This will execute scheduled tasks and process events from the
830 		event queue, as long as possible without having to wait.
832 		Returns:
833 			A reason is returned:
834 			$(UL
835 				$(LI `ExitReason.exit`: The event loop was exited due to a manual request)
836 				$(LI `ExitReason.outOfWaiters`: There are no more scheduled
837 					tasks or events, so the application would do nothing from
838 					now on)
839 				$(LI `ExitReason.idle`: All scheduled tasks and pending events
840 					have been processed normally)
841 				$(LI `ExitReason.timeout`: Scheduled tasks have been processed,
842 					but there were no pending events present.)
843 			)
844 	*/
845 	ExitReason process()
846 	{
847 		assert(TaskFiber.getThis().m_yieldLockCount == 0, "May not process events within an active yieldLock()!");
849 		bool any_events = false;
850 		while (true) {
851 			debug (VibeTaskLog) logTrace("Scheduling before peeking new events...");
852 			// process pending tasks
853 			bool any_tasks_processed = schedule() != ScheduleStatus.idle;
855 			debug (VibeTaskLog) logTrace("Processing pending events...");
856 			ExitReason er = eventDriver.core.processEvents(0.seconds);
857 			debug (VibeTaskLog) logTrace("Done: %s", er);
859 			final switch (er) {
860 				case ExitReason.exited: return ExitReason.exited;
861 				case ExitReason.outOfWaiters:
862 					if (!scheduledTaskCount)
863 						return ExitReason.outOfWaiters;
864 					break;
865 				case ExitReason.timeout:
866 					if (!scheduledTaskCount)
867 						return any_events || any_tasks_processed ? ExitReason.idle : ExitReason.timeout;
868 					break;
869 				case ExitReason.idle:
870 					any_events = true;
871 					if (!scheduledTaskCount)
872 						return ExitReason.idle;
873 					break;
874 			}
875 		}
876 	}
878 	/** Performs a single round of scheduling, blocking if necessary.
880 		Returns:
881 			A reason is returned:
882 			$(UL
883 				$(LI `ExitReason.exit`: The event loop was exited due to a manual request)
884 				$(LI `ExitReason.outOfWaiters`: There are no more scheduled
885 					tasks or events, so the application would do nothing from
886 					now on)
887 				$(LI `ExitReason.idle`: All scheduled tasks and pending events
888 					have been processed normally)
889 			)
890 	*/
891 	ExitReason waitAndProcess()
892 	{
893 		// first, process tasks without blocking
894 		auto er = process();
896 		final switch (er) {
897 			case ExitReason.exited, ExitReason.outOfWaiters: return er;
898 			case ExitReason.idle: return ExitReason.idle;
899 			case ExitReason.timeout: break;
900 		}
902 		// if the first run didn't process any events, block and
903 		// process one chunk
904 		debug (VibeTaskLog) logTrace("Wait for new events to process...");
905 		er = eventDriver.core.processEvents(Duration.max);
906 		debug (VibeTaskLog) logTrace("Done: %s", er);
907 		final switch (er) {
908 			case ExitReason.exited: return ExitReason.exited;
909 			case ExitReason.outOfWaiters:
910 				if (!scheduledTaskCount)
911 					return ExitReason.outOfWaiters;
912 				break;
913 			case ExitReason.timeout: assert(false, "Unexpected return code");
914 			case ExitReason.idle: break;
915 		}
917 		// finally, make sure that all scheduled tasks are run
918 		er = process();
919 		if (er == ExitReason.timeout) return ExitReason.idle;
920 		else return er;
921 	}
923 	void yieldUninterruptible()
924 	{
925 		auto t = Task.getThis();
926 		if (t == Task.init) return; // not really a task -> no-op
927 		auto tf = () @trusted { return t.taskFiber; } ();
928 		if (tf.m_queue !is null) return; // already scheduled to be resumed
929 		doYieldAndReschedule(t);
930 	}
932 	/** Holds execution until the task gets explicitly resumed.
933 	*/
934 	void hibernate()
935 	{
936 		import vibe.core.core : isEventLoopRunning;
937 		auto thist = Task.getThis();
938 		if (thist == Task.init) {
939 			assert(!isEventLoopRunning, "Event processing outside of a fiber should only happen before the event loop is running!?");
940 			static import vibe.core.core;
941 			vibe.core.core.runEventLoopOnce();
942 		} else {
943 			doYield(thist);
944 		}
945 	}
	/** Switches execution to the specified task without giving up execution privilege.

		With `TaskSwitchPriority.immediate`, this forces immediate execution
		of the specified task. After the task finishes or yields, the calling
		task will continue execution. With the other priorities the target
		task is only (re-)inserted into the task queue and the caller keeps
		running.

		Switching to the calling task itself is a no-op. The target task must
		live in the same thread as the caller.

		Params:
			t = The task to switch to
			priority = Determines where the target task gets placed in the
				task queue and whether the caller yields immediately
	*/
	void switchTo(Task t, TaskSwitchPriority priority)
	{
		auto thist = Task.getThis();

		// switching to ourselves is meaningless
		if (t == thist) return;

		// outside of a task, fall back to the current thread for the check
		auto thisthr = thist ? thist.thread : () @trusted { return Thread.getThis(); } ();
		assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread.");

		auto tf = () @trusted { return t.taskFiber; } ();
		if (tf.m_queue) {
			// don't reset the position of already scheduled tasks
			if (priority == TaskSwitchPriority.normal) return;

			// for higher priorities, take the task out of the queue so that
			// it can be re-inserted at its new position below
			debug (VibeTaskLog) logTrace("Task to switch to is already scheduled. Moving to front of queue.");
			assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue.");
			m_taskQueue.remove(tf);
			assert(!tf.m_queue, "Task removed from queue, but still has one set!?");
		}

		if (thist == Task.init && priority == TaskSwitchPriority.immediate) {
			// not running inside of a task: there is nothing to yield, so the
			// target task is resumed directly from here
			assert(TaskFiber.getThis().m_yieldLockCount == 0, "Cannot yield within an active yieldLock()!");
			debug (VibeTaskLog) logTrace("switch to task from global context");
			resumeTask(t);
			debug (VibeTaskLog) logTrace("task yielded control back to global context");
		} else {
			auto thistf = () @trusted { return thist.taskFiber; } ();
			assert(!thistf || !thistf.m_queue, "Calling task is running, but scheduled to be resumed!?");

			debug (VibeTaskLog) logDebugV("Switching tasks (%s already in queue, prio=%s)", m_taskQueue.length, priority);
			final switch (priority) {
				case TaskSwitchPriority.normal:
					// enqueue according to the current dynamic priority
					reschedule(tf);
					break;
				case TaskSwitchPriority.prioritized:
					// raise to the maximum priority before enqueuing
					tf.m_dynamicPriority = uint.max;
					reschedule(tf);
					break;
				case TaskSwitchPriority.immediate:
					// put the caller at the front first and the target in
					// front of it, so that the target is at the head of the
					// queue with the caller directly behind it, then yield
					tf.m_dynamicPriority = uint.max;
					m_taskQueue.insertFront(thistf);
					m_taskQueue.insertFront(tf);
					doYield(thist);
					break;
			}
		}
	}
1000 	/** Runs any pending tasks.
1002 		A pending tasks is a task that is scheduled to be resumed by either `yield` or
1003 		`switchTo`.
1005 		Returns:
1006 			Returns `true` $(I iff) there are more tasks left to process.
1007 	*/
1008 	ScheduleStatus schedule()
1009 	nothrow {
1010 		if (m_taskQueue.empty)
1011 			return ScheduleStatus.idle;
1014 		assert(Task.getThis() == Task.init, "TaskScheduler.schedule() may not be called from a task!");
1016 		if (m_taskQueue.empty) return ScheduleStatus.idle;
1018 		foreach (i; 0 .. m_taskQueue.length) {
1019 			auto t = m_taskQueue.front;
1020 			m_taskQueue.popFront();
1022 			// reset priority
1023 			t.m_dynamicPriority = t.m_staticPriority;
1025 			debug (VibeTaskLog) logTrace("resuming task");
1026 			auto task = t.task;
1027 			if (task != Task.init)
1028 				resumeTask(t.task);
1029 			debug (VibeTaskLog) logTrace("task out");
1031 			if (m_taskQueue.empty) break;
1032 		}
1034 		debug (VibeTaskLog) logDebugV("schedule finished - %s tasks left in queue", m_taskQueue.length);
1036 		return m_taskQueue.empty ? ScheduleStatus.allProcessed : ScheduleStatus.busy;
1037 	}
1039 	/// Resumes execution of a yielded task.
1040 	private void resumeTask(Task t)
1041 	nothrow {
1042 		import std.encoding : sanitize;
1044 		assert(t != Task.init, "Resuming null task");
1046 		debug (VibeTaskLog) logTrace("task fiber resume");
1047 		auto uncaught_exception = () @trusted nothrow { return t.fiber.call!(Fiber.Rethrow.no)(); } ();
1048 		debug (VibeTaskLog) logTrace("task fiber yielded");
1050 		if (uncaught_exception) {
1051 			auto th = cast(Throwable)uncaught_exception;
1052 			assert(th, "Fiber returned exception object that is not a Throwable!?");
1054 			assert(() @trusted nothrow { return t.fiber.state; } () == Fiber.State.TERM);
1055 			th.logException("Task terminated with unhandled exception");
1057 			// always pass Errors on
1058 			if (auto err = cast(Error)th) throw err;
1059 		}
1060 	}
1062 	private void reschedule(TaskFiber tf)
1063 	{
1064 		import std.algorithm.comparison : min;
1066 		// insert according to priority, limited to a priority
1067 		// factor of 1:10 in case of heavy concurrency
1068 		m_taskQueue.insertBackPred(tf, 10, (t) {
1069 			if (t.m_dynamicPriority >= tf.m_dynamicPriority)
1070 				return true;
1072 			// increase dynamic priority each time a task gets overtaken to
1073 			// ensure a fair schedule
1074 			t.m_dynamicPriority += min(t.m_staticPriority, uint.max - t.m_dynamicPriority);
1075 			return false;
1076 		});
1077 	}
1079 	private void doYieldAndReschedule(Task task)
1080 	{
1081 		auto tf = () @trusted { return task.taskFiber; } ();
1083 		reschedule(tf);
1085 		doYield(task);
1086 	}
1088 	private void doYield(Task task)
1089 	{
1090 		debug (VibeRunningTasks) () nothrow {
1091 			try throw new Exception("");
1092 			catch (Exception e) {
1093 				try TaskFiber.s_runningTasks[task.taskFiber] = e.info.toString;
1094 				catch (Exception e) TaskFiber.s_runningTasks[task.taskFiber] = "(failed to get stack)";
1095 			}
1096 		} ();
1098 		assert(() @trusted { return task.taskFiber; } ().m_yieldLockCount == 0, "May not yield while in an active yieldLock()!");
1099 		debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.yield, task); } ();
1100 		static if (__VERSION__ >= 2090) {
1101 			import core.memory : GC;
1102 			assert(!GC.inFinalizer, "Yiedling within finalizer - this would most likely result in a dead-lock!");
1103 		}
1104 		() @trusted { Fiber.yield(); } ();
1105 		debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.resume, task); } ();
1106 		assert(!task.m_fiber.m_queue, "Task is still scheduled after resumption.");
1107 	}
1108 }
/// Result of a single `TaskScheduler.schedule()` run.
package enum ScheduleStatus {
	/// The task queue was already empty when `schedule()` was called
	idle,
	/// Tasks have been run and the task queue is empty afterwards
	allProcessed,
	/// Tasks have been run, but there are still tasks left in the queue
	busy
}
/** Doubly linked queue of `TaskFiber` instances.

	The links are stored within the fibers themselves (`m_prev`/`m_next`),
	together with a pointer to the queue that currently contains the fiber
	(`m_queue`). A fiber that is not enqueued has all three fields set to
	`null`.
*/
private struct TaskFiberQueue {
	@safe nothrow:

	TaskFiber first, last;
	size_t length;

	@disable this(this);

	/// Determines whether the queue contains no fibers.
	@property bool empty() const { return first is null; }

	/// Returns the fiber at the head of the queue (`null` if empty).
	@property TaskFiber front() { return first; }

	/// Inserts a fiber that is not enqueued anywhere at the head of the queue.
	void insertFront(TaskFiber task) @trusted
	{
		assert(task.m_queue is null, "Task is already scheduled to be resumed!");
		assert(task.m_prev is null, "Task has m_prev set without being in a queue!?");
		assert(task.m_next is null, "Task has m_next set without being in a queue!?");

		task.m_queue = &this;
		if (!empty) {
			// link in front of the current head
			first.m_prev = task;
			task.m_next = first;
		} else {
			// sole element: also becomes the tail
			last = task;
		}
		first = task;
		length++;
	}

	/// Inserts a fiber that is not enqueued anywhere at the tail of the queue.
	void insertBack(TaskFiber task) @trusted
	{
		assert(task.m_queue is null, "Task is already scheduled to be resumed!");
		assert(task.m_prev is null, "Task has m_prev set without being in a queue!?");
		assert(task.m_next is null, "Task has m_next set without being in a queue!?");

		task.m_queue = &this;
		if (!empty) {
			// link behind the current tail
			last.m_next = task;
			task.m_prev = last;
		} else {
			// sole element: also becomes the head
			first = task;
		}
		last = task;
		length++;
	}

	// inserts a task after the first task for which the predicate yields `true`,
	// starting from the back. a maximum of max_skip tasks will be skipped
	// before the task is inserted regardless of the predicate. if no insertion
	// point is found, the task is inserted at the front.
	void insertBackPred(TaskFiber task, size_t max_skip,
		scope bool delegate(TaskFiber) @safe nothrow pred) @trusted
	{
		assert(task.m_queue is null, "Task is already scheduled to be resumed!");
		assert(task.m_prev is null, "Task has m_prev set without being in a queue!?");
		assert(task.m_next is null, "Task has m_next set without being in a queue!?");

		for (auto cursor = last; cursor !is null; cursor = cursor.m_prev) {
			// the predicate is only consulted while skips are left
			bool skips_left = max_skip != 0;
			max_skip--;
			if (skips_left && !pred(cursor)) continue;

			// link the task in directly behind `cursor`
			auto successor = cursor.m_next;
			task.m_queue = &this;
			task.m_next = successor;
			task.m_prev = cursor;
			cursor.m_next = task;
			if (successor !is null) successor.m_prev = task;
			else last = task;
			length++;
			return;
		}

		insertFront(task);
	}

	/// Removes the fiber at the head of the queue and clears its links.
	void popFront()
	{
		auto head = first;
		if (head is last) last = null;
		assert(head && head.m_queue == &this, "Popping from empty or mismatching queue");

		auto successor = head.m_next;
		if (successor) successor.m_prev = null;
		head.m_next = null;
		head.m_queue = null;
		first = successor;
		length--;
	}

	/// Unlinks a fiber that is contained in this queue from any position.
	void remove(TaskFiber task)
	{
		assert(task.m_queue is &this, "Task is not contained in task queue.");

		auto before = task.m_prev;
		auto after = task.m_next;

		// bypass the task in forward direction
		if (before) before.m_next = after;
		else first = after;

		// bypass the task in backward direction
		if (after) after.m_prev = before;
		else last = before;

		task.m_queue = null;
		task.m_prev = null;
		task.m_next = null;
		length--;
	}
}
// Basic TaskFiberQueue bookkeeping: `empty` and `length` must stay consistent
// across insertFront, popFront and remove.
unittest {
	auto f1 = new TaskFiber;
	auto f2 = new TaskFiber;

	TaskFiberQueue q;
	// a default initialized queue is empty
	assert(q.empty && q.length == 0);
	q.insertFront(f1);
	assert(!q.empty && q.length == 1);
	q.insertFront(f2);
	assert(!q.empty && q.length == 2);
	// popping both elements returns the queue to the empty state
	q.popFront();
	assert(!q.empty && q.length == 1);
	q.popFront();
	assert(q.empty && q.length == 0);
	// removing the only element also results in an empty queue
	q.insertFront(f1);
	q.remove(f1);
	assert(q.empty && q.length == 0);
}
// Tests the insertion position chosen by TaskFiberQueue.insertBackPred for
// different combinations of `max_skip` and predicate results.
unittest {
	auto f1 = new TaskFiber;
	auto f2 = new TaskFiber;
	auto f3 = new TaskFiber;
	auto f4 = new TaskFiber;
	auto f5 = new TaskFiber;
	auto f6 = new TaskFiber;
	TaskFiberQueue q;

	// verifies that the forward and backward links of the queue are consistent
	void checkQueue()
	{
		// front to back: each m_prev must point to the previously visited
		// element and only the last element may have no successor
		TaskFiber p;
		for (auto t = q.front; t; t = t.m_next) {
			assert(t.m_prev is p);
			assert(t.m_next || t is q.last);
			p = t;
		}

		// back to front: each m_next must point to the previously visited
		// element and only the first element may have no predecessor
		TaskFiber n;
		for (auto t = q.last; t; t = t.m_prev) {
			assert(t.m_next is n);
			assert(t.m_prev || t is q.first);
			n = t;
		}
	}

	// empty queue: the predicate is never called -> [f1]
	q.insertBackPred(f1, 0, delegate bool(tf) { assert(false); });
	assert(q.first is f1 && q.last is f1);
	checkQueue();

	// max_skip == 0: inserted at the back without calling the predicate -> [f1, f2]
	q.insertBackPred(f2, 0, delegate bool(tf) { assert(false); });
	assert(q.first is f1 && q.last is f2);
	checkQueue();

	// exactly one element (f2) gets skipped -> [f1, f3, f2]
	q.insertBackPred(f3, 1, (tf) => false);
	assert(q.first is f1 && q.last is f2);
	assert(f1.m_next is f3);
	assert(f3.m_prev is f1);
	checkQueue();

	// the predicate never matches and max_skip exceeds the queue length, so
	// the task ends up at the front -> [f4, f1, f3, f2]
	q.insertBackPred(f4, 10, (tf) => false);
	assert(q.first is f4 && q.last is f2);
	checkQueue();

	// the predicate matches right away -> [f4, f1, f3, f2, f5]
	q.insertBackPred(f5, 10, (tf) => true);
	assert(q.first is f4 && q.last is f5);
	checkQueue();

	// the predicate only matches for f4 -> [f4, f6, f1, f3, f2, f5]
	q.insertBackPred(f6, 10, (tf) => tf is f4);
	assert(q.first is f4 && q.last is f5);
	assert(f4.m_next is f6);
	checkQueue();
}
/// Pairs a destruction callback with the offset that gets passed to it.
/// NOTE(review): presumably describes a single fiber-local storage slot -
/// confirm against the users of this struct.
private struct FLSInfo {
	/// Callback invoked by `destroy`, receiving the buffer and `offset`
	void function(void[], size_t) fct;
	/// Offset that is forwarded to `fct`
	size_t offset;
	/// Invokes `fct` with the given buffer and the stored `offset`.
	void destroy(void[] fls) {
		fct(fls, offset);
	}
}
// mixin string helper that generates a call of the form
// `func(args[0], args[1].move, ...);`, where `.move` is appended for each
// argument whose type in ARGS has to be moved instead of copied
package string callWithMove(ARGS...)(string func, string args)
{
	import std.string;

	string call_expr = func ~ "(";
	foreach (idx, ArgType; ARGS) {
		// separate all but the first argument with a comma
		static if (idx > 0) call_expr ~= ", ";

		static if (needsMove!ArgType)
			call_expr ~= format("%s[%s]", args, idx) ~ ".move";
		else
			call_expr ~= format("%s[%s]", args, idx);
	}
	call_expr ~= ");";
	return call_expr;
}
/** Determines whether a value of type `T` has to be passed on using `.move`.

	Evaluates to `true` exactly for those types that cannot be copied. Such
	types are required to provide a `.move` property, which is enforced at
	compile time.
*/
private template needsMove(T)
{
	// a value of type `U` can be passed and returned by value
	enum bool isCopyable(U) = __traits(compiles, (U a) { return a; });

	// a value of type `U` offers a usable `.move`
	enum bool isMoveable(U) = __traits(compiles, (U a) { return a.move; });

	static assert(isCopyable!T || isMoveable!T,
				  "Non-copyable type "~T.stringof~" must be movable with a .move property.");

	enum needsMove = !isCopyable!T;
}
// Checks needsMove for combinations of copyable/non-copyable types with and
// without a `move` member.
unittest {
	// enum that happens to have a member named `move`
	enum E { a, move }
	// non-copyable, provides `.move`
	static struct S {
		@disable this(this);
		@property S move() { return S.init; }
	}
	// copyable, but also provides `.move`
	static struct T { @property T move() { return T.init; } }
	// plain copyable struct
	static struct U { }
	// non-copyable and not default constructible, provides `.move`
	static struct V {
		@disable this();
		@disable this(this);
		@property V move() { return V.init; }
	}
	// copyable, but not default constructible
	static struct W { @disable this(); }

	// only the non-copyable types (S and V) require a move
	static assert(needsMove!S);
	static assert(!needsMove!int);
	static assert(!needsMove!string);
	static assert(!needsMove!E);
	static assert(!needsMove!T);
	static assert(!needsMove!U);
	static assert(needsMove!V);
	static assert(!needsMove!W);
}