1 /**
2 	Event loop compatible task synchronization facilities.
3 
4 	This module provides replacement primitives for the modules in `core.sync`
5 	that do not block vibe.d's event loop in their wait states. These should
6 	always be preferred over the ones in Druntime under usual circumstances.
7 
8 	Using a standard `Mutex` is possible as long as it is ensured that no event
9 	loop based functionality (I/O, task interaction or anything that implicitly
10 	calls `vibe.core.core.yield`) is executed within a section of code that is
11 	protected by the mutex. $(B Failure to do so may result in dead-locks and
12 	high-level race-conditions!)
13 
14 	Copyright: © 2012-2019 Sönke Ludwig
15 	Authors: Leonid Kramer, Sönke Ludwig, Manuel Frischknecht
16 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
17 */
18 module vibe.core.sync;
19 
20 import vibe.core.log : logDebugV, logTrace, logInfo;
21 import vibe.core.task;
22 
23 import core.atomic;
24 import core.sync.mutex;
25 import core.sync.condition;
26 import eventcore.core;
27 import std.exception;
28 import std.stdio;
29 import std.traits : ReturnType;
30 
31 
32 /** Creates a new signal that can be shared between fibers.
33 */
LocalManualEvent createManualEvent()
@safe nothrow {
	// a default-constructed LocalManualEvent has no waiter attached yet,
	// initialize() allocates it
	LocalManualEvent event;
	event.initialize();
	return event;
}
40 /// ditto
shared(ManualEvent) createSharedManualEvent()
@trusted nothrow {
	// initialize() sets up the monitor protected waiter lists
	shared(ManualEvent) event;
	event.initialize();
	return event;
}
47 
48 
49 /** Performs RAII based locking/unlocking of a mutex.
50 
51 	Note that while `TaskMutex` can be used with D's built-in `synchronized`
52 	statement, `InterruptibleTaskMutex` cannot. This function provides a
53 	library based alternative that is suitable for use with all mutex types.
54 */
ScopedMutexLock!M scopedMutexLock(M)(M mutex, LockMode mode = LockMode.lock)
	if (is(M : Mutex) || is(M : Lockable))
{
	// the guard is not copyable, so it is constructed directly as the return value
	alias Guard = ScopedMutexLock!M;
	return Guard(mutex, mode);
}
60 
61 ///
unittest {
	import vibe.core.core : runWorkerTaskH;

	// accessed from worker tasks, hence __gshared
	__gshared int hits;
	__gshared InterruptibleTaskMutex guard;

	guard = new InterruptibleTaskMutex;

	enum worker_count = 100;
	Task[] workers;

	foreach (_; 0 .. worker_count) {
		workers ~= runWorkerTaskH({
			// the mutex is released again when `l` goes out of scope
			auto l = scopedMutexLock(guard);
			hits++;
		});
	}

	foreach (w; workers) w.join();

	assert(hits == worker_count);
}
83 
unittest {
	// scopedMutexLock must compile for druntime's Mutex as well as for both
	// task mutex flavors
	auto plain = new Mutex;
	scopedMutexLock(plain);

	auto task = new TaskMutex;
	scopedMutexLock(task);

	auto interruptible = new InterruptibleTaskMutex;
	scopedMutexLock(interruptible);
}
89 
/** Selects what a `ScopedMutexLock` does with its mutex upon construction.
*/
enum LockMode {
	/// Acquire the mutex using `lock()`.
	lock,
	/// Make a single `tryLock()` attempt; query `locked` for the outcome.
	tryLock,
	/// Leave the mutex untouched, it can be locked later through the scoped lock.
	defer
}

/** Minimal interface of the lock types accepted by `scopedMutexLock` and
	`ScopedMutexLock` in addition to `core.sync.mutex.Mutex`.
*/
interface Lockable {
	@safe:
	/// Acquires the lock.
	void lock();
	/// Releases the lock.
	void unlock();
	/// Attempts to acquire the lock and returns `true` on success.
	bool tryLock();
}

/** RAII lock for `Mutex` and `Lockable` based mutex types.

	The destructor releases the mutex if, and only if, this object currently
	holds it.
*/
struct ScopedMutexLock(M)
	if (is(M : Mutex) || is(M : Lockable))
{
	@disable this(this);
	private {
		M m_mutex;
		bool m_locked;
		LockMode m_mode;
	}

	/** Binds the lock to `mutex` and acquires it according to `mode`.

		Params:
			mutex = The mutex to manage, must not be `null`
			mode = Determines whether the mutex is locked, try-locked or left
				alone by the constructor
	*/
	this(M mutex, LockMode mode = LockMode.lock) {
		assert(mutex !is null);
		m_mutex = mutex;

		final switch (mode) {
			case LockMode.lock: lock(); break;
			case LockMode.tryLock: tryLock(); break;
			case LockMode.defer: break;
		}
	}

	~this()
	{
		if( m_locked )
			m_mutex.unlock();
	}

	/// Determines whether the mutex is currently held through this object.
	@property bool locked() const { return m_locked; }

	/** Releases the mutex.

		Throws: An exception is thrown if the mutex is not held by this object.
	*/
	void unlock()
	{
		enforce(m_locked);
		m_mutex.unlock();
		m_locked = false;
	}

	/** Attempts to acquire the mutex using its `tryLock` method.

		Returns: `true` if the mutex is now held by this object.

		Throws: An exception is thrown if the mutex is already held by this object.
	*/
	bool tryLock()
	{
		enforce(!m_locked);
		return m_locked = m_mutex.tryLock();
	}

	/** Acquires the mutex using its `lock` method.

		Throws: An exception is thrown if the mutex is already held by this
			object. Any exception thrown by the mutex itself is passed on, in
			which case the lock counts as not held.
	*/
	void lock()
	{
		enforce(!m_locked);
		// Only record ownership once the mutex has really been acquired.
		// Interruptible mutexes may throw from lock() and the destructor must
		// not unlock a mutex that was never taken.
		m_mutex.lock();
		m_locked = true;
	}
}
154 
155 
156 /*
157 	Only for internal use:
158 	Ensures that a mutex is locked while executing the given procedure.
159 
160 	This function works for all kinds of mutexes, in particular for
161 	$(D core.sync.mutex.Mutex), $(D TaskMutex) and $(D InterruptibleTaskMutex).
162 
163 	Returns:
164 		Returns the value returned from $(D PROC), if any.
165 */
166 /// private
ReturnType!PROC performLocked(alias PROC, MUTEX)(MUTEX mutex)
{
	mutex.lock();
	// release the mutex no matter whether PROC returns or throws
	try return PROC();
	finally mutex.unlock();
}
173 
174 ///
unittest {
	int guarded_counter = 0;
	auto task_mutex = new TaskMutex;
	// the delegate literal is executed while task_mutex is locked
	task_mutex.performLocked!({
		guarded_counter++;
	});
}
182 
183 
184 /**
185 	Thread-local semaphore implementation for tasks.
186 
187 	When the semaphore runs out of concurrent locks, it will suspend. This class
188 	is used in `vibe.core.connectionpool` to limit the number of concurrent
189 	connections.
190 */
final class LocalTaskSemaphore
{
@safe:

	// requires a queue
	import std.container.binaryheap;
	import std.container.array;
	//import vibe.utils.memory;

	private {
		// Queue entry of a task that is suspended in `lock()`
		static struct ThreadWaiter {
			ubyte priority; // higher values are served first (see `asc`)
			uint seq; // enqueue order, identifies the waiter within the queue
		}

		BinaryHeap!(Array!ThreadWaiter, asc) m_waiters;
		uint m_maxLocks;
		uint m_locks;
		uint m_seq; // sequence number handed to the next waiter, may wrap around
		LocalManualEvent m_signal;
	}

	/** Creates a semaphore with a given lock limit.

		Params:
			max_locks = Maximum number of locks that can be held concurrently
	*/
	this(uint max_locks)
	{
		m_maxLocks = max_locks;
		m_signal = createManualEvent();
	}

	/// Maximum number of concurrent locks
	@property void maxLocks(uint max_locks)
	{
		m_maxLocks = max_locks;
		// raising the limit may have freed capacity for queued tasks
		if (m_waiters.length > 0 && available > 0)
			m_signal.emit();
	}
	/// ditto
	@property uint maxLocks() const { return m_maxLocks; }

	/** Number of concurrent locks still available

		Yields zero if more locks are currently held than `maxLocks` allows,
		which can happen after the limit has been lowered.
	*/
	@property uint available() const { return m_locks < m_maxLocks ? m_maxLocks - m_locks : 0; }

	/** Try to acquire a lock.

		If a lock cannot be acquired immediately, returns `false` and leaves the
		semaphore in its previous state.

		Returns:
			`true` is returned $(I iff) the number of available locks is greater
			than zero.
	*/
	bool tryLock()
	{
		if (available > 0)
		{
			m_locks++;
			return true;
		}
		return false;
	}

	/** Acquires a lock.

		Once the limit of concurrent locks is reached, this method will block
		until the number of locks drops below the limit. Blocked tasks are
		served by descending priority and, within the same priority, in the
		order in which they started to wait.

		Params:
			priority = Priority of the calling task among the waiting tasks
	*/
	void lock(ubyte priority = 0)
	{
		if (tryLock())
			return;

		ThreadWaiter w;
		w.priority = priority;
		// unique, monotonically increasing (modulo 2^32) ticket number
		w.seq = m_seq++;

		() @trusted { m_waiters.insert(w); } ();

		while (true) {
			m_signal.waitUninterruptible();
			// only the task at the front of the queue may take a free lock
			if (m_waiters.front.seq == w.seq && tryLock()) {
				() @trusted { m_waiters.removeFront(); } ();
				// the new front may have been woken before us and gone back
				// to sleep - wake it again if there is still capacity left
				if (m_waiters.length > 0 && available > 0)
					m_signal.emit();
				return;
			}
		}
	}

	/** Gives up an existing lock.
	*/
	void unlock()
	{
		assert(m_locks >= 1);
		m_locks--;
		if (m_waiters.length > 0)
			m_signal.emit(); // wakes the waiters, the front one takes the lock
	}

	// if true, a goes after b. ie. b comes out front()
	/// private
	static bool asc(ref ThreadWaiter a, ref ThreadWaiter b)
	{
		if (a.priority != b.priority)
			return a.priority < b.priority;
		// Wrap-around tolerant version of `a.seq > b.seq`, valid as long as
		// the queued sequence numbers span less than 2^31 values.
		return cast(int)(a.seq - b.seq) > 0;
	}
}
305 
306 
307 /**
308 	Mutex implementation for fibers.
309 
310 	This mutex type can be used in exchange for a core.sync.mutex.Mutex, but
311 	does not block the event loop when contention happens. Note that this
312 	mutex does not allow recursive locking.
313 
314 	Notice:
315 		Because this class is annotated nothrow, it cannot be interrupted
316 		using $(D vibe.core.task.Task.interrupt()). The corresponding
317 		$(D InterruptException) will be deferred until the next blocking
318 		operation yields the event loop.
319 
320 		Use $(D InterruptibleTaskMutex) as an alternative that can be
321 		interrupted.
322 
323 	See_Also: InterruptibleTaskMutex, RecursiveTaskMutex, core.sync.mutex.Mutex
324 */
final class TaskMutex : core.sync.mutex.Mutex, Lockable {
@safe:
	// implementation that all mutex operations are forwarded to
	private TaskMutexImpl!false m_impl;

	/// Sets up the mutex and passes `o` on to the `Mutex(Object)` constructor.
	this(Object o)
	{
		m_impl.setup();
		super(o);
	}

	/// Sets up the mutex.
	this()
	{
		m_impl.setup();
	}

	/// Forwards to the implementation's `tryLock`.
	override bool tryLock()
	nothrow {
		return m_impl.tryLock();
	}

	/// Forwards to the implementation's `lock`.
	override void lock()
	nothrow {
		m_impl.lock();
	}

	/// Forwards to the implementation's `unlock`.
	override void unlock()
	nothrow {
		m_impl.unlock();
	}
}
336 
unittest {
	auto mtx = new TaskMutex;

	{
		// a blocking scoped lock takes the mutex...
		auto held = scopedMutexLock(mtx);
		assert(held.locked);
		assert(mtx.m_impl.m_locked);

		// ...so that a try-lock on the same mutex has to fail
		auto contender = scopedMutexLock(mtx, LockMode.tryLock);
		assert(!contender.locked);
	}
	// leaving the scope releases the mutex again
	assert(!mtx.m_impl.m_locked);

	// try-lock on a free mutex succeeds, explicit unlock releases it
	auto retry = scopedMutexLock(mtx, LockMode.tryLock);
	assert(retry.locked);
	retry.unlock();
	assert(!retry.locked);

	// usable as an object monitor
	synchronized(mtx){
		assert(mtx.m_impl.m_locked);
	}
	assert(!mtx.m_impl.m_locked);

	mtx.performLocked!({
		assert(mtx.m_impl.m_locked);
	});
	assert(!mtx.m_impl.m_locked);

	static if (__VERSION__ >= 2067) {
		with(mtx.scopedMutexLock) {
			assert(mtx.m_impl.m_locked);
		}
	}
}
371 
unittest { // test deferred throwing
	import vibe.core.core;

	auto mtx = new TaskMutex;

	// started first: holds the mutex for 20 ms
	auto holder = runTask({
		scope (failure) assert(false, "No exception expected in first task!");
		mtx.lock();
		scope (exit) mtx.unlock();
		sleep(20.msecs);
	});

	// started second: gets interrupted while waiting in lock() and is
	// expected to see the exception only at the following yield()
	auto contender = runTask({
		mtx.lock();
		scope (exit) mtx.unlock();
		try {
			yield();
			assert(false, "Yield is supposed to have thrown an InterruptException.");
		} catch (InterruptException) {
			// as expected!
		} catch (Exception) {
			assert(false, "Only InterruptException supposed to be thrown!");
		}
	});

	runTask({
		// the mutex is now locked in the first task for 20 ms
		// the second task is waiting in lock()
		contender.interrupt();
		holder.join();
		contender.join();
		assert(!mtx.m_impl.m_locked); // ensure that the scope(exit) has been executed
		exitEventLoop();
	});

	runEventLoop();
}
408 
unittest {
	// run the mutex test suite shared by all task mutex types
	runMutexUnitTests!(TaskMutex)();
}
412 
413 
414 /**
415 	Alternative to $(D TaskMutex) that supports interruption.
416 
417 	This class supports the use of $(D vibe.core.task.Task.interrupt()) while
418 	waiting in the $(D lock()) method. However, because the interface is not
419 	$(D nothrow), it cannot be used as an object monitor.
420 
421 	See_Also: $(D TaskMutex), $(D InterruptibleRecursiveTaskMutex)
422 */
final class InterruptibleTaskMutex : Lockable {
@safe:

	// implementation that all mutex operations are forwarded to
	private TaskMutexImpl!true m_impl;

	/// Sets up the mutex and installs the guard monitor.
	this()
	{
		m_impl.setup();

		// detects invalid usage within synchronized(...)
		() @trusted {
			this.__monitor = cast(void*)&NoUseMonitor.instance();
		} ();
	}

	/// Forwards to the implementation's `tryLock`.
	bool tryLock()
	nothrow {
		return m_impl.tryLock();
	}

	/// Forwards to the implementation's `lock`; not `nothrow`.
	void lock()
	{
		m_impl.lock();
	}

	/// Forwards to the implementation's `unlock`.
	void unlock()
	nothrow {
		m_impl.unlock();
	}
}
440 
unittest {
	// run the mutex test suite shared by all task mutex types
	runMutexUnitTests!(InterruptibleTaskMutex)();
}
444 
445 
446 
447 /**
448 	Recursive mutex implementation for tasks.
449 
450 	This mutex type can be used in exchange for a `core.sync.mutex.Mutex`, but
451 	does not block the event loop when contention happens.
452 
453 	Notice:
454 		Because this class is annotated `nothrow`, it cannot be interrupted
455 		using $(D vibe.core.task.Task.interrupt()). The corresponding
456 		$(D InterruptException) will be deferred until the next blocking
457 		operation yields the event loop.
458 
459 		Use $(D InterruptibleRecursiveTaskMutex) as an alternative that can be
460 		interrupted.
461 
462 	See_Also: `TaskMutex`, `core.sync.mutex.Mutex`
463 */
final class RecursiveTaskMutex : core.sync.mutex.Mutex, Lockable {
@safe:

	// implementation that all mutex operations are forwarded to
	private RecursiveTaskMutexImpl!false m_impl;

	/// Sets up the mutex and passes `o` on to the `Mutex(Object)` constructor.
	this(Object o)
	{
		m_impl.setup();
		super(o);
	}

	/// Sets up the mutex.
	this()
	{
		m_impl.setup();
	}

	/// Forwards to the implementation's `tryLock`.
	override bool tryLock()
	{
		return m_impl.tryLock();
	}

	/// Forwards to the implementation's `lock`.
	override void lock()
	{
		m_impl.lock();
	}

	/// Forwards to the implementation's `unlock`.
	override void unlock()
	{
		m_impl.unlock();
	}
}
476 
unittest {
	// run the mutex test suite shared by all task mutex types
	runMutexUnitTests!(RecursiveTaskMutex)();
}
480 
481 
482 /**
483 	Alternative to $(D RecursiveTaskMutex) that supports interruption.
484 
485 	This class supports the use of $(D vibe.core.task.Task.interrupt()) while
486 	waiting in the $(D lock()) method. However, because the interface is not
487 	$(D nothrow), it cannot be used as an object monitor.
488 
489 	See_Also: $(D RecursiveTaskMutex), $(D InterruptibleTaskMutex)
490 */
final class InterruptibleRecursiveTaskMutex : Lockable {
@safe:
	// implementation that all mutex operations are forwarded to
	private RecursiveTaskMutexImpl!true m_impl;

	/// Sets up the mutex and installs the guard monitor.
	this()
	{
		m_impl.setup();

		// detects invalid usage within synchronized(...)
		() @trusted {
			this.__monitor = cast(void*)&NoUseMonitor.instance();
		} ();
	}

	/// Forwards to the implementation's `tryLock`.
	bool tryLock()
	{
		return m_impl.tryLock();
	}

	/// Forwards to the implementation's `lock`.
	void lock()
	{
		m_impl.lock();
	}

	/// Forwards to the implementation's `unlock`.
	void unlock()
	{
		m_impl.unlock();
	}
}
507 
unittest {
	// run the mutex test suite shared by all task mutex types
	runMutexUnitTests!(InterruptibleRecursiveTaskMutex)();
}
511 
512 
513 // Helper class to ensure that the non Object.Monitor compatible interruptible
514 // mutex classes are not accidentally used with the `synchronized` statement
private final class NoUseMonitor : Object.Monitor {
	// Wrapper whose address gets installed as the `__monitor` of the
	// interruptible mutex classes
	static struct Proxy {
		Object.Monitor monitor;
	}

	private static shared Proxy st_instance;

	/// Returns the shared proxy, creating the monitor object on first use.
	static @property ref shared(Proxy) instance()
	@safe nothrow {
		// per-thread cache of the proxy address (plain statics are thread-local in D)
		static shared(Proxy)* inst = null;

		if (inst is null) {
			() @trusted { // synchronized {} not @safe for DMD <= 2.078.3
				synchronized {
					if (!st_instance.monitor)
						st_instance.monitor = new shared NoUseMonitor;
					inst = &st_instance;
				}
			} ();
		}

		return *inst;
	}

	/// Always fails - entered when `synchronized` is used on a guarded object.
	override void lock()
	@safe @nogc nothrow {
		assert(false, "Interruptible task mutexes cannot be used with synchronized(), use scopedMutexLock instead.");
	}

	/// Does nothing.
	override void unlock()
	@safe @nogc nothrow {
	}
}
544 
545 
// Test suite shared by all task mutex types. `M` must be default
// constructible and expose `lock`, `unlock`, `tryLock` and `m_impl.m_locked`.
// Runs a basic lock test, a contention test and two interruption tests, each
// of the latter three inside of its own event loop run.
private void runMutexUnitTests(M)()
{
	import vibe.core.core;

	auto m = new M;
	Task t1, t2;
	// Starts two tasks that compete for `m`. The flags select which of the
	// two tasks expects to get interrupted by the caller.
	void runContendedTasks(bool interrupt_t1, bool interrupt_t2) {
		assert(!m.m_impl.m_locked);

		// t1 starts first and acquires the mutex for 20 ms
		// t2 starts second and has to wait in m.lock()
		t1 = runTask({
			assert(!m.m_impl.m_locked);
			m.lock();
			assert(m.m_impl.m_locked);
			// when interrupted, the sleep is expected to be cut short by an InterruptException
			if (interrupt_t1) assertThrown!InterruptException(sleep(100.msecs));
			else assertNotThrown(sleep(20.msecs));
			m.unlock();
		});
		t2 = runTask({
			assert(!m.tryLock());
			if (interrupt_t2) {
				// interruptible mutexes throw directly from lock(), in which
				// case the mutex was not acquired and there is nothing to unlock
				try m.lock();
				catch (InterruptException) return;
				// otherwise the exception is deferred and the mutex is held here
				try yield(); // rethrows any deferred exceptions
				catch (InterruptException) {
					m.unlock();
					return;
				}
				assert(false, "Supposed to have thrown an InterruptException.");
			} else assertNotThrown(m.lock());
			assert(m.m_impl.m_locked);
			sleep(20.msecs);
			m.unlock();
			assert(!m.m_impl.m_locked);
		});
	}

	// basic lock test
	m.performLocked!({
		assert(m.m_impl.m_locked);
	});
	assert(!m.m_impl.m_locked);

	// basic contention test
	runContendedTasks(false, false);
	auto t3 = runTask({
		assert(t1.running && t2.running);
		assert(m.m_impl.m_locked);
		t1.join();
		assert(!t1.running && t2.running);
		yield(); // give t2 a chance to take the lock
		assert(m.m_impl.m_locked);
		t2.join();
		assert(!t2.running);
		assert(!m.m_impl.m_locked);
		exitEventLoop();
	});
	runEventLoop();
	assert(!t3.running);
	assert(!m.m_impl.m_locked);

	// interruption test #1
	// t1 is interrupted while holding the mutex, t2 must still get the lock afterwards
	runContendedTasks(true, false);
	t3 = runTask({
		assert(t1.running && t2.running);
		assert(m.m_impl.m_locked);
		t1.interrupt();
		t1.join();
		assert(!t1.running && t2.running);
		yield(); // give t2 a chance to take the lock
		assert(m.m_impl.m_locked);
		t2.join();
		assert(!t2.running);
		assert(!m.m_impl.m_locked);
		exitEventLoop();
	});
	runEventLoop();
	assert(!t3.running);
	assert(!m.m_impl.m_locked);

	// interruption test #2
	// t2 is interrupted while waiting for the mutex that t1 holds
	runContendedTasks(false, true);
	t3 = runTask({
		assert(t1.running && t2.running);
		assert(m.m_impl.m_locked);
		t2.interrupt();
		t2.join();
		assert(!t2.running);
		// only checked for the interruptible types: t1 must still be running
		// and holding the mutex once t2 has finished
		static if (is(M == InterruptibleTaskMutex) || is (M == InterruptibleRecursiveTaskMutex))
			assert(t1.running && m.m_impl.m_locked);
		t1.join();
		assert(!t1.running);
		assert(!m.m_impl.m_locked);
		exitEventLoop();
	});
	runEventLoop();
	assert(!t3.running);
	assert(!m.m_impl.m_locked);
}
646 
647 
648 /**
649 	Event loop based condition variable or "event" implementation.
650 
651 	This class can be used in exchange for a $(D core.sync.condition.Condition)
652 	to avoid blocking the event loop when waiting.
653 
654 	Notice:
655 		Because this class is annotated nothrow, it cannot be interrupted
656 		using $(D vibe.core.task.Task.interrupt()). The corresponding
657 		$(D InterruptException) will be deferred until the next blocking
658 		operation yields to the event loop.
659 
660 		Use $(D InterruptibleTaskCondition) as an alternative that can be
661 		interrupted.
662 
663 		Note that it is generally not safe to use a `TaskCondition` together with an
664 		interruptible mutex type.
665 
666 	See_Also: `InterruptibleTaskCondition`
667 */
final class TaskCondition : core.sync.condition.Condition {
@safe:

	// implementation that all condition operations are forwarded to
	private TaskConditionImpl!(false, Mutex) m_impl;

	/** Creates a condition that is bound to `mtx`.

		Only objects whose exact dynamic type is `Mutex` or `TaskMutex` are
		accepted, which is checked by an assertion.
	*/
	this(core.sync.mutex.Mutex mtx)
	{
		assert(mtx.classinfo is TaskMutex.classinfo || mtx.classinfo is Mutex.classinfo,
			"TaskCondition can only be used with Mutex or TaskMutex");

		m_impl.setup(mtx);
		super(mtx);
	}

	/// Returns the mutex that is stored in the implementation.
	override @property Mutex mutex()
	nothrow {
		return m_impl.mutex;
	}

	/// Forwards to the implementation's `wait()`.
	override void wait()
	nothrow {
		m_impl.wait();
	}

	/// Forwards to the implementation's `wait(timeout)` and returns its result.
	override bool wait(Duration timeout)
	nothrow {
		return m_impl.wait(timeout);
	}

	/// Forwards to the implementation's `notify()`.
	override void notify()
	nothrow {
		m_impl.notify();
	}

	/// Forwards to the implementation's `notifyAll()`.
	override void notifyAll()
	nothrow {
		m_impl.notifyAll();
	}
}
687 
unittest {
	// both mutex types named in the constructor's assertion must be accepted
	auto plain = new Mutex;
	new TaskCondition(plain);

	auto task = new TaskMutex;
	new TaskCondition(task);
}
692 
693 
694 /** This example shows the typical usage pattern using a `while` loop to make
695 	sure that the final condition is reached.
696 */
unittest {
	import vibe.core.core;
	import vibe.core.log;

	__gshared Mutex mtx;
	__gshared TaskCondition cond;
	__gshared int pending_workers = 0;

	// setup the task condition
	mtx = new Mutex;
	cond = new TaskCondition(mtx);

	logDebug("SETTING UP TASKS");

	// start up the workers and count how many are running
	foreach (_; 0 .. 4) {
		pending_workers++;
		runWorkerTask({
			// simulate some work
			sleep(100.msecs);

			// notify the waiter that we're finished
			synchronized (mtx) {
				pending_workers--;
				logDebug("DECREMENT %s", pending_workers);
			}
			logDebug("NOTIFY");
			cond.notify();
		});
	}

	logDebug("STARTING WAIT LOOP");

	// wait until all tasks have decremented the counter back to zero,
	// re-checking the counter after every wake-up
	synchronized (mtx) {
		while (pending_workers > 0) {
			logDebug("STILL running %s", pending_workers);
			cond.wait();
		}
	}
}
738 
739 
740 /**
741 	Alternative to `TaskCondition` that supports interruption.
742 
743 	This class supports the use of `vibe.core.task.Task.interrupt()` while
744 	waiting in the `wait()` method.
745 
746 	See `TaskCondition` for an example.
747 
748 	Notice:
749 		Note that it is generally not safe to use an
750 		`InterruptibleTaskCondition` together with an interruptible mutex type.
751 
752 	See_Also: `TaskCondition`
753 */
final class InterruptibleTaskCondition {
@safe:

	// implementation that all condition operations are forwarded to
	private TaskConditionImpl!(true, Lockable) m_impl;

	/** Creates a condition that is bound to `mutex`.

		`Lockable` implementations are handed to the implementation's `setup`,
		any other `Mutex` goes through `setupForMutex`.
	*/
	this(M)(M mutex)
		if (is(M : Mutex) || is (M : Lockable))
	{
		static if (!is(M : Lockable))
			m_impl.setupForMutex(mutex);
		else
			m_impl.setup(mutex);
	}

	/// Returns the mutex that is stored in the implementation.
	@property Lockable mutex()
	{
		return m_impl.mutex;
	}

	/// Forwards to the implementation's `wait()`.
	void wait()
	{
		m_impl.wait();
	}

	/// Forwards to the implementation's `wait(timeout)` and returns its result.
	bool wait(Duration timeout)
	{
		return m_impl.wait(timeout);
	}

	/// Forwards to the implementation's `notify()`.
	void notify()
	{
		m_impl.notify();
	}

	/// Forwards to the implementation's `notifyAll()`.
	void notifyAll()
	{
		m_impl.notifyAll();
	}
}
774 
unittest {
	// the templated constructor must accept plain mutexes as well as Lockables
	auto plain = new Mutex;
	new InterruptibleTaskCondition(plain);

	auto task = new TaskMutex;
	new InterruptibleTaskCondition(task);

	auto interruptible = new InterruptibleTaskMutex;
	new InterruptibleTaskCondition(interruptible);
}
780 
781 
782 /** A manually triggered single threaded cross-task event.
783 
784 	Note: the ownership can be shared between multiple fibers of the same thread.
785 */
struct LocalManualEvent {
	import core.thread : Thread;
	import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;

	@safe:

	private {
		alias Waiter = ThreadLocalWaiter!false;

		// reference counted state, shared between all copies of this event
		Waiter m_waiter;
	}

	// thread destructor in vibe.core.core will decrement the ref. count
	package static EventID ms_threadEvent;

	// Allocates the waiter state - used by createManualEvent().
	private void initialize()
	nothrow {
		import vibe.internal.allocator : Mallocator, makeGCSafe;
		m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } ();
	}

	// Copies share the waiter state, so only the reference count changes.
	this(this)
	{
		if (m_waiter)
			m_waiter.addRef();
	}

	// Drops one reference and frees the waiter state once releaseRef()
	// reports that no reference is left.
	~this()
	nothrow {
		import vibe.internal.allocator : Mallocator, disposeGCSafe;
		if (m_waiter) {
			if (!m_waiter.releaseRef()) {
				static if (__VERSION__ < 2087) scope (failure) assert(false);
				() @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } ();
			}
		}
	}

	/// Determines whether the event has been initialized.
	bool opCast() const nothrow { return m_waiter !is null; }

	/// A counter that is increased with every emit() call
	int emitCount() const nothrow { return m_waiter.m_emitCount; }

	/** Emits the signal, waking up all owners of the signal.

		Returns: The emit count as it was before this call.
	*/
	int emit()
	nothrow {
		assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
		logTrace("unshared emit");
		auto ec = m_waiter.m_emitCount++;
		m_waiter.emit();
		return ec;
	}

	/** Emits the signal, waking up a single owner of the signal.

		Returns: The emit count as it was before this call.
	*/
	int emitSingle()
	nothrow {
		assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
		logTrace("unshared single emit");
		auto ec = m_waiter.m_emitCount++;
		m_waiter.emitSingle();
		return ec;
	}

	/** Acquires ownership and waits until the signal is emitted.

		Note that in order not to miss any emits it is necessary to use the
		overload taking an integer.

		Throws:
			May throw an $(D InterruptException) if the task gets interrupted
			using $(D Task.interrupt()).
	*/
	int wait() { return wait(this.emitCount); }

	/** Acquires ownership and waits until the signal is emitted and the emit
		count is larger than a given one.

		Throws:
			May throw an $(D InterruptException) if the task gets interrupted
			using $(D Task.interrupt()).
	*/
	int wait(int emit_count) { return doWait!true(Duration.max, emit_count); }
	/// ditto
	int wait(Duration timeout, int emit_count) { return doWait!true(timeout, emit_count); }

	/** Same as $(D wait), but defers throwing any $(D InterruptException).

		This method is annotated $(D nothrow) at the expense that it cannot be
		interrupted.
	*/
	int waitUninterruptible() nothrow { return waitUninterruptible(this.emitCount); }
	/// ditto
	int waitUninterruptible(int emit_count) nothrow { return doWait!false(Duration.max, emit_count); }
	/// ditto
	int waitUninterruptible(Duration timeout, int emit_count) nothrow { return doWait!false(timeout, emit_count); }

	/*	Waits until the emit count has passed `emit_count` or until `timeout`
		has elapsed, whichever happens first.

		A `timeout` of `Duration.max` means that no time limit is applied.

		Returns: The emit count at the time of returning.
	*/
	private int doWait(bool interruptible)(Duration timeout, int emit_count)
	{
		import core.time : MonoTime;

		assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");

		immutable has_timeout = timeout != Duration.max;

		MonoTime target_timeout, now;
		if (has_timeout) {
			try now = MonoTime.currTime();
			catch (Exception e) { assert(false, e.msg); }
			target_timeout = now + timeout;
		}

		// the difference is compared instead of the raw values, so that the
		// check stays valid when the emit counter overflows
		while (m_waiter.m_emitCount - emit_count <= 0) {
			m_waiter.wait!interruptible(has_timeout ? target_timeout - now : Duration.max);

			// The deadline is only meaningful if a timeout was requested.
			// Without this guard target_timeout is MonoTime.init and the loop
			// would exit after the first wake-up, even if the emit count has
			// not passed emit_count yet.
			if (has_timeout) {
				try now = MonoTime.currTime();
				catch (Exception e) { assert(false, e.msg); }
				if (now >= target_timeout) break;
			}
		}

		return m_waiter.m_emitCount;
	}
}
905 
unittest {
	import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;

	auto ev = createManualEvent();
	// two waiters with different timeouts on the same event
	auto short_waiter = runTask({ ev.wait(100.msecs, ev.emitCount); });
	auto long_waiter = runTask({ ev.wait(500.msecs, ev.emitCount); });
	runTask({
		sleep(50.msecs);
		ev.emit();
		sleep(50.msecs);
		// both waiters must have finished by now
		assert(!short_waiter.running && !long_waiter.running);
		exitEventLoop();
	});
	runEventLoop();
}
921 
unittest {
	import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
	auto ev = createManualEvent();
	// integer overflow test: the next emit wraps the counter around
	ev.m_waiter.m_emitCount = int.max;
	auto waiter = runTask({ ev.wait(50.msecs, ev.emitCount); });
	runTask({
		sleep(5.msecs);
		ev.emit();
		sleep(50.msecs);
		assert(!waiter.running);
		exitEventLoop();
	});
	runEventLoop();
}
937 
unittest { // ensure that cancelled waiters are properly handled and that a FIFO order is implemented
	import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;

	LocalManualEvent ev = createManualEvent();

	// declared up front, because the first task refers to the second one
	Task second;
	runTask({
		ev.wait();
		second.interrupt();
		sleep(20.msecs);
		exitEventLoop();
	});
	second = runTask({
		try {
			ev.wait();
			assert(false, "Shouldn't reach this.");
		} catch (InterruptException e) {}
	});
	runTask({
		ev.emit();
	});
	runEventLoop();
}
961 
unittest { // ensure that LocalManualEvent behaves correctly after being copied
	import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;

	LocalManualEvent ev = createManualEvent();
	runTask({
		// emit through a copy of the event
		auto ev_copy = ev;
		sleep(100.msecs);
		ev_copy.emit();
	});
	runTask({
		// wait on the original
		assert(ev.wait(1.seconds, ev.emitCount));
		exitEventLoop();
	});
	runEventLoop();
}
977 
978 
979 /** A manually triggered multi threaded cross-task event.
980 
981 	Note: the ownership can be shared between multiple fibers and threads.
982 */
983 struct ManualEvent {
984 	import core.thread : Thread;
985 	import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;
986 	import vibe.internal.list : StackSList;
987 
988 	@safe:
989 
990 	private {
991 		alias ThreadWaiter = ThreadLocalWaiter!true;
992 
993 		int m_emitCount;
994 		static struct Waiters {
995 			StackSList!ThreadWaiter active; // actively waiting
996 			StackSList!ThreadWaiter free; // free-list of reusable waiter structs
997 		}
998 		Monitor!(Waiters, shared(Mutex)) m_waiters;
999 	}
1000 
1001 	// thread destructor in vibe.core.core will decrement the ref. count
1002 	package static EventID ms_threadEvent;
1003 
1004 	enum EmitMode {
1005 		single,
1006 		all
1007 	}
1008 
1009 	@disable this(this);
1010 
1011 	private void initialize()
1012 	shared nothrow {
1013 		m_waiters = createMonitor!(ManualEvent.Waiters)(new shared Mutex);
1014 	}
1015 
1016 	deprecated("ManualEvent is always non-null!")
1017 	bool opCast() const shared nothrow { return true; }
1018 
1019 	/// A counter that is increased with every emit() call
1020 	int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); }
1021 
1022 	/// Emits the signal, waking up all owners of the signal.
1023 	int emit()
1024 	shared nothrow @trusted {
1025 		import core.atomic : atomicOp, cas;
1026 
1027 		() @trusted { logTrace("emit shared %s", cast(void*)&this); } ();
1028 
1029 		auto ec = atomicOp!"+="(m_emitCount, 1);
1030 		auto thisthr = Thread.getThis();
1031 
1032 		ThreadWaiter lw;
1033 		auto drv = eventDriver;
1034 		m_waiters.lock.active.filter((ThreadWaiter w) {
1035 			() @trusted { logTrace("waiter %s", cast(void*)w); } ();
1036 			if (w.m_driver is drv) {
1037 				lw = w;
1038 				lw.addRef();
1039 			} else {
1040 				try {
1041 					assert(w.m_event != EventID.init);
1042 					() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
1043 				} catch (Exception e) assert(false, e.msg);
1044 			}
1045 			return true;
1046 		});
1047 		() @trusted { logTrace("lw %s", cast(void*)lw); } ();
1048 		if (lw) {
1049 			lw.emit();
1050 			releaseWaiter(lw);
1051 		}
1052 
1053 		logTrace("emit shared done");
1054 
1055 		return ec;
1056 	}
1057 
1058 	/// Emits the signal, waking up at least one waiting task
1059 	int emitSingle()
1060 	shared nothrow @trusted {
1061 		import core.atomic : atomicOp, cas;
1062 
1063 		() @trusted { logTrace("emit shared single %s", cast(void*)&this); } ();
1064 
1065 		auto ec = atomicOp!"+="(m_emitCount, 1);
1066 		auto thisthr = Thread.getThis();
1067 
1068 		ThreadWaiter lw;
1069 		auto drv = eventDriver;
1070 		m_waiters.lock.active.iterate((ThreadWaiter w) {
1071 			() @trusted { logTrace("waiter %s", cast(void*)w); } ();
1072 			if (w.m_driver is drv) {
1073 				if (w.unused) return true;
1074 				lw = w;
1075 				lw.addRef();
1076 			} else {
1077 				try {
1078 					assert(w.m_event != EventID.invalid);
1079 					() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
1080 				} catch (Exception e) assert(false, e.msg);
1081 			}
1082 			return false;
1083 		});
1084 		() @trusted { logTrace("lw %s", cast(void*)lw); } ();
1085 		if (lw) {
1086 			lw.emitSingle();
1087 			releaseWaiter(lw);
1088 		}
1089 
1090 		logTrace("emit shared done");
1091 
1092 		return ec;
1093 	}
1094 
1095 	/** Acquires ownership and waits until the signal is emitted.
1096 
1097 		Note that in order not to miss any emits it is necessary to use the
1098 		overload taking an integer.
1099 
1100 		Throws:
1101 			May throw an $(D InterruptException) if the task gets interrupted
1102 			using $(D Task.interrupt()).
1103 	*/
1104 	int wait() shared { return wait(this.emitCount); }
1105 
1106 	/** Acquires ownership and waits until the emit count differs from the
1107 		given one or until a timeout is reached.
1108 
1109 		Throws:
1110 			May throw an $(D InterruptException) if the task gets interrupted
1111 			using $(D Task.interrupt()).
1112 	*/
1113 	int wait(int emit_count) shared { return doWaitShared!true(Duration.max, emit_count); }
1114 	/// ditto
1115 	int wait(Duration timeout, int emit_count) shared { return doWaitShared!true(timeout, emit_count); }
1116 
1117 	/** Same as $(D wait), but defers throwing any $(D InterruptException).
1118 
1119 		This method is annotated $(D nothrow) at the expense that it cannot be
1120 		interrupted.
1121 	*/
1122 	int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); }
1123 	/// ditto
1124 	int waitUninterruptible(int emit_count) shared nothrow { return doWaitShared!false(Duration.max, emit_count); }
1125 	/// ditto
1126 	int waitUninterruptible(Duration timeout, int emit_count) shared nothrow { return doWaitShared!false(timeout, emit_count); }
1127 
1128 	private int doWaitShared(bool interruptible)(Duration timeout, int emit_count)
1129 	shared {
1130 		import core.time : MonoTime;
1131 
1132 		() @trusted { logTrace("wait shared %s", cast(void*)&this); } ();
1133 
1134 		if (ms_threadEvent is EventID.invalid) {
1135 			ms_threadEvent = eventDriver.events.create();
1136 			assert(ms_threadEvent != EventID.invalid, "Failed to create event!");
1137 		}
1138 
1139 		MonoTime target_timeout, now;
1140 		if (timeout != Duration.max) {
1141 			try now = MonoTime.currTime();
1142 			catch (Exception e) { assert(false, e.msg); }
1143 			target_timeout = now + timeout;
1144 		}
1145 
1146 		int ec = this.emitCount;
1147 
1148 		acquireThreadWaiter((scope ThreadWaiter w) {
1149 			while (ec - emit_count <= 0) {
1150 				w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => (this.emitCount - emit_count) > 0);
1151 				ec = this.emitCount;
1152 
1153 				if (timeout != Duration.max) {
1154 					try now = MonoTime.currTime();
1155 					catch (Exception e) { assert(false, e.msg); }
1156 					if (now >= target_timeout) break;
1157 				}
1158 			}
1159 		});
1160 
1161 		return ec;
1162 	}
1163 
1164 	private void acquireThreadWaiter(DEL)(scope DEL del)
1165 	shared {
1166 		import vibe.internal.allocator : processAllocator, makeGCSafe;
1167 
1168 		ThreadWaiter w;
1169 		auto drv = eventDriver;
1170 
1171 		with (m_waiters.lock) {
1172 			active.iterate((aw) {
1173 				if (aw.m_driver is drv) {
1174 					w = aw;
1175 					w.addRef();
1176 					return false;
1177 				}
1178 				return true;
1179 			});
1180 
1181 			if (!w) {
1182 				free.filter((fw) {
1183 					if (fw.m_driver is drv) {
1184 						w = fw;
1185 						w.addRef();
1186 						return false;
1187 					}
1188 					return true;
1189 				});
1190 
1191 				if (!w) {
1192 					() @trusted {
1193 						try {
1194 							w = processAllocator.makeGCSafe!ThreadWaiter;
1195 							w.m_driver = drv;
1196 							w.m_event = ms_threadEvent;
1197 						} catch (Exception e) {
1198 							assert(false, "Failed to allocate thread waiter.");
1199 						}
1200 					} ();
1201 				}
1202 
1203 				assert(w.m_refCount == 1);
1204 				active.add(w);
1205 			}
1206 		}
1207 
1208 		scope (exit) releaseWaiter(w);
1209 
1210 		del(w);
1211 	}
1212 
1213 	private void releaseWaiter(ThreadWaiter w)
1214 	shared nothrow {
1215 		if (!w.releaseRef()) {
1216 			assert(w.m_driver is eventDriver, "Waiter was reassigned a different driver!?");
1217 			assert(w.unused, "Waiter still used, but not referenced!?");
1218 			with (m_waiters.lock) {
1219 				auto rmvd = active.remove(w);
1220 				assert(rmvd, "Waiter not in active queue anymore!?");
1221 				free.add(w);
1222 				// TODO: cap size of m_freeWaiters
1223 			}
1224 		}
1225 	}
1226 }
1227 
// A single emit must wake both a waiter in the local thread and a waiter
// running in a worker thread before either of their timeouts expires.
unittest {
	import vibe.core.core : exitEventLoop, runEventLoop, runTask, runWorkerTaskH, sleep;

	// body of the worker task: wait for up to 500 ms
	static void waitInWorker(shared(ManualEvent)* evt) { evt.wait(500.msecs, evt.emitCount); }

	auto evt = createSharedManualEvent();
	auto local_waiter = runTask({ evt.wait(100.msecs, evt.emitCount); });
	auto worker_waiter = runWorkerTaskH(&waitInWorker, &evt);

	runTask({
		sleep(50.msecs);
		evt.emit();
		sleep(50.msecs);
		// both waiters must have finished by now
		assert(!local_waiter.running && !worker_waiter.running);
		exitEventLoop();
	});

	runEventLoop();
}
1244 
// Stress test: many tasks distributed over a pool of worker threads wait on
// and emit the same shared event until its emit count reaches 5000.
unittest {
	import vibe.core.core : runTask, runWorkerTaskH, setTimer, sleep;
	import vibe.core.taskpool : TaskPool;
	import core.time : msecs, usecs;
	import std.concurrency : send, receiveOnly;
	import std.random : uniform;

	auto tpool = new shared TaskPool(4);
	scope (exit) tpool.terminate();

	static void test(shared(ManualEvent)* evt, Task owner)
	{
		// report this task's handle back to the owner task
		owner.tid.send(Task.getThis());

		int ec = evt.emitCount;
		auto thist = Task.getThis();
		auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
		scope (exit) tm.stop();
		while (ec < 5_000) {
			tm.rearm(500.msecs);
			sleep(uniform(0, 10_000).usecs);
			// emit in roughly one out of ten iterations
			if (uniform(0, 10) == 0) evt.emit();
			auto ecn = evt.wait(ec);
			// wait() must only return after the count has advanced
			assert(ecn > ec);
			ec = ecn;
		}
	}

	auto watchdog = setTimer(30.seconds, { assert(false, "ManualEvent test has hung."); });
	scope (exit) watchdog.stop();

	auto e = createSharedManualEvent();
	Task[] tasks;

	runTask({
		auto thist = Task.getThis();

		// start 25 tasks in each thread
		foreach (i; 0 .. 25) tpool.runTaskDist(&test, &e, thist);
		// collect all task handles
		foreach (i; 0 .. 4*25) tasks ~= receiveOnly!Task;

		auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
		scope (exit) tm.stop();
		// keep emitting so that waiting worker tasks continue to make progress
		while (e.emitCount < 5_000) {
			tm.rearm(500.msecs);
			sleep(50.usecs);
			e.emit();
		}

		// wait for all worker tasks to finish
		foreach (t; tasks) t.join();
	}).join();
}
1300 
/** Couples a value of type `T` with a mutex of type `M` so that the value is
	only reachable through a `Locked` handle.

	`lock` acquires the mutex and returns a `Locked` handle; the handle's
	destructor releases the mutex again. The `_nothrow` variants of the mutex
	methods are used whenever the mutex type provides them.
*/
package shared struct Monitor(T, M)
{
	alias Mutex = M;
	alias Data = T;

	private {
		Mutex mutex;
		Data data;
	}

	/** Non-copyable handle that gives access to the protected data and
		unlocks the mutex on destruction.
	*/
	static struct Locked {
		shared(Monitor)* m;

		@disable this(this);

		~this()
		{
			() @trusted {
				static if (is(typeof(Mutex.init.unlock_nothrow())))
					(cast(Mutex)m.mutex).unlock_nothrow();
				else (cast(Mutex)m.mutex).unlock();
			} ();
		}

		/// Accesses the protected data with the `shared` qualifier cast away.
		ref inout(Data) get() inout @trusted { return *cast(inout(Data)*)&m.data; }

		alias get this;
	}

	/// Locks the mutex and returns a handle to the protected data.
	Locked lock()
	{
		acquireMutex();
		return Locked(() @trusted { return &this; } ());
	}

	/// ditto
	const(Locked) lock() const
	{
		acquireMutex();
		return const(Locked)(() @trusted { return &this; } ());
	}

	// Locks the mutex, preferring `lock_nothrow` when the mutex type has it.
	private void acquireMutex() const @trusted
	{
		static if (is(typeof(Mutex.init.lock_nothrow())))
			(cast(Mutex)mutex).lock_nothrow();
		else (cast(Mutex)mutex).lock();
	}
}
1342 
1343 
/** Creates a `Monitor` that uses `mutex` to guard a default-initialized
	value of type `T`.

	Params:
		mutex = the mutex instance stored in the monitor

	Returns:
		The newly created monitor.
*/
package shared(Monitor!(T, M)) createMonitor(T, M)(M mutex)
@trusted {
	alias MonitorType = shared(Monitor!(T, M));

	MonitorType monitor;
	monitor.mutex = cast(shared)mutex;
	return monitor;
}
1350 
1351 
/** Reference counted list of tasks that wait for a notification.

	Each waiting task is represented by a `TaskWaiter` node that is linked
	into `m_waiters` for the duration of its `wait` call. `emit` notifies all
	queued waiters, `emitSingle` notifies one.

	Params:
		EVENT_TRIGGERED = if `true`, the waiter additionally carries an event
			driver reference and an event ID (released in the destructor);
			otherwise it carries an emit counter field instead
*/
private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
	import vibe.internal.list : CircularDList;

	private {
		// Intrusive list node that stores the callback of one waiting task.
		static struct TaskWaiter {
			TaskWaiter* prev, next;
			void delegate() @safe nothrow notifier;

			// Registers the callback to invoke on emit; only one may be set at a time.
			void wait(void delegate() @safe nothrow del) @safe nothrow {
				assert(notifier is null, "Local waiter is used twice!");
				notifier = del;
			}
			// Clears the callback without invoking it.
			void cancel() @safe nothrow { notifier = null; }
			// Clears the callback first and then invokes it, so that the node
			// can be reused from within the callback.
			void emit() @safe nothrow { auto n = notifier; notifier = null; n(); }
		}

		static if (EVENT_TRIGGERED) {
			package(vibe) ThreadLocalWaiter next; // queue of other waiters of the same thread
			NativeEventDriver m_driver;
			EventID m_event = EventID.invalid;
		} else {
			int m_emitCount = 0;
		}
		// A new instance starts out with a single reference.
		int m_refCount = 1;
		// Node handed to the list constructor (see `this()`).
		TaskWaiter m_pivot;
		// Marker node that `emit` links into the list while it is running.
		TaskWaiter m_emitPivot;
		CircularDList!(TaskWaiter*) m_waiters;
	}

	this()
	{
		m_waiters = CircularDList!(TaskWaiter*)(() @trusted { return &m_pivot; } ());
	}

	static if (EVENT_TRIGGERED) {
		// Releases the event handle, if one has been assigned.
		~this()
		{
			import vibe.core.internal.release : releaseHandle;

			if (m_event != EventID.invalid)
				releaseHandle!"events"(m_event, () @trusted { return cast(shared)m_driver; } ());
		}
	}

	/// Determines whether no task is currently queued.
	@property bool unused() const @safe nothrow { return m_waiters.empty; }

	/// Increments the reference count.
	void addRef() @safe nothrow { assert(m_refCount >= 0); m_refCount++; }
	/// Decrements the reference count; returns `true` while references remain.
	bool releaseRef() @safe nothrow { assert(m_refCount > 0); return --m_refCount > 0; }

	/** Queues the calling task and suspends it until it gets notified, the
		timeout elapses or, if `evt` is valid, the event is triggered.

		Params:
			interruptible = forwarded to `asyncAwaitAny`
			timeout = forwarded to `asyncAwaitAny`
			evt = optional event to wait on in addition to the local notification
			exit_condition = called right after the event wait has been set
				up; if it returns `true`, the event wait is cancelled and its
				callback is invoked directly. It is called unconditionally
				whenever `evt` is valid, so it must not be `null` in that case.

		Returns:
			`false` if the cancel callback of the local waitable has run,
			`true` otherwise.
	*/
	bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null)
	@safe {
		import core.time : MonoTime;
		import vibe.internal.async : Waitable, asyncAwaitAny;

		// the list node lives on the stack of the waiting task
		TaskWaiter waiter_store;
		TaskWaiter* waiter = () @trusted { return &waiter_store; } ();

		m_waiters.insertBack(waiter);
		assert(waiter.next !is null);
		// unlink the node if it is still queued when leaving the function
		scope (exit)
			if (waiter.next !is null) {
				m_waiters.remove(waiter);
				assert(!waiter.next);
			}

		// NOTE(review): target_timeout and now are computed here, but are not
		// read anywhere else in this function
		MonoTime target_timeout, now;
		if (timeout != Duration.max) {
			try now = MonoTime.currTime();
			catch (Exception e) { assert(false, e.msg); }
			target_timeout = now + timeout;
		}

		bool cancelled;

		// local notification: register the callback / cancel it / no-op on completion
		// (presumably the wait, cancel and done callbacks expected by Waitable - verify)
		alias waitable = Waitable!(typeof(TaskWaiter.notifier),
			(cb) { waiter.wait(cb); },
			(cb) { cancelled = true; waiter.cancel(); },
			() {}
		);

		// event based notification through the event driver
		alias ewaitable = Waitable!(EventCallback,
			(cb) {
				eventDriver.events.wait(evt, cb);
				// check for exit condition *after* starting to wait for the event
				// to avoid a race condition
				if (exit_condition()) {
					eventDriver.events.cancelWait(evt, cb);
					cb(evt);
				}
			},
			(cb) { eventDriver.events.cancelWait(evt, cb); },
			(EventID) {}
		);

		if (evt != EventID.invalid) {
			asyncAwaitAny!(interruptible, waitable, ewaitable)(timeout);
		} else {
			asyncAwaitAny!(interruptible, waitable)(timeout);
		}

		if (cancelled) {
			assert(waiter.next !is null, "Cancelled waiter not in queue anymore!?");
			return false;
		} else {
			assert(waiter.next is null, "Triggered waiter still in queue!?");
			return true;
		}
	}

	/** Notifies all waiters that are queued at the time of the call.

		If another `emit` call is already in progress, its end marker is
		moved to the back of the list instead, so that the running call also
		processes the waiters that were queued in the meantime.
	*/
	void emit()
	@safe nothrow {
		// NOTE(review): swap and yield are not used within this function
		import std.algorithm.mutation : swap;
		import vibe.core.core : yield;

		if (m_waiters.empty) return;

		TaskWaiter* pivot = () @trusted { return &m_emitPivot; } ();

		if (pivot.next) { // another emit in progress?
			// shift pivot to the end, so that the other emit call will process all pending waiters
			if (pivot !is m_waiters.back) {
				m_waiters.remove(pivot);
				m_waiters.insertBack(pivot);
			}
			return;
		}

		// mark the end of the range to process, then notify everything up to it
		m_waiters.insertBack(pivot);
		scope (exit) m_waiters.remove(pivot);

		foreach (w; m_waiters) {
			if (w is pivot) break;
			emitWaiter(w);
		}
	}

	/** Notifies a single waiter.

		If an `emit` call is in progress, its end marker is moved one position
		further back instead, so that the running call processes one more
		waiter.

		Returns:
			`false` if no waiter is queued, `true` otherwise.
	*/
	bool emitSingle()
	@safe nothrow {
		if (m_waiters.empty) return false;

		TaskWaiter* pivot = () @trusted { return &m_emitPivot; } ();

		if (pivot.next) { // another emit in progress?
			// shift pivot to the right, so that the other emit call will process another waiter
			if (pivot !is m_waiters.back) {
				auto n = pivot.next;
				m_waiters.remove(pivot);
				m_waiters.insertAfter(pivot, n);
			}
			return true;
		}

		emitWaiter(m_waiters.front);
		return true;
	}

	// Unlinks `w` from the list and invokes its callback, if one is set.
	private void emitWaiter(TaskWaiter* w)
	@safe nothrow {
		m_waiters.remove(w);

		if (w.notifier !is null) {
			logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr);
			w.emit();
		} else logTrace("notify callback is null");
	}
}
1518 
/** Implementation of a non-recursive mutex built on an atomic flag and a
	shared `ManualEvent`.

	Contending callers register themselves in `m_waiters` and wait on
	`m_signal`, which `unlock` emits whenever at least one waiter is
	registered.

	Params:
		INTERRUPTIBLE = selects `wait` (`true`) or `waitUninterruptible`
			(`false`) for waiting on the signal
*/
private struct TaskMutexImpl(bool INTERRUPTIBLE) {
	private {
		shared(bool) m_locked = false;
		shared(uint) m_waiters = 0;
		shared(ManualEvent) m_signal;
		debug Task m_owner;
	}

	/// Initializes the internal signal.
	void setup()
	{
		m_signal.initialize();
	}

	/// Attempts to take the lock without waiting; returns `true` on success.
	@trusted bool tryLock()
	{
		if (!cas(&m_locked, false, true))
			return false;

		debug m_owner = Task.getThis();
		debug(VibeMutexLog) logTrace("mutex %s lock %s", cast(void*)&this, atomicLoad(m_waiters));
		return true;
	}

	/// Takes the lock, waiting on the signal for as long as it is held elsewhere.
	@trusted void lock()
	{
		if (tryLock()) return;

		debug assert(m_owner == Task() || m_owner != Task.getThis(), "Recursive mutex lock.");

		// stay registered as a waiter until the lock has been obtained
		atomicOp!"+="(m_waiters, 1);
		debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
		scope(exit) atomicOp!"-="(m_waiters, 1);

		// the emit count is sampled before each new attempt to take the lock
		for (auto ecnt = m_signal.emitCount(); !tryLock(); ) {
			static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt);
			else ecnt = m_signal.waitUninterruptible(ecnt);
		}
	}

	/// Releases the lock and emits the signal if any waiter is registered.
	@trusted void unlock()
	{
		assert(m_locked);
		debug {
			assert(m_owner == Task.getThis());
			m_owner = Task();
		}

		atomicStore!(MemoryOrder.rel)(m_locked, false);
		debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));

		if (atomicLoad(m_waiters) == 0) return;
		m_signal.emit();
	}
}
1569 
/** Implementation of a recursive mutex that can be locked repeatedly by the
	task that owns it.

	The owner and recursion count are protected by a regular
	`core.sync.mutex.Mutex` that is only held for the duration of the state
	update. Contending tasks register in `m_waiters` and wait on `m_signal`,
	which `unlock` emits whenever at least one waiter is registered.

	Params:
		INTERRUPTIBLE = selects `wait` (`true`) or `waitUninterruptible`
			(`false`) for waiting on the signal
*/
private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
	private {
		core.sync.mutex.Mutex m_mutex;
		Task m_owner;
		size_t m_recCount = 0;
		shared(uint) m_waiters = 0;
		shared(ManualEvent) m_signal;
		@property bool m_locked() const { return m_recCount > 0; }
	}

	/// Allocates the internal state mutex and initializes the signal.
	void setup()
	{
		m_mutex = new core.sync.mutex.Mutex;
		m_signal.initialize();
	}

	/** Attempts to take the lock without waiting.

		Succeeds if the mutex has no owner or if the calling task already owns
		it; in the latter case the recursion count is incremented.

		Returns:
			`true` if the calling task holds the lock afterwards.
	*/
	@trusted bool tryLock()
	{
		auto self = Task.getThis();
		return m_mutex.performLocked!({
			if (!m_owner) {
				assert(m_recCount == 0);
				m_recCount = 1;
				m_owner = self;
				return true;
			} else if (m_owner == self) {
				m_recCount++;
				return true;
			}
			return false;
		});
	}

	/// Takes the lock, waiting on the signal for as long as another task owns it.
	@trusted void lock()
	{
		if (tryLock()) return;
		// stay registered as a waiter until the lock has been obtained
		atomicOp!"+="(m_waiters, 1);
		debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
		scope(exit) atomicOp!"-="(m_waiters, 1);
		auto ecnt = m_signal.emitCount();
		while (!tryLock()) {
			static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt);
			else ecnt = m_signal.waitUninterruptible(ecnt);
		}
	}

	/** Releases one level of the lock.

		Ownership is given up once the recursion count drops to zero. The
		signal is emitted whenever at least one waiter is registered.
	*/
	@trusted void unlock()
	{
		auto self = Task.getThis();
		m_mutex.performLocked!({
			assert(m_owner == self);
			assert(m_recCount > 0);
			m_recCount--;
			if (m_recCount == 0) {
				m_owner = Task.init;
			}
		});
		debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
		if (atomicLoad(m_waiters) > 0)
			m_signal.emit();
	}
}
1633 
/** Implementation of a condition variable on top of a shared `ManualEvent`.

	Waiting releases the associated mutex, waits for the signal's emit count
	to change and re-acquires the mutex before returning, also when leaving
	through an exception.

	Params:
		INTERRUPTIBLE = selects `wait` (`true`) or `waitUninterruptible`
			(`false`) for waiting on the signal
		LOCKABLE = type of the associated mutex
*/
private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
	private {
		LOCKABLE m_mutex;
		static if (is(LOCKABLE == Mutex))
			TaskMutex m_taskMutex;
		shared(ManualEvent) m_signal;
	}

	static if (is(LOCKABLE == Lockable)) {
		// Adapter that exposes a `core.sync.mutex.Mutex` as a `Lockable`.
		final class MutexWrapper : Lockable {
			private core.sync.mutex.Mutex m_mutex;
			this(core.sync.mutex.Mutex mtx) { m_mutex = mtx; }
			@trusted void lock() { m_mutex.lock(); }
			@trusted void unlock() { m_mutex.unlock(); }
			@trusted bool tryLock() { return m_mutex.tryLock(); }
		}

		/// Sets up the condition for a plain mutex by wrapping it.
		void setupForMutex(core.sync.mutex.Mutex mtx)
		{
			setup(new MutexWrapper(mtx));
		}
	}

	/// Stores the associated mutex and initializes the signal.
	void setup(LOCKABLE mtx)
	{
		m_mutex = mtx;
		static if (is(typeof(m_taskMutex)))
			m_taskMutex = cast(TaskMutex)mtx;
		m_signal.initialize();
	}

	/// The mutex associated with this condition.
	@property LOCKABLE mutex() { return m_mutex; }

	/// Releases the mutex, waits for a notification and re-acquires the mutex.
	@trusted void wait()
	{
		assertMutexHeld();

		// sample the count while the mutex is still held
		auto refcount = m_signal.emitCount;

		releaseMutex();
		scope(exit) reacquireMutex();

		static if (INTERRUPTIBLE) m_signal.wait(refcount);
		else m_signal.waitUninterruptible(refcount);
	}

	/** Same as `wait()`, but waits for at most `timeout`.

		Returns:
			`true` if the emit count reported by the signal differs from the
			one sampled before waiting, `false` otherwise.
	*/
	@trusted bool wait(Duration timeout)
	{
		assert(!timeout.isNegative());
		assertMutexHeld();

		// sample the count while the mutex is still held
		auto refcount = m_signal.emitCount;

		releaseMutex();
		scope(exit) reacquireMutex();

		static if (INTERRUPTIBLE) return m_signal.wait(timeout, refcount) != refcount;
		else return m_signal.waitUninterruptible(timeout, refcount) != refcount;
	}

	/// Emits the signal.
	@trusted void notify()
	{
		m_signal.emit();
	}

	/// Emits the signal.
	@trusted void notifyAll()
	{
		m_signal.emit();
	}

	// For a `TaskMutex`, asserts that it is locked and (in debug builds)
	// owned by the calling task.
	private @trusted void assertMutexHeld()
	{
		if (auto tm = cast(TaskMutex)m_mutex) {
			assert(tm.m_impl.m_locked);
			debug assert(tm.m_impl.m_owner == Task.getThis());
		}
	}

	// Unlocks the associated mutex, going through the `TaskMutex` reference if there is one.
	private @trusted void releaseMutex()
	{
		static if (is(LOCKABLE == Mutex)) {
			if (m_taskMutex) m_taskMutex.unlock();
			else m_mutex.unlock_nothrow();
		} else m_mutex.unlock();
	}

	// Locks the associated mutex again, mirroring `releaseMutex`.
	private @trusted void reacquireMutex()
	{
		static if (is(LOCKABLE == Mutex)) {
			if (m_taskMutex) m_taskMutex.lock();
			else m_mutex.lock_nothrow();
		} else m_mutex.lock();
	}
}
1727 
/** Contains the shared state of a $(D TaskReadWriteMutex).
 *
 *  Since a $(D TaskReadWriteMutex) consists of two actual Mutex
 *  objects that rely on common memory, this struct implements
 *  the actual functionality of their method calls.
 *
 *  The method implementations are based on two static parameters:
 *
 *  - $(D INTERRUPTIBLE), a template parameter of this struct, determines
 *    whether waiting for the lock uses the interruptible or the
 *    uninterruptible wait operation of the underlying events.
 *
 *  - $(D INTENT), a template parameter of the individual methods, describes
 *    the intent with which a locking operation is performed (i.e.
 *    $(D READ_ONLY) or $(D READ_WRITE)). RO locking allows for multiple
 *    Tasks holding the mutex, whereas RW locking will cause a "bottleneck"
 *    so that only one Task can write to the protected data at once.
 */
private struct ReadWriteMutexState(bool INTERRUPTIBLE)
{
    /** The policy with which the mutex should operate.
     *
     *  The policy determines how the acquisition of the locks is
     *  performed and can be used to tune the mutex according to the
     *  underlying algorithm in which it is used.
     *
     *  According to the provided policy, the mutex will either favor
     *  reading or writing tasks and could potentially starve the
     *  respective opposite.
     *
     *  cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
     */
    enum Policy : int
    {
        /** Readers are prioritized, writers may be starved as a result. */
        PREFER_READERS = 0,
        /** Writers are prioritized, readers may be starved as a result. */
        PREFER_WRITERS
    }

    /** The intent with which a locking operation is performed.
     *
     *  Since both locks share the same underlying algorithms, the actual
     *  intent with which a lock operation is performed (i.e. read/write)
     *  is passed as a template parameter to each method.
     */
    enum LockingIntent : bool
    {
        /** Perform a read lock/unlock operation. Multiple reading locks can be
         *  active at a time. */
        READ_ONLY = 0,
        /** Perform a write lock/unlock operation. Only a single writer can
         *  hold a lock at any given time. */
        READ_WRITE = 1
    }

    private {
        //Queue counters
        /** The number of reading tasks waiting for the lock to become available. */
        shared(uint)  m_waitingForReadLock = 0;
        /** The number of writing tasks waiting for the lock to become available. */
        shared(uint)  m_waitingForWriteLock = 0;

        //Lock counters
        /** The number of reading tasks that currently hold the lock. */
        uint  m_activeReadLocks = 0;
        /** The number of writing tasks that currently hold the lock (binary). */
        ubyte m_activeWriteLocks = 0;

        /** The policy determining the lock's behavior. */
        Policy m_policy;

        //Queue Events
        /** The event used to wake reading tasks waiting for the lock while it is blocked. */
        shared(ManualEvent) m_readyForReadLock;
        /** The event used to wake writing tasks waiting for the lock while it is blocked. */
        shared(ManualEvent) m_readyForWriteLock;

        /** The underlying mutex that gates the access to the shared state. */
        Mutex m_counterMutex;
    }

    /** Creates the state for the given policy, including the counter mutex
     *  and the two queue events.
     */
    this(Policy policy)
    {
        m_policy = policy;
        m_counterMutex = new Mutex();
        m_readyForReadLock  = createSharedManualEvent();
        m_readyForWriteLock = createSharedManualEvent();
    }

    @disable this(this);

    /** The policy with which the lock has been created. */
    @property policy() const { return m_policy; }

    version(RWMutexPrint)
    {
        /** Print out debug information during lock operations.
         *
         *  Any failure while printing is deliberately ignored, so that the
         *  debug output cannot influence the locking operation itself.
         */
        void printInfo(string OP, LockingIntent INTENT)() nothrow
        {
            import std.string : leftJustify;
            try
            {
                import std.stdio;
                writefln("RWMutex: %s (%s), active: RO: %d, RW: %d; waiting: RO: %d, RW: %d",
                    OP.leftJustify(10,' '),
                    INTENT == LockingIntent.READ_ONLY ? "RO" : "RW",
                    m_activeReadLocks,    m_activeWriteLocks,
                    m_waitingForReadLock, m_waitingForWriteLock
                    );
            }
            catch (Throwable t){}
        }
    }

    /** An internal shortcut method to determine the queue event for a given intent. */
    @property ref auto queueEvent(LockingIntent INTENT)()
    {
        static if (INTENT == LockingIntent.READ_ONLY)
            return m_readyForReadLock;
        else
            return m_readyForWriteLock;
    }

    /** An internal shortcut method to determine the queue counter for a given intent. */
    @property ref auto queueCounter(LockingIntent INTENT)()
    {
        static if (INTENT == LockingIntent.READ_ONLY)
            return m_waitingForReadLock;
        else
            return m_waitingForWriteLock;
    }

    /** An internal shortcut method to determine the current emitCount of the queue event for a given intent. */
    int emitCount(LockingIntent INTENT)()
    {
        return queueEvent!INTENT.emitCount();
    }

    /** An internal shortcut method to determine the active counter for a given intent. */
    @property ref auto activeCounter(LockingIntent INTENT)()
    {
        static if (INTENT == LockingIntent.READ_ONLY)
            return m_activeReadLocks;
        else
            return m_activeWriteLocks;
    }

    /** An internal shortcut method to wait for the queue event for a given intent.
     *
     *  This method is used during the `lock()` operation, after a
     *  `tryLock()` operation has been unsuccessfully finished.
     *  The active fiber will yield and be suspended until the queue event
     *  for the given intent will be fired.
     *
     *  Returns:
     *      The emit count reported by the queue event's wait operation.
     */
    int wait(LockingIntent INTENT)(int count)
    {
        static if (INTERRUPTIBLE)
            return queueEvent!INTENT.wait(count);
        else
            return queueEvent!INTENT.waitUninterruptible(count);
    }

    /** An internal shortcut method to notify tasks waiting for the lock to become available again.
     *
     *  This method is called whenever the number of owners of the mutex hits
     *  zero; this is basically the counterpart to `wait()`.
     *  It wakes any Task currently waiting for the mutex to be released.
     */
    @trusted void notify(LockingIntent INTENT)()
    {
        static if (INTENT == LockingIntent.READ_ONLY)
        { //If the last reader unlocks the mutex, notify all waiting writers
            if (atomicLoad(m_waitingForWriteLock) > 0)
                m_readyForWriteLock.emit();
        }
        else
        { //If a writer unlocks the mutex, notify both readers and writers
            if (atomicLoad(m_waitingForReadLock) > 0)
                m_readyForReadLock.emit();

            if (atomicLoad(m_waitingForWriteLock) > 0)
                m_readyForWriteLock.emit();
        }
    }

    /** An internal method that performs the acquisition attempt in different variations.
     *
     *  Since both locks rely on a common mutex object which gates the access
     *  to their common data, acquisition attempts for this lock are more complex
     *  than for simple mutex variants. This method will thus be performing the
     *  `tryLock()` operation in two variations, depending on the caller:
     *
     *  If called from the outside ($(D WAIT_FOR_BLOCKING_MUTEX) = false), the method
     *  will instantly fail if the underlying mutex is locked (i.e. during another
     *  `tryLock()` or `unlock()` operation), in order to guarantee the fastest
     *  possible locking attempt.
     *
     *  If used internally by the `lock()` method ($(D WAIT_FOR_BLOCKING_MUTEX) = true),
     *  the operation will wait for the mutex to be available before deciding if
     *  the lock can be acquired, since the attempt would anyway be repeated until
     *  it succeeds. This will prevent frequent retries under heavy loads and thus
     *  should ensure better performance.
     *
     *  Returns:
     *      `true`, if the lock was successfully acquired;
     *      `false` otherwise.
     */
    @trusted bool tryLock(LockingIntent INTENT, bool WAIT_FOR_BLOCKING_MUTEX)()
    {
        //Log a debug statement for the attempt
        version(RWMutexPrint)
            printInfo!("tryLock",INTENT)();

        //Try to acquire the lock
        static if (!WAIT_FOR_BLOCKING_MUTEX)
        {
            if (!m_counterMutex.tryLock())
                return false;
        }
        else
            m_counterMutex.lock();

        scope(exit)
            m_counterMutex.unlock();

        //Log a debug statement for the attempt
        version(RWMutexPrint)
            printInfo!("checkCtrs",INTENT)();

        //Check if there's already an active writer
        if (m_activeWriteLocks > 0)
            return false;

        //If writers are preferred over readers, check whether there
        //currently is a writer in the waiting queue and abort if
        //that's the case. The policy has to be compared explicitly:
        //`m_policy.PREFER_WRITERS` would just name the enum member,
        //which is always non-zero.
        static if (INTENT == LockingIntent.READ_ONLY)
            if (m_policy == Policy.PREFER_WRITERS && atomicLoad(m_waitingForWriteLock) > 0)
                return false;

        //If we are locking the mutex for writing, make sure that
        //there's no reader active.
        static if (INTENT == LockingIntent.READ_WRITE)
            if (m_activeReadLocks > 0)
                return false;

        //We can successfully acquire the lock!
        //Log a debug statement for the success.
        version(RWMutexPrint)
            printInfo!("lock",INTENT)();

        //Increase the according counter
        //(number of active readers/writers)
        //and return a success code.
        activeCounter!INTENT += 1;
        return true;
    }

    /** Attempt to acquire the lock for a given intent.
     *
     *  Returns:
     *      `true`, if the lock was successfully acquired;
     *      `false` otherwise.
     */
    @trusted bool tryLock(LockingIntent INTENT)()
    {
        //Try to lock this mutex without waiting for the underlying
        //counter mutex - fail if it is already blocked.
        return tryLock!(INTENT,false)();
    }

    /** Acquire the lock for the given intent; yield and suspend until the lock has been acquired. */
    @trusted void lock(LockingIntent INTENT)()
    {
        //Prepare a waiting action before the first
        //`tryLock()` call in order to avoid a race
        //condition that could lead to the queue notification
        //not being fired.
        auto count = emitCount!INTENT;
        atomicOp!"+="(queueCounter!INTENT,1);
        scope(exit)
            atomicOp!"-="(queueCounter!INTENT,1);

        //Retry until we successfully acquired the lock
        while (!tryLock!(INTENT,true)())
        {
            version(RWMutexPrint)
                printInfo!("wait",INTENT)();

            count = wait!INTENT(count);
        }
    }

    /** Unlock the mutex after a successful acquisition. */
    @trusted void unlock(LockingIntent INTENT)()
    {
        version(RWMutexPrint)
            printInfo!("unlock",INTENT)();

        synchronized(m_counterMutex)
        {
            //The counter may only be inspected while the counter mutex is held
            debug assert(activeCounter!INTENT > 0);

            //Decrement the counter of active lock holders.
            //If the counter hits zero, notify waiting Tasks
            activeCounter!INTENT -= 1;
            if (activeCounter!INTENT == 0)
            {
                version(RWMutexPrint)
                    printInfo!("notify",INTENT)();

                notify!INTENT();
            }
        }
    }
}
2049 
/** Read/write mutex for fibers that does not block the event loop.
 *
 *  Can be used in exchange for a $(D core.sync.rwmutex.ReadWriteMutex), but
 *  does not block the event loop in contention situations. Locking is done
 *  through the `reader` and `writer` members: the `reader` mutex may be held
 *  by multiple readers at once, whereas the `writer` mutex only admits a
 *  single writer at any given time. Locks on `reader` and `writer` are
 *  mutually exclusive, i.e. no reader can be active while a writer is
 *  active, and vice versa.
 *
 *  Notice:
 *      Mutexes implemented by this class cannot be interrupted
 *      using $(D vibe.core.task.Task.interrupt()). The corresponding
 *      InterruptException will be deferred until the next blocking
 *      operation yields the event loop.
 *
 *      $(D InterruptibleTaskReadWriteMutex) is an alternative that can be
 *      interrupted.
 *
 *  cf. $(D core.sync.rwmutex.ReadWriteMutex)
 */
final class TaskReadWriteMutex
{
    // State type shared by both mutexes, instantiated with `false`.
    private alias State = ReadWriteMutexState!false;
    // Shorthands for the locking intent enumeration and its two values.
    private alias LockingIntent = State.LockingIntent;
    private alias READ_ONLY  = LockingIntent.READ_ONLY;
    private alias READ_WRITE = LockingIntent.READ_WRITE;

    /** The shared state used by the reader and writer mutexes. */
    private State m_state;

    /** The policy with which the mutex should operate.
     *
     *  The policy controls how lock acquisition is performed and allows
     *  tuning the mutex for the algorithm in which it is used.
     *
     *  Depending on the policy, the mutex favors either reading or writing
     *  tasks and could potentially starve the respective opposite.
     *
     *  cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
     */
    alias Policy = State.Policy;

    /** A common baseclass for both of the provided mutexes.
     *
     *  The $(D INTENT) template argument selects whether the mutex is used
     *  for read or for write locking. All operations are forwarded to the
     *  state shared with the enclosing read/write mutex.
     */
    final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
    {
        /** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */
        override bool tryLock()
        {
            return m_state.tryLock!INTENT();
        }

        /** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */
        override void lock()
        {
            m_state.lock!INTENT();
        }

        /** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */
        override void unlock()
        {
            m_state.unlock!INTENT();
        }
    }

    /// Mutex type used for read-only locking.
    alias Reader = Mutex!READ_ONLY;
    /// Mutex type used for read/write locking.
    alias Writer = Mutex!READ_WRITE;

    /// The mutex to lock for read-only access.
    Reader reader;
    /// The mutex to lock for read/write access.
    Writer writer;

    /** Creates the shared state and both mutex objects.
     *
     *  Params:
     *      policy = The policy to operate with; defaults to
     *               `Policy.PREFER_WRITERS`.
     */
    this(Policy policy = Policy.PREFER_WRITERS)
    {
        m_state = State(policy);
        reader = new Reader;
        writer = new Writer;
    }

    /** The policy with which the lock has been created. */
    @property Policy policy() const
    {
        return m_state.policy;
    }
}
2127 
/** Interruptible counterpart of $(D TaskReadWriteMutex).
 *
 *  With this class, $(D vibe.core.task.Task.interrupt()) can be used while
 *  a task is waiting in the `lock()` method.
 *
 *  cf. $(D core.sync.rwmutex.ReadWriteMutex)
 */
final class InterruptibleTaskReadWriteMutex
{
    @safe:

    // State type shared by both mutexes, instantiated with `true`.
    private alias State = ReadWriteMutexState!true;
    // Shorthands for the locking intent enumeration and its two values.
    private alias LockingIntent = State.LockingIntent;
    private alias READ_ONLY  = LockingIntent.READ_ONLY;
    private alias READ_WRITE = LockingIntent.READ_WRITE;

    /** The shared state used by the reader and writer mutexes. */
    private State m_state;

    /** The policy with which the mutex should operate.
     *
     *  The policy controls how lock acquisition is performed and allows
     *  tuning the mutex for the algorithm in which it is used.
     *
     *  Depending on the policy, the mutex favors either reading or writing
     *  tasks and could potentially starve the respective opposite.
     *
     *  cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
     */
    alias Policy = State.Policy;

    /** A common baseclass for both of the provided mutexes.
     *
     *  The $(D INTENT) template argument selects whether the mutex is used
     *  for read or for write locking. All operations are forwarded to the
     *  state shared with the enclosing read/write mutex.
     */
    final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
    {
        /** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */
        override bool tryLock()
        {
            return m_state.tryLock!INTENT();
        }

        /** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */
        override void lock()
        {
            m_state.lock!INTENT();
        }

        /** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */
        override void unlock()
        {
            m_state.unlock!INTENT();
        }
    }

    /// Mutex type used for read-only locking.
    alias Reader = Mutex!READ_ONLY;
    /// Mutex type used for read/write locking.
    alias Writer = Mutex!READ_WRITE;

    /// The mutex to lock for read-only access.
    Reader reader;
    /// The mutex to lock for read/write access.
    Writer writer;

    /** Creates the shared state and both mutex objects.
     *
     *  Params:
     *      policy = The policy to operate with; defaults to
     *               `Policy.PREFER_WRITERS`.
     */
    this(Policy policy = Policy.PREFER_WRITERS)
    {
        m_state = State(policy);
        reader = new Reader;
        writer = new Writer;
    }

    /** The policy with which the lock has been created. */
    @property Policy policy() const
    {
        return m_state.policy;
    }
}