1 /**
2 	Event loop compatible task synchronization facilities.
3 
4 	This module provides replacement primitives for the modules in `core.sync`
5 	that do not block vibe.d's event loop in their wait states. These should
6 	always be preferred over the ones in Druntime under usual circumstances.
7 
8 	Using a standard `Mutex` is possible as long as it is ensured that no event
9 	loop based functionality (I/O, task interaction or anything that implicitly
10 	calls `vibe.core.core.yield`) is executed within a section of code that is
11 	protected by the mutex. $(B Failure to do so may result in dead-locks and
12 	high-level race-conditions!)
13 
14 	Copyright: © 2012-2019 Sönke Ludwig
15 	Authors: Leonid Kramer, Sönke Ludwig, Manuel Frischknecht
16 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
17 */
18 module vibe.core.sync;
19 
20 import vibe.core.log : logDebugV, logTrace, logInfo;
21 import vibe.core.task;
22 
23 import core.atomic;
24 import core.sync.mutex;
25 import core.sync.condition;
26 import eventcore.core;
27 import std.exception;
28 import std.stdio;
29 import std.traits : ReturnType;
30 
31 
32 /** Creates a new signal that can be shared between fibers.
33 */
34 LocalManualEvent createManualEvent()
35 @safe nothrow {
36 	LocalManualEvent ret;
37 	ret.initialize();
38 	return ret;
39 }
40 /// ditto
41 shared(ManualEvent) createSharedManualEvent()
42 @trusted nothrow {
43 	shared(ManualEvent) ret;
44 	ret.initialize();
45 	return ret;
46 }
47 
48 
49 /** Performs RAII based locking/unlocking of a mutex.
50 
51 	Note that while `TaskMutex` can be used with D's built-in `synchronized`
52 	statement, `InterruptibleTaskMutex` cannot. This function provides a
53 	library based alternative that is suitable for use with all mutex types.
54 */
55 ScopedMutexLock!M scopedMutexLock(M)(M mutex, LockMode mode = LockMode.lock)
56 	if (is(M : Mutex) || is(M : Lockable))
57 {
58 	return ScopedMutexLock!M(mutex, mode);
59 }
60 
61 ///
62 unittest {
63 	import vibe.core.core : runWorkerTaskH;
64 
65 	__gshared int counter;
66 	__gshared TaskMutex mutex;
67 
68 	mutex = new TaskMutex;
69 
70 	Task[] tasks;
71 
72 	foreach (i; 0 .. 100) {
73 		tasks ~= runWorkerTaskH(() nothrow {
74 			auto l = scopedMutexLock(mutex);
75 			counter++;
76 		});
77 	}
78 
79 	foreach (t; tasks) t.join();
80 
81 	assert(counter == 100);
82 }
83 
84 unittest {
85 	scopedMutexLock(new Mutex);
86 	scopedMutexLock(new TaskMutex);
87 	scopedMutexLock(new InterruptibleTaskMutex);
88 }
89 
90 enum LockMode {
91 	lock,
92 	tryLock,
93 	defer
94 }
95 
96 interface Lockable {
97 	@safe:
98 	void lock();
99 	void unlock();
100 	bool tryLock();
101 }
102 
103 /** RAII lock for the Mutex class.
104 */
105 struct ScopedMutexLock(M)
106 	if (is(M : Mutex) || is(M : Lockable))
107 {
108 	@disable this(this);
109 	private {
110 		M m_mutex;
111 		bool m_locked;
112 		LockMode m_mode;
113 	}
114 
115 	this(M mutex, LockMode mode = LockMode.lock)
116 	{
117 		assert(mutex !is null);
118 		m_mutex = mutex;
119 
120 		final switch (mode) {
121 			case LockMode.lock: lock(); break;
122 			case LockMode.tryLock: tryLock(); break;
123 			case LockMode.defer: break;
124 		}
125 	}
126 
127 	~this()
128 	{
129 		if (m_locked) unlock();
130 	}
131 
132 	@property bool locked() const { return m_locked; }
133 
134 	void unlock()
135 	in { assert(this.locked); }
136 	do {
137 		assert(m_locked, "Unlocking unlocked scoped mutex lock");
138 		static if (is(typeof(m_mutex.unlock_nothrow())))
139 			m_mutex.unlock_nothrow();
140 		else
141 			m_mutex.unlock();
142 		m_locked = false;
143 	}
144 
145 	bool tryLock()
146 	in { assert(!this.locked); }
147 	do {
148 		static if (is(typeof(m_mutex.tryLock_nothrow())))
149 			return m_locked = m_mutex.tryLock_nothrow();
150 		else
151 			return m_locked = m_mutex.tryLock();
152 	}
153 
154 	void lock()
155 	in { assert(!this.locked); }
156 	do {
157 		m_locked = true;
158 		static if (is(typeof(m_mutex.lock_nothrow())))
159 			m_mutex.lock_nothrow();
160 		else
161 			m_mutex.lock();
162 	}
163 }
164 
165 
166 /*
167 	Only for internal use:
168 	Ensures that a mutex is locked while executing the given procedure.
169 
170 	This function works for all kinds of mutexes, in particular for
171 	$(D core.sync.mutex.Mutex), $(D TaskMutex) and $(D InterruptibleTaskMutex).
172 
173 	Returns:
174 		Returns the value returned from $(D PROC), if any.
175 */
176 /// private
177 ReturnType!PROC performLocked(alias PROC, MUTEX)(MUTEX mutex)
178 {
179 	auto l = scopedMutexLock(mutex);
180 	return PROC();
181 }
182 
183 ///
184 unittest {
185 	int protected_var = 0;
186 	auto mtx = new TaskMutex;
187 	mtx.performLocked!({
188 		protected_var++;
189 	});
190 }
191 
192 
193 /**
194 	Thread-local semaphore implementation for tasks.
195 
196 	When the semaphore runs out of concurrent locks, it will suspend. This class
197 	is used in `vibe.core.connectionpool` to limit the number of concurrent
198 	connections.
199 */
200 final class LocalTaskSemaphore
201 {
202 @safe:
203 
204 	// requires a queue
205 	import std.container.binaryheap;
206 	import std.container.array;
207 	//import vibe.utils.memory;
208 
209 	private {
210 		static struct ThreadWaiter {
211 			ubyte priority;
212 			uint seq;
213 		}
214 
215 		BinaryHeap!(Array!ThreadWaiter, asc) m_waiters;
216 		uint m_maxLocks;
217 		uint m_locks;
218 		uint m_seq;
219 		LocalManualEvent m_signal;
220 	}
221 
222 	this(uint max_locks) nothrow
223 	{
224 		m_maxLocks = max_locks;
225 		m_signal = createManualEvent();
226 	}
227 
228 	/// Maximum number of concurrent locks
229 	@property void maxLocks(uint max_locks) { m_maxLocks = max_locks; }
230 	/// ditto
231 	@property uint maxLocks() const nothrow { return m_maxLocks; }
232 
233 	/// Number of concurrent locks still available
234 	@property uint available() const { return m_maxLocks - m_locks; }
235 
236 	/** Try to acquire a lock.
237 
238 		If a lock cannot be acquired immediately, returns `false` and leaves the
239 		semaphore in its previous state.
240 
241 		Returns:
242 			`true` is returned $(I iff) the number of available locks is greater
243 			than one.
244 	*/
245 	bool tryLock()
246 	{
247 		if (available > 0)
248 		{
249 			m_locks++;
250 			return true;
251 		}
252 		return false;
253 	}
254 
255 	/** Acquires a lock.
256 
257 		Once the limit of concurrent locks is reached, this method will block
258 		until the number of locks drops below the limit.
259 	*/
260 	void lock(ubyte priority = 0)
261 	{
262 		import std.algorithm.comparison : min;
263 
264 		if (tryLock())
265 			return;
266 
267 		ThreadWaiter w;
268 		w.priority = priority;
269 		w.seq = min(0, m_seq - w.priority);
270 		if (++m_seq == uint.max)
271 			rewindSeq();
272 
273 		() @trusted { m_waiters.insert(w); } ();
274 
275 		while (true) {
276 			m_signal.waitUninterruptible();
277 			if (m_waiters.front.seq == w.seq && tryLock()) {
278 				return;
279 			}
280 		}
281 	}
282 
283 	/** Gives up an existing lock.
284 	*/
285 	void unlock()
286 	{
287 		assert(m_locks >= 1);
288 		m_locks--;
289 		if (m_waiters.length > 0)
290 			m_signal.emit(); // resume one
291 	}
292 
293 	// if true, a goes after b. ie. b comes out front()
294 	/// private
295 	static bool asc(ref ThreadWaiter a, ref ThreadWaiter b)
296 	{
297 		if (a.priority != b.priority)
298 			return a.priority < b.priority;
299 		return a.seq > b.seq;
300 	}
301 
302 	private void rewindSeq()
303 	@trusted {
304 		Array!ThreadWaiter waiters = m_waiters.release();
305 		ushort min_seq;
306 		import std.algorithm : min;
307 		foreach (ref waiter; waiters[])
308 			min_seq = min(waiter.seq, min_seq);
309 		foreach (ref waiter; waiters[])
310 			waiter.seq -= min_seq;
311 		m_waiters.assume(waiters);
312 	}
313 }
314 
315 
316 /**
317 	Mutex implementation for fibers.
318 
319 	This mutex type can be used in exchange for a core.sync.mutex.Mutex, but
320 	does not block the event loop when contention happens. Note that this
321 	mutex does not allow recursive locking.
322 
323 	Notice:
324 		Because this class is annotated nothrow, it cannot be interrupted
325 		using $(D vibe.core.task.Task.interrupt()). The corresponding
326 		$(D InterruptException) will be deferred until the next blocking
327 		operation yields the event loop.
328 
329 		Use $(D InterruptibleTaskMutex) as an alternative that can be
330 		interrupted.
331 
332 	See_Also: InterruptibleTaskMutex, RecursiveTaskMutex, core.sync.mutex.Mutex
333 */
334 final class TaskMutex : core.sync.mutex.Mutex, Lockable {
335 @safe:
336 	private TaskMutexImpl!false m_impl;
337 
338 	this(Object o) nothrow { m_impl.setup(); super(o); }
339 	this() nothrow { m_impl.setup(); }
340 
341 	override bool tryLock() nothrow { return m_impl.tryLock(); }
342 	override void lock() nothrow { m_impl.lock(); }
343 	override void unlock() nothrow { m_impl.unlock(); }
344 	bool lock(Duration timeout) nothrow { return m_impl.lock(timeout); }
345 }
346 
347 unittest {
348 	auto mutex = new TaskMutex;
349 
350 	{
351 		auto lock = scopedMutexLock(mutex);
352 		assert(lock.locked);
353 		assert(mutex.m_impl.m_locked);
354 
355 		auto lock2 = scopedMutexLock(mutex, LockMode.tryLock);
356 		assert(!lock2.locked);
357 	}
358 	assert(!mutex.m_impl.m_locked);
359 
360 	auto lock = scopedMutexLock(mutex, LockMode.tryLock);
361 	assert(lock.locked);
362 	lock.unlock();
363 	assert(!lock.locked);
364 
365 	synchronized(mutex){
366 		assert(mutex.m_impl.m_locked);
367 	}
368 	assert(!mutex.m_impl.m_locked);
369 
370 	mutex.performLocked!({
371 		assert(mutex.m_impl.m_locked);
372 	});
373 	assert(!mutex.m_impl.m_locked);
374 
375 	static if (__VERSION__ >= 2067) {
376 		with(mutex.scopedMutexLock) {
377 			assert(mutex.m_impl.m_locked);
378 		}
379 	}
380 }
381 
382 unittest { // test deferred throwing
383 	import vibe.core.core;
384 
385 	auto mutex = new TaskMutex;
386 	auto t1 = runTask({
387 		scope (failure) assert(false, "No exception expected in first task!");
388 		mutex.lock();
389 		scope (exit) mutex.unlock();
390 		sleep(20.msecs);
391 	});
392 
393 	auto t2 = runTask({
394 		mutex.lock();
395 		scope (exit) mutex.unlock();
396 		try {
397 			yield();
398 			assert(false, "Yield is supposed to have thrown an InterruptException.");
399 		} catch (InterruptException) {
400 			// as expected!
401 		} catch (Exception) {
402 			assert(false, "Only InterruptException supposed to be thrown!");
403 		}
404 	});
405 
406 	runTask({
407 		// mutex is now locked in first task for 20 ms
408 		// the second tasks is waiting in lock()
409 		t2.interrupt();
410 		t1.joinUninterruptible();
411 		t2.joinUninterruptible();
412 		assert(!mutex.m_impl.m_locked); // ensure that the scope(exit) has been executed
413 		exitEventLoop();
414 	});
415 
416 	runEventLoop();
417 }
418 
419 unittest {
420 	runMutexUnitTests!TaskMutex();
421 }
422 
423 
424 /**
425 	Alternative to $(D TaskMutex) that supports interruption.
426 
427 	This class supports the use of $(D vibe.core.task.Task.interrupt()) while
428 	waiting in the $(D lock()) method. However, because the interface is not
429 	$(D nothrow), it cannot be used as an object monitor.
430 
431 	See_Also: $(D TaskMutex), $(D InterruptibleRecursiveTaskMutex)
432 */
433 final class InterruptibleTaskMutex : Lockable {
434 @safe:
435 
436 	private TaskMutexImpl!true m_impl;
437 
438 	this()
439 	{
440 		m_impl.setup();
441 
442 		// detects invalid usage within synchronized(...)
443 		() @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } ();
444 	}
445 
446 	bool tryLock() nothrow { return m_impl.tryLock(); }
447 	void lock() { m_impl.lock(); }
448 	void unlock() nothrow { m_impl.unlock(); }
449 }
450 
451 unittest {
452 	runMutexUnitTests!InterruptibleTaskMutex();
453 }
454 
455 
456 
457 /**
458 	Recursive mutex implementation for tasks.
459 
460 	This mutex type can be used in exchange for a `core.sync.mutex.Mutex`, but
461 	does not block the event loop when contention happens.
462 
463 	Notice:
464 		Because this class is annotated `nothrow`, it cannot be interrupted
465 		using $(D vibe.core.task.Task.interrupt()). The corresponding
466 		$(D InterruptException) will be deferred until the next blocking
467 		operation yields the event loop.
468 
469 		Use $(D InterruptibleRecursiveTaskMutex) as an alternative that can be
470 		interrupted.
471 
472 	See_Also: `TaskMutex`, `core.sync.mutex.Mutex`
473 */
474 final class RecursiveTaskMutex : core.sync.mutex.Mutex, Lockable {
475 @safe:
476 
477 	private RecursiveTaskMutexImpl!false m_impl;
478 
479 	this(Object o) { m_impl.setup(); super(o); }
480 	this() { m_impl.setup(); }
481 
482 	override bool tryLock() nothrow { return m_impl.tryLock(); }
483 	override void lock() { m_impl.lock(); }
484 	override void unlock() nothrow { m_impl.unlock(); }
485 }
486 
487 unittest {
488 	runMutexUnitTests!RecursiveTaskMutex();
489 }
490 
491 
492 /**
493 	Alternative to $(D RecursiveTaskMutex) that supports interruption.
494 
495 	This class supports the use of $(D vibe.core.task.Task.interrupt()) while
496 	waiting in the $(D lock()) method. However, because the interface is not
497 	$(D nothrow), it cannot be used as an object monitor.
498 
499 	See_Also: $(D RecursiveTaskMutex), $(D InterruptibleTaskMutex)
500 */
501 final class InterruptibleRecursiveTaskMutex : Lockable {
502 @safe:
503 	private RecursiveTaskMutexImpl!true m_impl;
504 
505 	this()
506 	{
507 		m_impl.setup();
508 
509 		// detects invalid usage within synchronized(...)
510 		() @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } ();
511 	}
512 
513 	bool tryLock() nothrow { return m_impl.tryLock(); }
514 	void lock() { m_impl.lock(); }
515 	void unlock() nothrow { m_impl.unlock(); }
516 }
517 
518 unittest {
519 	runMutexUnitTests!InterruptibleRecursiveTaskMutex();
520 }
521 
522 
523 // Helper class to ensure that the non Object.Monitor compatible interruptible
524 // mutex classes are not accidentally used with the `synchronized` statement
525 private final class NoUseMonitor : Object.Monitor {
526 	private static shared Proxy st_instance;
527 
528     static struct Proxy {
529         Object.Monitor monitor;
530     }
531 
532 	static @property ref shared(Proxy) instance()
533 	@safe nothrow {
534 		static shared(Proxy)* inst = null;
535 		if (inst) return *inst;
536 
537 		() @trusted { // synchronized {} not @safe for DMD <= 2.078.3
538 			synchronized {
539 				if (!st_instance.monitor)
540 					st_instance.monitor = new shared NoUseMonitor;
541 				inst = &st_instance;
542 			}
543 		} ();
544 
545 		return *inst;
546 	}
547 
548 	override void lock() @safe @nogc nothrow {
549 		assert(false, "Interruptible task mutexes cannot be used with synchronized(), use scopedMutexLock instead.");
550 	}
551 
552 	override void unlock() @safe @nogc nothrow {}
553 }
554 
555 
556 private void runMutexUnitTests(M)()
557 {
558 	import vibe.core.core;
559 
560 	auto m = new M;
561 	Task t1, t2;
562 	void runContendedTasks(bool interrupt_t1, bool interrupt_t2) {
563 		assert(!m.m_impl.m_locked);
564 
565 		// t1 starts first and acquires the mutex for 20 ms
566 		// t2 starts second and has to wait in m.lock()
567 		t1 = runTask(() nothrow {
568 			assert(!m.m_impl.m_locked);
569 			try m.lock();
570 			catch (Exception e) assert(false, e.msg);
571 			assert(m.m_impl.m_locked);
572 			if (interrupt_t1) {
573 				try assertThrown!InterruptException(sleep(100.msecs));
574 				catch (Exception e) assert(false, e.msg);
575 			} else assertNotThrown(sleep(20.msecs));
576 			m.unlock();
577 		});
578 		t2 = runTask(() nothrow {
579 			assert(!m.tryLock());
580 			if (interrupt_t2) {
581 				try m.lock();
582 				catch (InterruptException) return;
583 				catch (Exception e) assert(false, e.msg);
584 				try yield(); // rethrows any deferred exceptions
585 				catch (InterruptException) {
586 					m.unlock();
587 					return;
588 				}
589 				catch (Exception e) assert(false, e.msg);
590 				assert(false, "Supposed to have thrown an InterruptException.");
591 			} else assertNotThrown(m.lock());
592 			assert(m.m_impl.m_locked);
593 			try sleep(20.msecs);
594 			catch (Exception e) assert(false, e.msg);
595 			m.unlock();
596 			assert(!m.m_impl.m_locked);
597 		});
598 	}
599 
600 	// basic lock test
601 	m.performLocked!({
602 		assert(m.m_impl.m_locked);
603 	});
604 	assert(!m.m_impl.m_locked);
605 
606 	// basic contention test
607 	runContendedTasks(false, false);
608 	auto t3 = runTask({
609 		assert(t1.running && t2.running);
610 		assert(m.m_impl.m_locked);
611 		t1.joinUninterruptible();
612 		assert(!t1.running && t2.running);
613 		try yield(); // give t2 a chance to take the lock
614 		catch (Exception e) assert(false, e.msg);
615 		assert(m.m_impl.m_locked);
616 		t2.joinUninterruptible();
617 		assert(!t2.running);
618 		assert(!m.m_impl.m_locked);
619 		exitEventLoop();
620 	});
621 	runEventLoop();
622 	assert(!t3.running);
623 	assert(!m.m_impl.m_locked);
624 
625 	// interruption test #1
626 	runContendedTasks(true, false);
627 	t3 = runTask({
628 		assert(t1.running && t2.running);
629 		assert(m.m_impl.m_locked);
630 		t1.interrupt();
631 		t1.joinUninterruptible();
632 		assert(!t1.running && t2.running);
633 		try yield(); // give t2 a chance to take the lock
634 		catch (Exception e) assert(false, e.msg);
635 		assert(m.m_impl.m_locked);
636 		t2.joinUninterruptible();
637 		assert(!t2.running);
638 		assert(!m.m_impl.m_locked);
639 		exitEventLoop();
640 	});
641 	runEventLoop();
642 	assert(!t3.running);
643 	assert(!m.m_impl.m_locked);
644 
645 	// interruption test #2
646 	runContendedTasks(false, true);
647 	t3 = runTask({
648 		assert(t1.running && t2.running);
649 		assert(m.m_impl.m_locked);
650 		t2.interrupt();
651 		t2.joinUninterruptible();
652 		assert(!t2.running);
653 		static if (is(M == InterruptibleTaskMutex) || is (M == InterruptibleRecursiveTaskMutex))
654 			assert(t1.running && m.m_impl.m_locked);
655 		t1.joinUninterruptible();
656 		assert(!t1.running);
657 		assert(!m.m_impl.m_locked);
658 		exitEventLoop();
659 	});
660 	runEventLoop();
661 	assert(!t3.running);
662 	assert(!m.m_impl.m_locked);
663 }
664 
665 
666 /**
667 	Event loop based condition variable or "event" implementation.
668 
669 	This class can be used in exchange for a $(D core.sync.condition.Condition)
670 	to avoid blocking the event loop when waiting.
671 
672 	Notice:
673 		Because this class is annotated nothrow, it cannot be interrupted
674 		using $(D vibe.core.task.Task.interrupt()). The corresponding
675 		$(D InterruptException) will be deferred until the next blocking
676 		operation yields to the event loop.
677 
678 		Use $(D InterruptibleTaskCondition) as an alternative that can be
679 		interrupted.
680 
681 		Note that it is generally not safe to use a `TaskCondition` together with an
682 		interruptible mutex type.
683 
684 	See_Also: `InterruptibleTaskCondition`
685 */
686 final class TaskCondition : core.sync.condition.Condition {
687 @safe:
688 
689 	private TaskConditionImpl!(false, Mutex) m_impl;
690 
691 	this(core.sync.mutex.Mutex mtx)
692 	nothrow {
693 		assert(mtx.classinfo is Mutex.classinfo || mtx.classinfo is TaskMutex.classinfo,
694 			"TaskCondition can only be used with Mutex or TaskMutex");
695 
696 		m_impl.setup(mtx);
697 		super(mtx);
698 	}
699 	override @property Mutex mutex() nothrow { return m_impl.mutex; }
700 	override void wait() nothrow { m_impl.wait(); }
701 	override bool wait(Duration timeout) nothrow { return m_impl.wait(timeout); }
702 	override void notify() nothrow { m_impl.notify(); }
703 	override void notifyAll() nothrow  { m_impl.notifyAll(); }
704 }
705 
706 unittest {
707 	new TaskCondition(new Mutex);
708 	new TaskCondition(new TaskMutex);
709 }
710 
711 
712 /** This example shows the typical usage pattern using a `while` loop to make
713 	sure that the final condition is reached.
714 */
715 unittest {
716 	import vibe.core.core;
717 	import vibe.core.log;
718 
719 	__gshared Mutex mutex;
720 	__gshared TaskCondition condition;
721 	__gshared int workers_still_running = 0;
722 
723 	// setup the task condition
724 	mutex = new Mutex;
725 	condition = new TaskCondition(mutex);
726 
727 	logDebug("SETTING UP TASKS");
728 
729 	// start up the workers and count how many are running
730 	foreach (i; 0 .. 4) {
731 		workers_still_running++;
732 		runWorkerTask(() nothrow {
733 			// simulate some work
734 			try sleep(100.msecs);
735 			catch (Exception e) {}
736 
737 			// notify the waiter that we're finished
738 			{
739 				auto l = scopedMutexLock(mutex);
740 				workers_still_running--;
741 			logDebug("DECREMENT %s", workers_still_running);
742 			}
743 			logDebug("NOTIFY");
744 			condition.notify();
745 		});
746 	}
747 
748 	logDebug("STARTING WAIT LOOP");
749 
750 	// wait until all tasks have decremented the counter back to zero
751 	synchronized (mutex) {
752 		while (workers_still_running > 0) {
753 			logDebug("STILL running %s", workers_still_running);
754 			condition.wait();
755 		}
756 	}
757 }
758 
759 
760 /**
761 	Alternative to `TaskCondition` that supports interruption.
762 
763 	This class supports the use of `vibe.core.task.Task.interrupt()` while
764 	waiting in the `wait()` method.
765 
766 	See `TaskCondition` for an example.
767 
768 	Notice:
769 		Note that it is generally not safe to use an
770 		`InterruptibleTaskCondition` together with an interruptible mutex type.
771 
772 	See_Also: `TaskCondition`
773 */
774 final class InterruptibleTaskCondition {
775 @safe:
776 
777 	private TaskConditionImpl!(true, Lockable) m_impl;
778 
779 	this(M)(M mutex)
780 		if (is(M : Mutex) || is (M : Lockable))
781 	{
782 		static if (is(M : Lockable))
783 			m_impl.setup(mutex);
784 		else
785 			m_impl.setupForMutex(mutex);
786 	}
787 
788 	@property Lockable mutex() { return m_impl.mutex; }
789 	void wait() { m_impl.wait(); }
790 	bool wait(Duration timeout) { return m_impl.wait(timeout); }
791 	void notify() nothrow { m_impl.notify(); }
792 	void notifyAll() nothrow { m_impl.notifyAll(); }
793 }
794 
795 unittest {
796 	new InterruptibleTaskCondition(new Mutex);
797 	new InterruptibleTaskCondition(new TaskMutex);
798 	new InterruptibleTaskCondition(new InterruptibleTaskMutex);
799 }
800 
801 
802 /** A manually triggered single threaded cross-task event.
803 
804 	Note: the ownership can be shared between multiple fibers of the same thread.
805 */
806 struct LocalManualEvent {
807 	import core.thread : Thread;
808 	import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;
809 
810 	@safe:
811 
812 	private {
813 		alias Waiter = ThreadLocalWaiter!false;
814 
815 		Waiter m_waiter;
816 	}
817 
818 	// thread destructor in vibe.core.core will decrement the ref. count
819 	package static EventID ms_threadEvent;
820 
821 	private void initialize()
822 	nothrow {
823 		import vibe.internal.allocator : Mallocator, makeGCSafe;
824 		m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } ();
825 	}
826 
827 	this(this)
828 	nothrow {
829 		if (m_waiter)
830 			return m_waiter.addRef();
831 	}
832 
833 	~this()
834 	nothrow {
835 		import vibe.internal.allocator : Mallocator, disposeGCSafe;
836 		if (m_waiter) {
837 			if (!m_waiter.releaseRef()) {
838 				static if (__VERSION__ < 2087) scope (failure) assert(false);
839 				() @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } ();
840 			}
841 		}
842 	}
843 
844 	bool opCast (T : bool) () const nothrow { return m_waiter !is null; }
845 
846 	/// A counter that is increased with every emit() call
847 	int emitCount() const nothrow { return m_waiter.m_emitCount; }
848 
849 	/// Emits the signal, waking up all owners of the signal.
850 	int emit()
851 	nothrow {
852 		assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
853 		logTrace("unshared emit");
854 		auto ec = m_waiter.m_emitCount++;
855 		m_waiter.emit();
856 		return ec;
857 	}
858 
859 	/// Emits the signal, waking up a single owners of the signal.
860 	int emitSingle()
861 	nothrow {
862 		assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
863 		logTrace("unshared single emit");
864 		auto ec = m_waiter.m_emitCount++;
865 		m_waiter.emitSingle();
866 		return ec;
867 	}
868 
869 	/** Acquires ownership and waits until the signal is emitted.
870 
871 		Note that in order not to miss any emits it is necessary to use the
872 		overload taking an integer.
873 
874 		Throws:
875 			May throw an $(D InterruptException) if the task gets interrupted
876 			using $(D Task.interrupt()).
877 	*/
878 	int wait() { return wait(this.emitCount); }
879 
880 	/** Acquires ownership and waits until the signal is emitted and the emit
881 		count is larger than a given one.
882 
883 		Throws:
884 			May throw an $(D InterruptException) if the task gets interrupted
885 			using $(D Task.interrupt()).
886 	*/
887 	int wait(int emit_count) { return doWait!true(Duration.max, emit_count); }
888 	/// ditto
889 	int wait(Duration timeout, int emit_count) { return doWait!true(timeout, emit_count); }
890 
891 	/** Same as $(D wait), but defers throwing any $(D InterruptException).
892 
893 		This method is annotated $(D nothrow) at the expense that it cannot be
894 		interrupted.
895 	*/
896 	int waitUninterruptible() nothrow { return waitUninterruptible(this.emitCount); }
897 	/// ditto
898 	int waitUninterruptible(int emit_count) nothrow { return doWait!false(Duration.max, emit_count); }
899 	/// ditto
900 	int waitUninterruptible(Duration timeout, int emit_count) nothrow { return doWait!false(timeout, emit_count); }
901 
902 	private int doWait(bool interruptible)(Duration timeout, int emit_count)
903 	{
904 		import core.time : MonoTime;
905 
906 		assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
907 
908 		MonoTime target_timeout, now;
909 		if (timeout != Duration.max) {
910 			try now = MonoTime.currTime();
911 			catch (Exception e) { assert(false, e.msg); }
912 			target_timeout = now + timeout;
913 		}
914 
915 		while (m_waiter.m_emitCount - emit_count <= 0) {
916 			m_waiter.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max);
917 			try now = MonoTime.currTime();
918 			catch (Exception e) { assert(false, e.msg); }
919 			if (now >= target_timeout) break;
920 		}
921 
922 		return m_waiter.m_emitCount;
923 	}
924 }
925 
926 unittest {
927 	import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
928 
929 	auto e = createManualEvent();
930 	auto w1 = runTask({ e.waitUninterruptible(100.msecs, e.emitCount); });
931 	auto w2 = runTask({ e.waitUninterruptible(500.msecs, e.emitCount); });
932 	runTask({
933 		try sleep(50.msecs);
934 		catch (Exception e) assert(false, e.msg);
935 		e.emit();
936 		try sleep(50.msecs);
937 		catch (Exception e) assert(false, e.msg);
938 		assert(!w1.running && !w2.running);
939 		exitEventLoop();
940 	});
941 	runEventLoop();
942 }
943 
944 unittest {
945 	import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
946 	auto e = createManualEvent();
947 	// integer overflow test
948 	e.m_waiter.m_emitCount = int.max;
949 	auto w1 = runTask({ e.waitUninterruptible(50.msecs, e.emitCount); });
950 	runTask({
951 		try sleep(5.msecs);
952 		catch (Exception e) assert(false, e.msg);
953 		e.emit();
954 		try sleep(50.msecs);
955 		catch (Exception e) assert(false, e.msg);
956 		assert(!w1.running);
957 		exitEventLoop();
958 	});
959 	runEventLoop();
960 }
961 
962 unittest { // ensure that cancelled waiters are properly handled and that a FIFO order is implemented
963 	import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
964 
965 	LocalManualEvent l = createManualEvent();
966 
967 	Task t2;
968 	runTask({
969 		l.waitUninterruptible();
970 		t2.interrupt();
971 		try sleep(20.msecs);
972 		catch (Exception e) assert(false, e.msg);
973 		exitEventLoop();
974 	});
975 	t2 = runTask({
976 		try {
977 			l.wait();
978 			assert(false, "Shouldn't reach this.");
979 		}
980 		catch (InterruptException e) {}
981 		catch (Exception e) assert(false, e.msg);
982 	});
983 	runTask({
984 		l.emit();
985 	});
986 	runEventLoop();
987 }
988 
989 unittest { // ensure that LocalManualEvent behaves correctly after being copied
990 	import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
991 
992 	LocalManualEvent l = createManualEvent();
993 	runTask(() nothrow {
994 		auto lc = l;
995 		try sleep(100.msecs);
996 		catch (Exception e) assert(false, e.msg);
997 		lc.emit();
998 	});
999 	runTask({
1000 		assert(l.waitUninterruptible(1.seconds, l.emitCount));
1001 		exitEventLoop();
1002 	});
1003 	runEventLoop();
1004 }
1005 
1006 
1007 /** A manually triggered multi threaded cross-task event.
1008 
1009 	Note: the ownership can be shared between multiple fibers and threads.
1010 */
1011 struct ManualEvent {
1012 	import core.thread : Thread;
1013 	import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;
1014 	import vibe.internal.list : StackSList;
1015 
1016 	@safe:
1017 
1018 	private {
1019 		alias ThreadWaiter = ThreadLocalWaiter!true;
1020 
1021 		int m_emitCount;
1022 		static struct Waiters {
1023 			StackSList!ThreadWaiter active; // actively waiting
1024 			StackSList!ThreadWaiter free; // free-list of reusable waiter structs
1025 		}
1026 		Monitor!(Waiters, shared(Mutex)) m_waiters;
1027 	}
1028 
1029 	// thread destructor in vibe.core.core will decrement the ref. count
1030 	package static EventID ms_threadEvent;
1031 
1032 	enum EmitMode {
1033 		single,
1034 		all
1035 	}
1036 
1037 	@disable this(this);
1038 
1039 	private void initialize()
1040 	shared nothrow {
1041 		m_waiters = createMonitor!(ManualEvent.Waiters)(new shared Mutex);
1042 	}
1043 
1044 	deprecated("ManualEvent is always non-null!")
1045 	bool opCast (T : bool) () const shared nothrow { return true; }
1046 
1047 	/// A counter that is increased with every emit() call
1048 	int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); }
1049 
1050 	/// Emits the signal, waking up all owners of the signal.
1051 	int emit()
1052 	shared nothrow @trusted {
1053 		import core.atomic : atomicOp, cas;
1054 
1055 		debug (VibeMutexLog) () @trusted { logTrace("emit shared %s", cast(void*)&this); } ();
1056 
1057 		auto ec = atomicOp!"+="(m_emitCount, 1);
1058 		auto thisthr = Thread.getThis();
1059 
1060 		ThreadWaiter lw;
1061 		auto drv = eventDriver;
1062 		m_waiters.lock.active.filter((ThreadWaiter w) {
1063 			debug (VibeMutexLog) () @trusted { logTrace("waiter %s", cast(void*)w); } ();
1064 			if (w.m_driver is drv) {
1065 				lw = w;
1066 				lw.addRef();
1067 			} else {
1068 				try {
1069 					assert(w.m_event != EventID.init);
1070 					() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
1071 				} catch (Exception e) assert(false, e.msg);
1072 			}
1073 			return true;
1074 		});
1075 		debug (VibeMutexLog) () @trusted { logTrace("lw %s", cast(void*)lw); } ();
1076 		if (lw) {
1077 			lw.emit();
1078 			releaseWaiter(lw);
1079 		}
1080 
1081 		debug (VibeMutexLog) logTrace("emit shared done");
1082 
1083 		return ec;
1084 	}
1085 
1086 	/// Emits the signal, waking up at least one waiting task
1087 	int emitSingle()
1088 	shared nothrow @trusted {
1089 		import core.atomic : atomicOp, cas;
1090 
1091 		() @trusted { logTrace("emit shared single %s", cast(void*)&this); } ();
1092 
1093 		auto ec = atomicOp!"+="(m_emitCount, 1);
1094 		auto thisthr = Thread.getThis();
1095 
1096 		ThreadWaiter lw;
1097 		auto drv = eventDriver;
1098 		m_waiters.lock.active.iterate((ThreadWaiter w) {
1099 			() @trusted { logTrace("waiter %s", cast(void*)w); } ();
1100 			if (w.m_driver is drv) {
1101 				if (w.unused) return true;
1102 				lw = w;
1103 				lw.addRef();
1104 			} else {
1105 				try {
1106 					assert(w.m_event != EventID.invalid);
1107 					() @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
1108 				} catch (Exception e) assert(false, e.msg);
1109 			}
1110 			return false;
1111 		});
1112 		() @trusted { logTrace("lw %s", cast(void*)lw); } ();
1113 		if (lw) {
1114 			lw.emitSingle();
1115 			releaseWaiter(lw);
1116 		}
1117 
1118 		logTrace("emit shared done");
1119 
1120 		return ec;
1121 	}
1122 
1123 	/** Acquires ownership and waits until the signal is emitted.
1124 
1125 		Note that in order not to miss any emits it is necessary to use the
1126 		overload taking an integer.
1127 
1128 		Throws:
1129 			May throw an $(D InterruptException) if the task gets interrupted
1130 			using $(D Task.interrupt()).
1131 	*/
1132 	int wait() shared { return wait(this.emitCount); }
1133 
1134 	/** Acquires ownership and waits until the emit count differs from the
1135 		given one or until a timeout is reached.
1136 
1137 		Throws:
1138 			May throw an $(D InterruptException) if the task gets interrupted
1139 			using $(D Task.interrupt()).
1140 	*/
1141 	int wait(int emit_count) shared { return doWaitShared!true(Duration.max, emit_count); }
1142 	/// ditto
1143 	int wait(Duration timeout, int emit_count) shared { return doWaitShared!true(timeout, emit_count); }
1144 
1145 	/** Same as $(D wait), but defers throwing any $(D InterruptException).
1146 
1147 		This method is annotated $(D nothrow) at the expense that it cannot be
1148 		interrupted.
1149 	*/
1150 	int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); }
1151 	/// ditto
1152 	int waitUninterruptible(int emit_count) shared nothrow { return doWaitShared!false(Duration.max, emit_count); }
1153 	/// ditto
1154 	int waitUninterruptible(Duration timeout, int emit_count) shared nothrow { return doWaitShared!false(timeout, emit_count); }
1155 
1156 	private int doWaitShared(bool interruptible)(Duration timeout, int emit_count)
1157 	shared {
1158 		import core.time : MonoTime;
1159 
1160 		() @trusted { logTrace("wait shared %s", cast(void*)&this); } ();
1161 
1162 		if (ms_threadEvent is EventID.invalid) {
1163 			ms_threadEvent = eventDriver.events.create();
1164 			assert(ms_threadEvent != EventID.invalid, "Failed to create event!");
1165 		}
1166 
1167 		MonoTime target_timeout, now;
1168 		if (timeout != Duration.max) {
1169 			try now = MonoTime.currTime();
1170 			catch (Exception e) { assert(false, e.msg); }
1171 			target_timeout = now + timeout;
1172 		}
1173 
1174 		int ec = this.emitCount;
1175 
1176 		acquireThreadWaiter((scope ThreadWaiter w) {
1177 			while (ec - emit_count <= 0) {
1178 				w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => (this.emitCount - emit_count) > 0);
1179 				ec = this.emitCount;
1180 
1181 				if (timeout != Duration.max) {
1182 					try now = MonoTime.currTime();
1183 					catch (Exception e) { assert(false, e.msg); }
1184 					if (now >= target_timeout) break;
1185 				}
1186 			}
1187 		});
1188 
1189 		return ec;
1190 	}
1191 
1192 	private void acquireThreadWaiter(DEL)(scope DEL del)
1193 	shared {
1194 		import vibe.internal.allocator : processAllocator, makeGCSafe;
1195 
1196 		ThreadWaiter w;
1197 		auto drv = eventDriver;
1198 
1199 		with (m_waiters.lock) {
1200 			active.iterate((aw) {
1201 				if (aw.m_driver is drv) {
1202 					w = aw;
1203 					w.addRef();
1204 					return false;
1205 				}
1206 				return true;
1207 			});
1208 
1209 			if (!w) {
1210 				free.filter((fw) {
1211 					if (fw.m_driver is drv) {
1212 						w = fw;
1213 						w.addRef();
1214 						return false;
1215 					}
1216 					return true;
1217 				});
1218 
1219 				if (!w) {
1220 					() @trusted {
1221 						try {
1222 							w = processAllocator.makeGCSafe!ThreadWaiter;
1223 							w.m_driver = drv;
1224 							w.m_event = ms_threadEvent;
1225 						} catch (Exception e) {
1226 							assert(false, "Failed to allocate thread waiter.");
1227 						}
1228 					} ();
1229 				}
1230 
1231 				assert(w.m_refCount == 1);
1232 				active.add(w);
1233 			}
1234 		}
1235 
1236 		scope (exit) releaseWaiter(w);
1237 
1238 		del(w);
1239 	}
1240 
1241 	private void releaseWaiter(ThreadWaiter w)
1242 	shared nothrow {
1243 		if (!w.releaseRef()) {
1244 			assert(w.m_driver is eventDriver, "Waiter was reassigned a different driver!?");
1245 			assert(w.unused, "Waiter still used, but not referenced!?");
1246 			with (m_waiters.lock) {
1247 				auto rmvd = active.remove(w);
1248 				assert(rmvd, "Waiter not in active queue anymore!?");
1249 				free.add(w);
1250 				// TODO: cap size of m_freeWaiters
1251 			}
1252 		}
1253 	}
1254 }
1255 
1256 unittest {
1257 	import vibe.core.core : exitEventLoop, runEventLoop, runTask, runWorkerTaskH, sleep;
1258 
1259 	auto e = createSharedManualEvent();
1260 	auto w1 = runTask({ e.waitUninterruptible(100.msecs, e.emitCount); });
1261 	static void w(shared(ManualEvent)* e) { e.waitUninterruptible(500.msecs, e.emitCount); }
1262 	auto w2 = runWorkerTaskH(&w, &e);
1263 	runTask({
1264 		try sleep(50.msecs);
1265 		catch (Exception e) assert(false, e.msg);
1266 		e.emit();
1267 		try sleep(50.msecs);
1268 		catch (Exception e) assert(false, e.msg);
1269 		assert(!w1.running && !w2.running);
1270 		exitEventLoop();
1271 	});
1272 	runEventLoop();
1273 }
1274 
1275 unittest {
1276 	import vibe.core.core : runTask, runWorkerTaskH, setTimer, sleep;
1277 	import vibe.core.taskpool : TaskPool;
1278 	import core.time : msecs, usecs;
1279 	import std.concurrency : send, receiveOnly;
1280 	import std.random : uniform;
1281 
1282 	auto tpool = new shared TaskPool(4);
1283 	scope (exit) tpool.terminate();
1284 
1285 	static void test(shared(ManualEvent)* evt, Task owner)
1286 	nothrow {
1287 		try owner.tid.send(Task.getThis());
1288 		catch (Exception e) assert(false, e.msg);
1289 
1290 		int ec = evt.emitCount;
1291 		auto thist = Task.getThis();
1292 		auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
1293 		scope (exit) tm.stop();
1294 		while (ec < 5_000) {
1295 			tm.rearm(500.msecs);
1296 			try sleep(uniform(0, 10_000).usecs);
1297 			catch (Exception e) assert(false, e.msg);
1298 			try if (uniform(0, 10) == 0) evt.emit();
1299 			catch (Exception e) assert(false, e.msg);
1300 			auto ecn = evt.waitUninterruptible(ec);
1301 			assert(ecn > ec);
1302 			ec = ecn;
1303 		}
1304 	}
1305 
1306 	auto watchdog = setTimer(30.seconds, { assert(false, "ManualEvent test has hung."); });
1307 	scope (exit) watchdog.stop();
1308 
1309 	auto e = createSharedManualEvent();
1310 	Task[] tasks;
1311 
1312 	runTask(() nothrow {
1313 		auto thist = Task.getThis();
1314 
1315 		// start 25 tasks in each thread
1316 		foreach (i; 0 .. 25) tpool.runTaskDist(&test, &e, thist);
1317 		// collect all task handles
1318 		try foreach (i; 0 .. 4*25) tasks ~= receiveOnly!Task;
1319 		catch (Exception e) assert(false, e.msg);
1320 
1321 		auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
1322 		scope (exit) tm.stop();
1323 		int pec = 0;
1324 		while (e.emitCount < 5_000) {
1325 			tm.rearm(500.msecs);
1326 			try sleep(50.usecs);
1327 			catch (Exception e) assert(false, e.msg);
1328 			e.emit();
1329 		}
1330 
1331 		// wait for all worker tasks to finish
1332 		foreach (t; tasks) t.joinUninterruptible();
1333 	}).join();
1334 }
1335 
1336 package shared struct Monitor(T, M)
1337 {
1338 	alias Mutex = M;
1339 	alias Data = T;
1340 	private {
1341 		Mutex mutex;
1342 		Data data;
1343 	}
1344 
1345 	static struct Locked {
1346 		shared(Monitor)* m;
1347 		@disable this(this);
1348 		~this() {
1349 			() @trusted {
1350 				static if (is(typeof(Mutex.init.unlock_nothrow())))
1351 					(cast(Mutex)m.mutex).unlock_nothrow();
1352 				else (cast(Mutex)m.mutex).unlock();
1353 			} ();
1354 		}
1355 		ref inout(Data) get() inout @trusted { return *cast(inout(Data)*)&m.data; }
1356 		alias get this;
1357 	}
1358 
1359 	Locked lock() {
1360 		() @trusted {
1361 			static if (is(typeof(Mutex.init.lock_nothrow())))
1362 				(cast(Mutex)mutex).lock_nothrow();
1363 			else (cast(Mutex)mutex).lock();
1364 		} ();
1365 		return Locked(() @trusted { return &this; } ());
1366 	}
1367 
1368 	const(Locked) lock() const {
1369 		() @trusted {
1370 			static if (is(typeof(Mutex.init.lock_nothrow())))
1371 				(cast(Mutex)mutex).lock_nothrow();
1372 			else (cast(Mutex)mutex).lock();
1373 		} ();
1374 		return const(Locked)(() @trusted { return &this; } ());
1375 	}
1376 }
1377 
1378 
1379 package shared(Monitor!(T, M)) createMonitor(T, M)(M mutex)
1380 @trusted {
1381 	shared(Monitor!(T, M)) ret;
1382 	ret.mutex = cast(shared)mutex;
1383 	return ret;
1384 }
1385 
1386 
1387 private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
1388 	import vibe.internal.list : CircularDList;
1389 
1390 	private {
1391 		static struct TaskWaiter {
1392 			TaskWaiter* prev, next;
1393 			void delegate() @safe nothrow notifier;
1394 
1395 			void wait(void delegate() @safe nothrow del) @safe nothrow {
1396 				assert(notifier is null, "Local waiter is used twice!");
1397 				notifier = del;
1398 			}
1399 			void cancel() @safe nothrow { notifier = null; }
1400 			void emit() @safe nothrow { auto n = notifier; notifier = null; n(); }
1401 		}
1402 
1403 		static if (EVENT_TRIGGERED) {
1404 			package(vibe) ThreadLocalWaiter next; // queue of other waiters of the same thread
1405 			NativeEventDriver m_driver;
1406 			EventID m_event = EventID.invalid;
1407 		} else {
1408 			int m_emitCount = 0;
1409 		}
1410 		int m_refCount = 1;
1411 		TaskWaiter m_pivot;
1412 		TaskWaiter m_emitPivot;
1413 		CircularDList!(TaskWaiter*) m_waiters;
1414 	}
1415 
1416 	this()
1417 	{
1418 		m_waiters = CircularDList!(TaskWaiter*)(() @trusted { return &m_pivot; } ());
1419 	}
1420 
1421 	static if (EVENT_TRIGGERED) {
1422 		~this()
1423 		{
1424 			import vibe.core.internal.release : releaseHandle;
1425 
1426 			if (m_event != EventID.invalid)
1427 				releaseHandle!"events"(m_event, () @trusted { return cast(shared)m_driver; } ());
1428 		}
1429 	}
1430 
1431 	@property bool unused() const @safe nothrow { return m_waiters.empty; }
1432 
1433 	void addRef() @safe nothrow { assert(m_refCount >= 0); m_refCount++; }
1434 	bool releaseRef() @safe nothrow { assert(m_refCount > 0); return --m_refCount > 0; }
1435 
1436 	bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null)
1437 	@safe {
1438 		import core.time : MonoTime;
1439 		import vibe.internal.async : Waitable, asyncAwaitAny;
1440 
1441 		TaskWaiter waiter_store;
1442 		TaskWaiter* waiter = () @trusted { return &waiter_store; } ();
1443 
1444 		m_waiters.insertBack(waiter);
1445 		assert(waiter.next !is null);
1446 		scope (exit)
1447 			if (waiter.next !is null) {
1448 				m_waiters.remove(waiter);
1449 				assert(!waiter.next);
1450 			}
1451 
1452 		MonoTime target_timeout, now;
1453 		if (timeout != Duration.max) {
1454 			try now = MonoTime.currTime();
1455 			catch (Exception e) { assert(false, e.msg); }
1456 			target_timeout = now + timeout;
1457 		}
1458 
1459 		bool cancelled;
1460 
1461 		alias waitable = Waitable!(typeof(TaskWaiter.notifier),
1462 			(cb) { waiter.wait(cb); },
1463 			(cb) { cancelled = true; waiter.cancel(); },
1464 			() {}
1465 		);
1466 
1467 		alias ewaitable = Waitable!(EventCallback,
1468 			(cb) {
1469 				eventDriver.events.wait(evt, cb);
1470 				// check for exit condition *after* starting to wait for the event
1471 				// to avoid a race condition
1472 				if (exit_condition()) {
1473 					eventDriver.events.cancelWait(evt, cb);
1474 					cb(evt);
1475 				}
1476 			},
1477 			(cb) { eventDriver.events.cancelWait(evt, cb); },
1478 			(EventID) {}
1479 		);
1480 
1481 		if (evt != EventID.invalid) {
1482 			asyncAwaitAny!(interruptible, waitable, ewaitable)(timeout);
1483 		} else {
1484 			asyncAwaitAny!(interruptible, waitable)(timeout);
1485 		}
1486 
1487 		if (cancelled) {
1488 			assert(waiter.next !is null, "Cancelled waiter not in queue anymore!?");
1489 			return false;
1490 		} else {
1491 			assert(waiter.next is null, "Triggered waiter still in queue!?");
1492 			return true;
1493 		}
1494 	}
1495 
1496 	void emit()
1497 	@safe nothrow {
1498 		import std.algorithm.mutation : swap;
1499 		import vibe.core.core : yield;
1500 
1501 		if (m_waiters.empty) return;
1502 
1503 		TaskWaiter* pivot = () @trusted { return &m_emitPivot; } ();
1504 
1505 		if (pivot.next) { // another emit in progress?
1506 			// shift pivot to the end, so that the other emit call will process all pending waiters
1507 			if (pivot !is m_waiters.back) {
1508 				m_waiters.remove(pivot);
1509 				m_waiters.insertBack(pivot);
1510 			}
1511 			return;
1512 		}
1513 
1514 		m_waiters.insertBack(pivot);
1515 		scope (exit) m_waiters.remove(pivot);
1516 
1517 		foreach (w; m_waiters) {
1518 			if (w is pivot) break;
1519 			emitWaiter(w);
1520 		}
1521 	}
1522 
1523 	bool emitSingle()
1524 	@safe nothrow {
1525 		if (m_waiters.empty) return false;
1526 
1527 		TaskWaiter* pivot = () @trusted { return &m_emitPivot; } ();
1528 
1529 		if (pivot.next) { // another emit in progress?
1530 			// shift pivot to the right, so that the other emit call will process another waiter
1531 			if (pivot !is m_waiters.back) {
1532 				auto n = pivot.next;
1533 				m_waiters.remove(pivot);
1534 				m_waiters.insertAfter(pivot, n);
1535 			}
1536 			return true;
1537 		}
1538 
1539 		emitWaiter(m_waiters.front);
1540 		return true;
1541 	}
1542 
1543 	private void emitWaiter(TaskWaiter* w)
1544 	@safe nothrow {
1545 		m_waiters.remove(w);
1546 
1547 		if (w.notifier !is null) {
1548 			logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr);
1549 			w.emit();
1550 		} else logTrace("notify callback is null");
1551 	}
1552 }
1553 
1554 private struct TaskMutexImpl(bool INTERRUPTIBLE) {
1555 	private {
1556 		shared(bool) m_locked = false;
1557 		shared(uint) m_waiters = 0;
1558 		shared(ManualEvent) m_signal;
1559 		debug Task m_owner;
1560 	}
1561 
1562 	void setup()
1563 	{
1564 		m_signal.initialize();
1565 	}
1566 
1567 	@trusted bool tryLock()
1568 	nothrow {
1569 		if (cas(&m_locked, false, true)) {
1570 			debug m_owner = Task.getThis();
1571 			debug(VibeMutexLog) logTrace("mutex %s lock %s", cast(void*)&this, atomicLoad(m_waiters));
1572 			return true;
1573 		}
1574 		return false;
1575 	}
1576 
1577 	@trusted bool lock(Duration timeout = Duration.max)
1578 	{
1579 		if (tryLock()) return true;
1580 		debug assert(m_owner == Task() || m_owner != Task.getThis(), "Recursive mutex lock.");
1581 		atomicOp!"+="(m_waiters, 1);
1582 		debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
1583 		scope(exit) atomicOp!"-="(m_waiters, 1);
1584 		auto ecnt = m_signal.emitCount();
1585 		MonoTime target = MonoTime.currTime + timeout;
1586 		while (!tryLock()) {
1587 			auto now = MonoTime.currTime;
1588 			if (timeout != Duration.max && now >= target)
1589 				return false;
1590 
1591 			auto remaining = timeout != Duration.max ? target - now : Duration.max;
1592 			static if (INTERRUPTIBLE) ecnt = m_signal.wait(remaining, ecnt);
1593 			else ecnt = m_signal.waitUninterruptible(remaining, ecnt);
1594 		}
1595 		return true;
1596 	}
1597 
1598 	@trusted void unlock()
1599 	{
1600 		assert(m_locked);
1601 		debug {
1602 			assert(m_owner == Task.getThis());
1603 			m_owner = Task();
1604 		}
1605 		atomicStore!(MemoryOrder.rel)(m_locked, false);
1606 		debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
1607 		if (atomicLoad(m_waiters) > 0)
1608 			m_signal.emit();
1609 	}
1610 }
1611 
1612 private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
1613 	import std.stdio;
1614 	private {
1615 		core.sync.mutex.Mutex m_mutex;
1616 		Task m_owner;
1617 		size_t m_recCount = 0;
1618 		shared(uint) m_waiters = 0;
1619 		shared(ManualEvent) m_signal;
1620 		@property bool m_locked() const { return m_recCount > 0; }
1621 	}
1622 
1623 	void setup()
1624 	{
1625 		m_mutex = new core.sync.mutex.Mutex;
1626 		m_signal.initialize();
1627 	}
1628 
1629 	@trusted bool tryLock()
1630 	nothrow {
1631 		auto self = Task.getThis();
1632 		return m_mutex.performLocked!({
1633 			if (!m_owner) {
1634 				assert(m_recCount == 0);
1635 				m_recCount = 1;
1636 				m_owner = self;
1637 				return true;
1638 			} else if (m_owner == self) {
1639 				m_recCount++;
1640 				return true;
1641 			}
1642 			return false;
1643 		});
1644 	}
1645 
1646 	@trusted void lock()
1647 	{
1648 		if (tryLock()) return;
1649 		atomicOp!"+="(m_waiters, 1);
1650 		debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
1651 		scope(exit) atomicOp!"-="(m_waiters, 1);
1652 		auto ecnt = m_signal.emitCount();
1653 		while (!tryLock()) {
1654 			static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt);
1655 			else ecnt = m_signal.waitUninterruptible(ecnt);
1656 		}
1657 	}
1658 
1659 	@trusted void unlock()
1660 	{
1661 		auto self = Task.getThis();
1662 		m_mutex.performLocked!({
1663 			assert(m_owner == self);
1664 			assert(m_recCount > 0);
1665 			m_recCount--;
1666 			if (m_recCount == 0) {
1667 				m_owner = Task.init;
1668 			}
1669 		});
1670 		debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
1671 		if (atomicLoad(m_waiters) > 0)
1672 			m_signal.emit();
1673 	}
1674 }
1675 
1676 private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
1677 	private {
1678 		LOCKABLE m_mutex;
1679 		static if (is(LOCKABLE == Mutex))
1680 			TaskMutex m_taskMutex;
1681 		shared(ManualEvent) m_signal;
1682 	}
1683 
1684 	static if (is(LOCKABLE == Lockable)) {
1685 		final class MutexWrapper : Lockable {
1686 			private core.sync.mutex.Mutex m_mutex;
1687 			this(core.sync.mutex.Mutex mtx) { m_mutex = mtx; }
1688 			@trusted void lock() { m_mutex.lock(); }
1689 			@trusted void unlock() { m_mutex.unlock(); }
1690 			@trusted bool tryLock() { return m_mutex.tryLock(); }
1691 		}
1692 
1693 		void setupForMutex(core.sync.mutex.Mutex mtx)
1694 		{
1695 			setup(new MutexWrapper(mtx));
1696 		}
1697 	}
1698 
1699 	@disable this(this);
1700 
1701 	void setup(LOCKABLE mtx)
1702 	{
1703 		m_mutex = mtx;
1704 		static if (is(typeof(m_taskMutex)))
1705 			m_taskMutex = cast(TaskMutex)mtx;
1706 		m_signal.initialize();
1707 	}
1708 
1709 	@property LOCKABLE mutex() { return m_mutex; }
1710 
1711 	@trusted void wait()
1712 	{
1713 		if (auto tm = cast(TaskMutex)m_mutex) {
1714 			assert(tm.m_impl.m_locked);
1715 			debug assert(tm.m_impl.m_owner == Task.getThis());
1716 		}
1717 
1718 		auto refcount = m_signal.emitCount;
1719 
1720 		static if (is(LOCKABLE == Mutex)) {
1721 			if (m_taskMutex) m_taskMutex.unlock();
1722 			else m_mutex.unlock_nothrow();
1723 		} else m_mutex.unlock();
1724 
1725 		scope(exit) {
1726 			static if (is(LOCKABLE == Mutex)) {
1727 				if (m_taskMutex) m_taskMutex.lock();
1728 				else m_mutex.lock_nothrow();
1729 			} else m_mutex.lock();
1730 		}
1731 		static if (INTERRUPTIBLE) m_signal.wait(refcount);
1732 		else m_signal.waitUninterruptible(refcount);
1733 	}
1734 
1735 	@trusted bool wait(Duration timeout)
1736 	{
1737 		assert(!timeout.isNegative());
1738 		if (auto tm = cast(TaskMutex)m_mutex) {
1739 			assert(tm.m_impl.m_locked);
1740 			debug assert(tm.m_impl.m_owner == Task.getThis());
1741 		}
1742 
1743 		auto refcount = m_signal.emitCount;
1744 
1745 		static if (is(LOCKABLE == Mutex)) {
1746 			if (m_taskMutex) m_taskMutex.unlock();
1747 			else m_mutex.unlock_nothrow();
1748 		} else m_mutex.unlock();
1749 
1750 		scope(exit) {
1751 			static if (is(LOCKABLE == Mutex)) {
1752 				if (m_taskMutex) m_taskMutex.lock();
1753 				else m_mutex.lock_nothrow();
1754 			} else m_mutex.lock();
1755 		}
1756 
1757 		static if (INTERRUPTIBLE) return m_signal.wait(timeout, refcount) != refcount;
1758 		else return m_signal.waitUninterruptible(timeout, refcount) != refcount;
1759 	}
1760 
1761 	@trusted void notify()
1762 	{
1763 		m_signal.emit();
1764 	}
1765 
1766 	@trusted void notifyAll()
1767 	{
1768 		m_signal.emit();
1769 	}
1770 }
1771 
1772 /** Contains the shared state of a $(D TaskReadWriteMutex).
1773  *
1774  *  Since a $(D TaskReadWriteMutex) consists of two actual Mutex
1775  *  objects that rely on common memory, this class implements
1776  *  the actual functionality of their method calls.
1777  *
1778  *  The method implementations are based on two static parameters
1779  *  ($(D INTERRUPTIBLE) and $(D INTENT)), which are configured through
1780  *  template arguments:
1781  *
1782  *  - $(D INTERRUPTIBLE) determines whether the mutex implementation
1783  *    are interruptible by vibe.d's $(D vibe.core.task.Task.interrupt())
1784  *    method or not.
1785  *
1786  *  - $(D INTENT) describes the intent, with which a locking operation is
1787  *    performed (i.e. $(D READ_ONLY) or $(D READ_WRITE)). RO locking allows for
1788  *    multiple Tasks holding the mutex, whereas RW locking will cause
1789  *    a "bottleneck" so that only one Task can write to the protected
1790  *    data at once.
1791  */
1792 private struct ReadWriteMutexState(bool INTERRUPTIBLE)
1793 {
1794     /** The policy with which the mutex should operate.
1795      *
1796      *  The policy determines how the acquisition of the locks is
1797      *  performed and can be used to tune the mutex according to the
1798      *  underlying algorithm in which it is used.
1799      *
1800      *  According to the provided policy, the mutex will either favor
1801      *  reading or writing tasks and could potentially starve the
1802      *  respective opposite.
1803      *
1804      *  cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
1805      */
1806     enum Policy : int
1807     {
1808         /** Readers are prioritized, writers may be starved as a result. */
1809         PREFER_READERS = 0,
1810         /** Writers are prioritized, readers may be starved as a result. */
1811         PREFER_WRITERS
1812     }
1813 
1814     /** The intent with which a locking operation is performed.
1815      *
1816      *  Since both locks share the same underlying algorithms, the actual
1817      *  intent with which a lock operation is performed (i.e read/write)
1818      *  are passed as a template parameter to each method.
1819      */
1820     enum LockingIntent : bool
1821     {
1822         /** Perform a read lock/unlock operation. Multiple reading locks can be
1823          *  active at a time. */
1824         READ_ONLY = 0,
1825         /** Perform a write lock/unlock operation. Only a single writer can
1826          *  hold a lock at any given time. */
1827         READ_WRITE = 1
1828     }
1829 
1830     private {
1831         //Queue counters
1832         /** The number of reading tasks waiting for the lock to become available. */
1833         shared(uint)  m_waitingForReadLock = 0;
1834         /** The number of writing tasks waiting for the lock to become available. */
1835         shared(uint)  m_waitingForWriteLock = 0;
1836 
1837         //Lock counters
1838         /** The number of reading tasks that currently hold the lock. */
1839         uint  m_activeReadLocks = 0;
1840         /** The number of writing tasks that currently hold the lock (binary). */
1841         ubyte m_activeWriteLocks = 0;
1842 
1843         /** The policy determining the lock's behavior. */
1844         Policy m_policy;
1845 
1846         //Queue Events
1847         /** The event used to wake reading tasks waiting for the lock while it is blocked. */
1848         shared(ManualEvent) m_readyForReadLock;
1849         /** The event used to wake writing tasks waiting for the lock while it is blocked. */
1850         shared(ManualEvent) m_readyForWriteLock;
1851 
1852         /** The underlying mutex that gates the access to the shared state. */
1853         Mutex m_counterMutex;
1854     }
1855 
1856     this(Policy policy)
1857     {
1858         m_policy = policy;
1859         m_counterMutex = new Mutex();
1860         m_readyForReadLock  = createSharedManualEvent();
1861         m_readyForWriteLock = createSharedManualEvent();
1862     }
1863 
1864     @disable this(this);
1865 
1866     /** The policy with which the lock has been created. */
1867     @property policy() const { return m_policy; }
1868 
1869     version(RWMutexPrint)
1870     {
1871         /** Print out debug information during lock operations. */
1872         void printInfo(string OP, LockingIntent INTENT)() nothrow
1873         {
1874         	import std.string;
1875             try
1876             {
1877                 import std.stdio;
1878                 writefln("RWMutex: %s (%s), active: RO: %d, RW: %d; waiting: RO: %d, RW: %d",
1879                     OP.leftJustify(10,' '),
1880                     INTENT == LockingIntent.READ_ONLY ? "RO" : "RW",
1881                     m_activeReadLocks,    m_activeWriteLocks,
1882                     m_waitingForReadLock, m_waitingForWriteLock
1883                     );
1884             }
1885             catch (Throwable t){}
1886         }
1887     }
1888 
1889     /** An internal shortcut method to determine the queue event for a given intent. */
1890     @property ref auto queueEvent(LockingIntent INTENT)()
1891     {
1892         static if (INTENT == LockingIntent.READ_ONLY)
1893             return m_readyForReadLock;
1894         else
1895             return m_readyForWriteLock;
1896     }
1897 
1898     /** An internal shortcut method to determine the queue counter for a given intent. */
1899     @property ref auto queueCounter(LockingIntent INTENT)()
1900     {
1901         static if (INTENT == LockingIntent.READ_ONLY)
1902             return m_waitingForReadLock;
1903         else
1904             return m_waitingForWriteLock;
1905     }
1906 
1907     /** An internal shortcut method to determine the current emitCount of the queue counter for a given intent. */
1908     int emitCount(LockingIntent INTENT)()
1909     {
1910         return queueEvent!INTENT.emitCount();
1911     }
1912 
1913     /** An internal shortcut method to determine the active counter for a given intent. */
1914     @property ref auto activeCounter(LockingIntent INTENT)()
1915     {
1916         static if (INTENT == LockingIntent.READ_ONLY)
1917             return m_activeReadLocks;
1918         else
1919             return m_activeWriteLocks;
1920     }
1921 
1922     /** An internal shortcut method to wait for the queue event for a given intent.
1923      *
1924      *  This method is used during the `lock()` operation, after a
1925      *  `tryLock()` operation has been unsuccessfully finished.
1926      *  The active fiber will yield and be suspended until the queue event
1927      *  for the given intent will be fired.
1928      */
1929     int wait(LockingIntent INTENT)(int count)
1930     {
1931         static if (INTERRUPTIBLE)
1932             return queueEvent!INTENT.wait(count);
1933         else
1934             return queueEvent!INTENT.waitUninterruptible(count);
1935     }
1936 
1937     /** An internal shortcut method to notify tasks waiting for the lock to become available again.
1938      *
1939      *  This method is called whenever the number of owners of the mutex hits
1940      *  zero; this is basically the counterpart to `wait()`.
1941      *  It wakes any Task currently waiting for the mutex to be released.
1942      */
1943     @trusted void notify(LockingIntent INTENT)()
1944     {
1945         static if (INTENT == LockingIntent.READ_ONLY)
1946         { //If the last reader unlocks the mutex, notify all waiting writers
1947             if (atomicLoad(m_waitingForWriteLock) > 0)
1948                 m_readyForWriteLock.emit();
1949         }
1950         else
1951         { //If a writer unlocks the mutex, notify both readers and writers
1952             if (atomicLoad(m_waitingForReadLock) > 0)
1953                 m_readyForReadLock.emit();
1954 
1955             if (atomicLoad(m_waitingForWriteLock) > 0)
1956                 m_readyForWriteLock.emit();
1957         }
1958     }
1959 
1960     /** An internal method that performs the acquisition attempt in different variations.
1961      *
1962      *  Since both locks rely on a common TaskMutex object which gates the access
1963      *  to their common data acquisition attempts for this lock are more complex
1964      *  than for simple mutex variants. This method will thus be performing the
1965      *  `tryLock()` operation in two variations, depending on the callee:
1966      *
1967      *  If called from the outside ($(D WAIT_FOR_BLOCKING_MUTEX) = false), the method
1968      *  will instantly fail if the underlying mutex is locked (i.e. during another
1969      *  `tryLock()` or `unlock()` operation), in order to guarantee the fastest
1970      *  possible locking attempt.
1971      *
1972      *  If used internally by the `lock()` method ($(D WAIT_FOR_BLOCKING_MUTEX) = true),
1973      *  the operation will wait for the mutex to be available before deciding if
1974      *  the lock can be acquired, since the attempt would anyway be repeated until
1975      *  it succeeds. This will prevent frequent retries under heavy loads and thus
1976      *  should ensure better performance.
1977      */
1978     @trusted bool tryLock(LockingIntent INTENT, bool WAIT_FOR_BLOCKING_MUTEX)()
1979     {
1980         //Log a debug statement for the attempt
1981         version(RWMutexPrint)
1982             printInfo!("tryLock",INTENT)();
1983 
1984         //Try to acquire the lock
1985         static if (!WAIT_FOR_BLOCKING_MUTEX)
1986         {
1987             if (!m_counterMutex.tryLock())
1988                 return false;
1989         }
1990         else
1991             m_counterMutex.lock();
1992 
1993         scope(exit)
1994             m_counterMutex.unlock();
1995 
1996         //Log a debug statement for the attempt
1997         version(RWMutexPrint)
1998             printInfo!("checkCtrs",INTENT)();
1999 
2000         //Check if there's already an active writer
2001         if (m_activeWriteLocks > 0)
2002             return false;
2003 
2004         //If writers are preferred over readers, check whether there
2005         //currently is a writer in the waiting queue and abort if
2006         //that's the case.
2007         static if (INTENT == LockingIntent.READ_ONLY)
2008             if (m_policy.PREFER_WRITERS && m_waitingForWriteLock > 0)
2009                 return false;
2010 
2011         //If we are locking the mutex for writing, make sure that
2012         //there's no reader active.
2013         static if (INTENT == LockingIntent.READ_WRITE)
2014             if (m_activeReadLocks > 0)
2015                 return false;
2016 
2017         //We can successfully acquire the lock!
2018         //Log a debug statement for the success.
2019         version(RWMutexPrint)
2020             printInfo!("lock",INTENT)();
2021 
2022         //Increase the according counter
2023         //(number of active readers/writers)
2024         //and return a success code.
2025         activeCounter!INTENT += 1;
2026         return true;
2027     }
2028 
2029     /** Attempt to acquire the lock for a given intent.
2030      *
2031      *  Returns:
2032      *      `true`, if the lock was successfully acquired;
2033      *      `false` otherwise.
2034      */
2035     @trusted bool tryLock(LockingIntent INTENT)()
2036     {
2037         //Try to lock this mutex without waiting for the underlying
2038         //TaskMutex - fail if it is already blocked.
2039         return tryLock!(INTENT,false)();
2040     }
2041 
2042     /** Acquire the lock for the given intent; yield and suspend until the lock has been acquired. */
2043     @trusted void lock(LockingIntent INTENT)()
2044     {
2045         //Prepare a waiting action before the first
2046         //`tryLock()` call in order to avoid a race
2047         //condition that could lead to the queue notification
2048         //not being fired.
2049         auto count = emitCount!INTENT;
2050         atomicOp!"+="(queueCounter!INTENT,1);
2051         scope(exit)
2052             atomicOp!"-="(queueCounter!INTENT,1);
2053 
2054         //Try to lock the mutex
2055         auto locked = tryLock!(INTENT,true)();
2056         if (locked)
2057             return;
2058 
2059         //Retry until we successfully acquired the lock
2060         while(!locked)
2061         {
2062             version(RWMutexPrint)
2063                 printInfo!("wait",INTENT)();
2064 
2065             count  = wait!INTENT(count);
2066             locked = tryLock!(INTENT,true)();
2067         }
2068     }
2069 
2070     /** Unlock the mutex after a successful acquisition. */
2071     @trusted void unlock(LockingIntent INTENT)()
2072     {
2073         version(RWMutexPrint)
2074             printInfo!("unlock",INTENT)();
2075 
2076         debug assert(activeCounter!INTENT > 0);
2077 
2078         synchronized(m_counterMutex)
2079         {
2080             //Decrement the counter of active lock holders.
2081             //If the counter hits zero, notify waiting Tasks
2082             activeCounter!INTENT -= 1;
2083             if (activeCounter!INTENT == 0)
2084             {
2085                 version(RWMutexPrint)
2086                     printInfo!("notify",INTENT)();
2087 
2088                 notify!INTENT();
2089             }
2090         }
2091     }
2092 }
2093 
2094 /** A ReadWriteMutex implementation for fibers.
2095  *
2096  *  This mutex can be used in exchange for a $(D core.sync.mutex.ReadWriteMutex),
2097  *  but does not block the event loop in contention situations. The `reader` and `writer`
2098  *  members are used for locking. Locking the `reader` mutex allows access to multiple
2099  *  readers at once, while the `writer` mutex only allows a single writer to lock it at
2100  *  any given time. Locks on `reader` and `writer` are mutually exclusive (i.e. whenever a
2101  *  writer is active, no readers can be active at the same time, and vice versa).
2102  *
2103  *  Notice:
2104  *      Mutexes implemented by this class cannot be interrupted
2105  *      using $(D vibe.core.task.Task.interrupt()). The corresponding
2106  *      InterruptException will be deferred until the next blocking
2107  *      operation yields the event loop.
2108  *
2109  *      Use $(D InterruptibleTaskReadWriteMutex) as an alternative that can be
2110  *      interrupted.
2111  *
2112  *  cf. $(D core.sync.mutex.ReadWriteMutex)
2113  */
2114 final class TaskReadWriteMutex
2115 {
2116     private {
2117         alias State = ReadWriteMutexState!false;
2118         alias LockingIntent = State.LockingIntent;
2119         alias READ_ONLY  = LockingIntent.READ_ONLY;
2120         alias READ_WRITE = LockingIntent.READ_WRITE;
2121 
2122         /** The shared state used by the reader and writer mutexes. */
2123         State m_state;
2124     }
2125 
2126     /** The policy with which the mutex should operate.
2127      *
2128      *  The policy determines how the acquisition of the locks is
2129      *  performed and can be used to tune the mutex according to the
2130      *  underlying algorithm in which it is used.
2131      *
2132      *  According to the provided policy, the mutex will either favor
2133      *  reading or writing tasks and could potentially starve the
2134      *  respective opposite.
2135      *
2136      *  cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
2137      */
2138     alias Policy = State.Policy;
2139 
2140     /** A common baseclass for both of the provided mutexes.
2141      *
2142      *  The intent for the according mutex is specified through the
2143      *  $(D INTENT) template argument, which determines if a mutex is
2144      *  used for read or write locking.
2145      */
2146     final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
2147     {
2148         /** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */
2149         override bool tryLock() { return m_state.tryLock!INTENT(); }
2150         /** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */
2151         override void lock()    { m_state.lock!INTENT(); }
2152         /** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */
2153         override void unlock()  { m_state.unlock!INTENT(); }
2154     }
2155     alias Reader = Mutex!READ_ONLY;
2156     alias Writer = Mutex!READ_WRITE;
2157 
2158     Reader reader;
2159     Writer writer;
2160 
2161     this(Policy policy = Policy.PREFER_WRITERS)
2162     {
2163         m_state = State(policy);
2164         reader = new Reader();
2165         writer = new Writer();
2166     }
2167 
2168     /** The policy with which the lock has been created. */
2169     @property Policy policy() const { return m_state.policy; }
2170 }
2171 
2172 /** Alternative to $(D TaskReadWriteMutex) that supports interruption.
2173  *
2174  *  This class supports the use of $(D vibe.core.task.Task.interrupt()) while
2175  *  waiting in the `lock()` method.
2176  *
2177  *  cf. $(D core.sync.mutex.ReadWriteMutex)
2178  */
2179 final class InterruptibleTaskReadWriteMutex
2180 {
2181 	@safe:
2182 
2183     private {
2184         alias State = ReadWriteMutexState!true;
2185         alias LockingIntent = State.LockingIntent;
2186         alias READ_ONLY  = LockingIntent.READ_ONLY;
2187         alias READ_WRITE = LockingIntent.READ_WRITE;
2188 
2189         /** The shared state used by the reader and writer mutexes. */
2190         State m_state;
2191     }
2192 
2193     /** The policy with which the mutex should operate.
2194      *
2195      *  The policy determines how the acquisition of the locks is
2196      *  performed and can be used to tune the mutex according to the
2197      *  underlying algorithm in which it is used.
2198      *
2199      *  According to the provided policy, the mutex will either favor
2200      *  reading or writing tasks and could potentially starve the
2201      *  respective opposite.
2202      *
2203      *  cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
2204      */
2205     alias Policy = State.Policy;
2206 
2207     /** A common baseclass for both of the provided mutexes.
2208      *
2209      *  The intent for the according mutex is specified through the
2210      *  $(D INTENT) template argument, which determines if a mutex is
2211      *  used for read or write locking.
2212      *
2213      */
2214     final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
2215     {
2216         /** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */
2217         override bool tryLock() { return m_state.tryLock!INTENT(); }
2218         /** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */
2219         override void lock()    { m_state.lock!INTENT(); }
2220         /** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */
2221         override void unlock()  { m_state.unlock!INTENT(); }
2222     }
2223     alias Reader = Mutex!READ_ONLY;
2224     alias Writer = Mutex!READ_WRITE;
2225 
2226     Reader reader;
2227     Writer writer;
2228 
2229     this(Policy policy = Policy.PREFER_WRITERS)
2230     {
2231         m_state = State(policy);
2232         reader = new Reader();
2233         writer = new Writer();
2234     }
2235 
2236     /** The policy with which the lock has been created. */
2237     @property Policy policy() const { return m_state.policy; }
2238 }