1 /**
2 Event loop compatible task synchronization facilities.
3
4 This module provides replacement primitives for the modules in `core.sync`
5 that do not block vibe.d's event loop in their wait states. These should
6 always be preferred over the ones in Druntime under usual circumstances.
7
8 Using a standard `Mutex` is possible as long as it is ensured that no event
9 loop based functionality (I/O, task interaction or anything that implicitly
10 calls `vibe.core.core.yield`) is executed within a section of code that is
11 protected by the mutex. $(B Failure to do so may result in dead-locks and
12 high-level race-conditions!)
13
14 Copyright: © 2012-2019 Sönke Ludwig
15 Authors: Leonid Kramer, Sönke Ludwig, Manuel Frischknecht
16 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
17 */
18 module vibe.core.sync;
19
20 import vibe.core.log : logDebugV, logTrace, logInfo;
21 import vibe.core.task;
22
23 import core.atomic;
24 import core.sync.mutex;
25 import core.sync.condition;
26 import eventcore.core;
27 import std.exception;
28 import std.stdio;
29 import std.traits : ReturnType;
30
31
/** Creates a new signal that can be shared between fibers.

	The returned event is initialized and ready to be waited on or emitted.
*/
LocalManualEvent createManualEvent()
@safe nothrow {
	LocalManualEvent event;
	event.initialize();
	return event;
}
/// ditto
shared(ManualEvent) createSharedManualEvent()
@trusted nothrow {
	shared(ManualEvent) event;
	event.initialize();
	return event;
}
47
48
/** Performs RAII based locking/unlocking of a mutex.

	Note that while `TaskMutex` can be used with D's built-in `synchronized`
	statement, `InterruptibleTaskMutex` cannot. This function provides a
	library based alternative that is suitable for use with all mutex types.

	Params:
		mutex = The mutex to guard; must derive from `Mutex` or implement `Lockable`
		mode = Determines what happens to the mutex on construction of the guard
*/
ScopedMutexLock!M scopedMutexLock(M)(M mutex, LockMode mode = LockMode.lock)
	if (is(M : Mutex) || is(M : Lockable))
{
	return typeof(return)(mutex, mode);
}
60
///
unittest {
	import vibe.core.core : runWorkerTaskH;

	// __gshared: a single counter/mutex instance is used by all started tasks
	__gshared int counter;
	__gshared TaskMutex mutex;

	mutex = new TaskMutex;

	Task[] tasks;

	// every task increments the counter while holding the scoped lock
	foreach (i; 0 .. 100) {
		tasks ~= runWorkerTaskH(() nothrow {
			auto l = scopedMutexLock(mutex);
			counter++;
		});
	}

	// wait for all tasks to finish before checking the result
	foreach (t; tasks) t.join();

	assert(counter == 100);
}
83
// Instantiation check: scopedMutexLock must compile for each of these mutex types.
unittest {
	scopedMutexLock(new Mutex);
	scopedMutexLock(new TaskMutex);
	scopedMutexLock(new InterruptibleTaskMutex);
}
89
/** Selects what `ScopedMutexLock` does with the mutex in its constructor.
*/
enum LockMode {
	/// The constructor calls `lock()`
	lock,
	/// The constructor calls `tryLock()`; the result is available through `locked`
	tryLock,
	/// The constructor leaves the mutex untouched
	defer
}
95
/** Minimal mutex interface accepted by `scopedMutexLock`/`ScopedMutexLock`
	in addition to classes derived from `core.sync.mutex.Mutex`.
*/
interface Lockable {
	@safe:
	/// Acquires the lock (not `nothrow`, implementations may throw)
	void lock();
	/// Releases the lock
	void unlock();
	/// Attempts to acquire the lock and reports the outcome as a `bool`
	bool tryLock();
}
102
/** RAII lock for the Mutex class.

	Depending on the `LockMode` passed to the constructor, the mutex is
	locked, tentatively locked, or left untouched. If the lock is held when
	the instance is destroyed, the destructor releases it.

	The struct is not copyable, so that the lock has exactly one owner.
*/
struct ScopedMutexLock(M)
	if (is(M : Mutex) || is(M : Lockable))
{
	@disable this(this);
	private {
		M m_mutex;
		bool m_locked;
		LockMode m_mode;
	}

	/** Binds the guard to `mutex` and applies `mode`.

		Params:
			mutex = The mutex to guard, must not be `null`
			mode = `lock` to acquire, `tryLock` to attempt, `defer` to do nothing
	*/
	this(M mutex, LockMode mode = LockMode.lock)
	{
		assert(mutex !is null);
		m_mutex = mutex;
		m_mode = mode;

		final switch (mode) {
			case LockMode.lock: lock(); break;
			case LockMode.tryLock: tryLock(); break;
			case LockMode.defer: break;
		}
	}

	~this()
	{
		if (m_locked) unlock();
	}

	/// Determines whether this guard currently holds the lock.
	@property bool locked() const { return m_locked; }

	/// Releases the lock. The guard must currently hold it.
	void unlock()
	in { assert(this.locked); }
	do {
		assert(m_locked, "Unlocking unlocked scoped mutex lock");
		static if (is(typeof(m_mutex.unlock_nothrow())))
			m_mutex.unlock_nothrow();
		else
			m_mutex.unlock();
		m_locked = false;
	}

	/** Attempts to acquire the lock. The guard must not already hold it.

		Returns: `true` if the lock was acquired.
	*/
	bool tryLock()
	in { assert(!this.locked); }
	do {
		static if (is(typeof(m_mutex.tryLock_nothrow())))
			return m_locked = m_mutex.tryLock_nothrow();
		else
			return m_locked = m_mutex.tryLock();
	}

	/** Acquires the lock. The guard must not already hold it.

		If the underlying `lock()` throws, the guard stays in the unlocked
		state, so the destructor will not release a mutex that was never
		acquired.
	*/
	void lock()
	in { assert(!this.locked); }
	do {
		static if (is(typeof(m_mutex.lock_nothrow())))
			m_mutex.lock_nothrow();
		else
			m_mutex.lock();
		// mark as held only after the acquisition has actually succeeded
		m_locked = true;
	}
}
164
165
166 /*
167 Only for internal use:
168 Ensures that a mutex is locked while executing the given procedure.
169
170 This function works for all kinds of mutexes, in particular for
171 $(D core.sync.mutex.Mutex), $(D TaskMutex) and $(D InterruptibleTaskMutex).
172
173 Returns:
174 Returns the value returned from $(D PROC), if any.
175 */
/// private
ReturnType!PROC performLocked(alias PROC, MUTEX)(MUTEX mutex)
{
	// the guard releases the mutex when it goes out of scope
	auto guard = scopedMutexLock(mutex);
	return PROC();
}
182
///
unittest {
	int protected_var = 0;
	auto mtx = new TaskMutex;
	// the delegate literal is executed while `mtx` is locked
	mtx.performLocked!({
		protected_var++;
	});
}
191
192
/**
	Thread-local semaphore implementation for tasks.

	When the semaphore runs out of concurrent locks, it will suspend. This class
	is used in `vibe.core.connectionpool` to limit the number of concurrent
	connections.

	Waiting tasks are queued by priority (higher values first) and, within the
	same priority, in the order in which they started waiting.
*/
final class LocalTaskSemaphore
{
	@safe:

	// requires a queue
	import std.container.binaryheap;
	import std.container.array;
	//import vibe.utils.memory;

	private {
		static struct ThreadWaiter {
			ubyte priority;
			uint seq; // unique among the currently queued waiters
		}

		BinaryHeap!(Array!ThreadWaiter, asc) m_waiters;
		uint m_maxLocks;
		uint m_locks;
		uint m_seq;
		LocalManualEvent m_signal;
	}

	this(uint max_locks) nothrow
	{
		m_maxLocks = max_locks;
		m_signal = createManualEvent();
	}

	/// Maximum number of concurrent locks
	@property void maxLocks(uint max_locks)
	{
		m_maxLocks = max_locks;
		// raising the limit may free capacity for tasks that are already queued
		if (available > 0 && m_waiters.length > 0)
			m_signal.emit();
	}
	/// ditto
	@property uint maxLocks() const nothrow { return m_maxLocks; }

	/** Number of concurrent locks still available

		Yields zero (instead of wrapping around) if `maxLocks` has been
		lowered below the number of locks that are currently held.
	*/
	@property uint available() const { return m_locks < m_maxLocks ? m_maxLocks - m_locks : 0; }

	/** Try to acquire a lock.

		If a lock cannot be acquired immediately, returns `false` and leaves the
		semaphore in its previous state.

		Returns:
			`true` is returned $(I iff) the number of available locks is greater
			than zero.
	*/
	bool tryLock()
	{
		if (available > 0)
		{
			m_locks++;
			return true;
		}
		return false;
	}

	/** Acquires a lock.

		Once the limit of concurrent locks is reached, this method will block
		until the number of locks drops below the limit.

		Params:
			priority = Waiters with a higher priority are served first
	*/
	void lock(ubyte priority = 0)
	{
		if (tryLock())
			return;

		ThreadWaiter w;
		w.priority = priority;
		// wraps around on overflow; `asc` compares sequence numbers wrap-safe
		w.seq = m_seq++;

		() @trusted { m_waiters.insert(w); } ();

		while (true) {
			m_signal.waitUninterruptible();
			if (m_waiters.front.seq == w.seq && tryLock()) {
				// leave the queue, so that the next waiter becomes the front
				() @trusted { m_waiters.removeFront(); } ();
				// Other waiters may have been woken before us and gone back
				// to sleep - wake them again if there is still capacity left.
				if (available > 0 && m_waiters.length > 0)
					m_signal.emit();
				return;
			}
		}
	}

	/** Gives up an existing lock.
	*/
	void unlock()
	{
		assert(m_locks >= 1);
		m_locks--;
		if (m_waiters.length > 0)
			m_signal.emit(); // wakes the waiters; only the front one acquires
	}

	// if true, a goes after b. ie. b comes out front()
	/// private
	static bool asc(ref ThreadWaiter a, ref ThreadWaiter b)
	{
		if (a.priority != b.priority)
			return a.priority < b.priority;
		// Wrap-around safe "a.seq > b.seq": valid as long as the sequence
		// numbers of all queued waiters lie within a window of 2^31.
		return cast(int)(a.seq - b.seq) > 0;
	}
}
314
315
/**
	Mutex implementation for fibers.

	This mutex type can be used in exchange for a core.sync.mutex.Mutex, but
	does not block the event loop when contention happens. Note that this
	mutex does not allow recursive locking.

	Notice:
		Because this class is annotated nothrow, it cannot be interrupted
		using $(D vibe.core.task.Task.interrupt()). The corresponding
		$(D InterruptException) will be deferred until the next blocking
		operation yields the event loop.

		Use $(D InterruptibleTaskMutex) as an alternative that can be
		interrupted.

	See_Also: InterruptibleTaskMutex, RecursiveTaskMutex, core.sync.mutex.Mutex
*/
final class TaskMutex : core.sync.mutex.Mutex, Lockable {
	@safe:
	// all locking operations are forwarded to this implementation struct
	private TaskMutexImpl!false m_impl;

	/// Sets up the implementation and passes `o` on to the `Mutex` base constructor.
	this(Object o) nothrow { m_impl.setup(); super(o); }
	/// Sets up the implementation.
	this() nothrow { m_impl.setup(); }

	/// Forwards to the implementation's `tryLock`.
	override bool tryLock() nothrow { return m_impl.tryLock(); }
	/// Forwards to the implementation's `lock`.
	override void lock() nothrow { m_impl.lock(); }
	/// Forwards to the implementation's `unlock`.
	override void unlock() nothrow { m_impl.unlock(); }
	/// Forwards to the implementation's timed `lock` and passes its result
	/// through (presumably `true` if the lock was acquired - TODO confirm
	/// against `TaskMutexImpl`).
	bool lock(Duration timeout) nothrow { return m_impl.lock(timeout); }
}
346
// Exercises TaskMutex with ScopedMutexLock, `synchronized`, performLocked and `with`.
unittest {
	auto mutex = new TaskMutex;

	{
		auto lock = scopedMutexLock(mutex);
		assert(lock.locked);
		assert(mutex.m_impl.m_locked);

		// a second tryLock on the already locked mutex must fail
		auto lock2 = scopedMutexLock(mutex, LockMode.tryLock);
		assert(!lock2.locked);
	}
	// leaving the scope must have released the mutex
	assert(!mutex.m_impl.m_locked);

	auto lock = scopedMutexLock(mutex, LockMode.tryLock);
	assert(lock.locked);
	lock.unlock();
	assert(!lock.locked);

	synchronized(mutex){
		assert(mutex.m_impl.m_locked);
	}
	assert(!mutex.m_impl.m_locked);

	mutex.performLocked!({
		assert(mutex.m_impl.m_locked);
	});
	assert(!mutex.m_impl.m_locked);

	static if (__VERSION__ >= 2067) {
		with(mutex.scopedMutexLock) {
			assert(mutex.m_impl.m_locked);
		}
	}
}
381
unittest { // test deferred throwing
	import vibe.core.core;

	auto mutex = new TaskMutex;
	// first task: takes the mutex and holds it while sleeping
	auto t1 = runTask({
		scope (failure) assert(false, "No exception expected in first task!");
		mutex.lock();
		scope (exit) mutex.unlock();
		sleep(20.msecs);
	});

	// second task: gets interrupted while waiting in lock(); the exception
	// is expected to surface only at the following yield()
	auto t2 = runTask({
		mutex.lock();
		scope (exit) mutex.unlock();
		try {
			yield();
			assert(false, "Yield is supposed to have thrown an InterruptException.");
		} catch (InterruptException) {
			// as expected!
		} catch (Exception) {
			assert(false, "Only InterruptException supposed to be thrown!");
		}
	});

	runTask({
		// mutex is now locked in first task for 20 ms
		// the second tasks is waiting in lock()
		t2.interrupt();
		t1.joinUninterruptible();
		t2.joinUninterruptible();
		assert(!mutex.m_impl.m_locked); // ensure that the scope(exit) has been executed
		exitEventLoop();
	});

	runEventLoop();
}
418
// Runs the shared contention/interruption test suite for TaskMutex.
unittest {
	runMutexUnitTests!TaskMutex();
}
422
423
/**
	Alternative to $(D TaskMutex) that supports interruption.

	This class supports the use of $(D vibe.core.task.Task.interrupt()) while
	waiting in the $(D lock()) method. However, because the interface is not
	$(D nothrow), it cannot be used as an object monitor.

	See_Also: $(D TaskMutex), $(D InterruptibleRecursiveTaskMutex)
*/
final class InterruptibleTaskMutex : Lockable {
	@safe:

	// all locking operations are forwarded to this implementation struct
	private TaskMutexImpl!true m_impl;

	/// Sets up the implementation and installs `NoUseMonitor` as the object monitor.
	this()
	{
		m_impl.setup();

		// detects invalid usage within synchronized(...)
		() @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } ();
	}

	/// Forwards to the implementation's `tryLock`.
	bool tryLock() nothrow { return m_impl.tryLock(); }
	/// Forwards to the implementation's `lock`; not `nothrow`.
	void lock() { m_impl.lock(); }
	/// Forwards to the implementation's `unlock`.
	void unlock() nothrow { m_impl.unlock(); }
}
450
// Runs the shared contention/interruption test suite for InterruptibleTaskMutex.
unittest {
	runMutexUnitTests!InterruptibleTaskMutex();
}
454
455
456
/**
	Recursive mutex implementation for tasks.

	This mutex type can be used in exchange for a `core.sync.mutex.Mutex`, but
	does not block the event loop when contention happens.

	Notice:
		Because this class is annotated `nothrow`, it cannot be interrupted
		using $(D vibe.core.task.Task.interrupt()). The corresponding
		$(D InterruptException) will be deferred until the next blocking
		operation yields the event loop.

		Use $(D InterruptibleRecursiveTaskMutex) as an alternative that can be
		interrupted.

	See_Also: `TaskMutex`, `core.sync.mutex.Mutex`
*/
final class RecursiveTaskMutex : core.sync.mutex.Mutex, Lockable {
	@safe:

	// all locking operations are forwarded to this implementation struct
	private RecursiveTaskMutexImpl!false m_impl;

	/// Sets up the implementation and passes `o` on to the `Mutex` base constructor.
	this(Object o) { m_impl.setup(); super(o); }
	/// Sets up the implementation.
	this() { m_impl.setup(); }

	/// Forwards to the implementation's `tryLock`.
	override bool tryLock() nothrow { return m_impl.tryLock(); }
	// NOTE(review): unlike `TaskMutex.lock`, this override and the
	// constructors are not marked `nothrow` - confirm whether that is intended.
	/// Forwards to the implementation's `lock`.
	override void lock() { m_impl.lock(); }
	/// Forwards to the implementation's `unlock`.
	override void unlock() nothrow { m_impl.unlock(); }
}
486
// Runs the shared contention/interruption test suite for RecursiveTaskMutex.
unittest {
	runMutexUnitTests!RecursiveTaskMutex();
}
490
491
/**
	Alternative to $(D RecursiveTaskMutex) that supports interruption.

	This class supports the use of $(D vibe.core.task.Task.interrupt()) while
	waiting in the $(D lock()) method. However, because the interface is not
	$(D nothrow), it cannot be used as an object monitor.

	See_Also: $(D RecursiveTaskMutex), $(D InterruptibleTaskMutex)
*/
final class InterruptibleRecursiveTaskMutex : Lockable {
	@safe:
	// all locking operations are forwarded to this implementation struct
	private RecursiveTaskMutexImpl!true m_impl;

	/// Sets up the implementation and installs `NoUseMonitor` as the object monitor.
	this()
	{
		m_impl.setup();

		// detects invalid usage within synchronized(...)
		() @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } ();
	}

	/// Forwards to the implementation's `tryLock`.
	bool tryLock() nothrow { return m_impl.tryLock(); }
	/// Forwards to the implementation's `lock`; not `nothrow`.
	void lock() { m_impl.lock(); }
	/// Forwards to the implementation's `unlock`.
	void unlock() nothrow { m_impl.unlock(); }
}
517
// Runs the shared contention/interruption test suite for InterruptibleRecursiveTaskMutex.
unittest {
	runMutexUnitTests!InterruptibleRecursiveTaskMutex();
}
521
522
// Helper class to ensure that the non Object.Monitor compatible interruptible
// mutex classes are not accidentally used with the `synchronized` statement
private final class NoUseMonitor : Object.Monitor {
	// process-wide proxy, lazily filled in by `instance()`
	private static shared Proxy st_instance;

	// Wrapper whose address is stored in `__monitor` by the interruptible
	// mutex classes (presumably the layout druntime expects for custom
	// monitors - TODO confirm).
	static struct Proxy {
		Object.Monitor monitor;
	}

	/** Returns a reference to the shared proxy, creating the monitor on
		first use.

		A function-level `static` pointer caches the result, so that the
		`synchronized` block is only entered while that pointer is still null.
	*/
	static @property ref shared(Proxy) instance()
	@safe nothrow {
		static shared(Proxy)* inst = null;
		if (inst) return *inst;

		() @trusted { // synchronized {} not @safe for DMD <= 2.078.3
			synchronized {
				// create the monitor only if no earlier call has done so
				if (!st_instance.monitor)
					st_instance.monitor = new shared NoUseMonitor;
				inst = &st_instance;
			}
		} ();

		return *inst;
	}

	// Always fails: reaching this means the mutex was used with synchronized().
	override void lock() @safe @nogc nothrow {
		assert(false, "Interruptible task mutexes cannot be used with synchronized(), use scopedMutexLock instead.");
	}

	// Intentionally empty.
	override void unlock() @safe @nogc nothrow {}
}
554
555
/*
	Shared test suite for the task mutex classes.

	`M` must be default constructible and expose `m_impl.m_locked`, `lock`,
	`tryLock` and `unlock`. Runs a basic lock test, a contention test and two
	interruption tests, each inside its own event loop run.
*/
private void runMutexUnitTests(M)()
{
	import vibe.core.core;

	auto m = new M;
	Task t1, t2;
	// Starts t1/t2 contending for `m`; the flags select which of the two
	// tasks expects to be interrupted.
	void runContendedTasks(bool interrupt_t1, bool interrupt_t2) {
		assert(!m.m_impl.m_locked);

		// t1 starts first and acquires the mutex for 20 ms
		// t2 starts second and has to wait in m.lock()
		t1 = runTask(() nothrow {
			assert(!m.m_impl.m_locked);
			try m.lock();
			catch (Exception e) assert(false, e.msg);
			assert(m.m_impl.m_locked);
			if (interrupt_t1) {
				// the sleep is expected to be cut short by the interrupt
				try assertThrown!InterruptException(sleep(100.msecs));
				catch (Exception e) assert(false, e.msg);
			} else assertNotThrown(sleep(20.msecs));
			m.unlock();
		});
		t2 = runTask(() nothrow {
			assert(!m.tryLock());
			if (interrupt_t2) {
				// lock() either throws directly, or the exception shows up
				// at the following yield()
				try m.lock();
				catch (InterruptException) return;
				catch (Exception e) assert(false, e.msg);
				try yield(); // rethrows any deferred exceptions
				catch (InterruptException) {
					m.unlock();
					return;
				}
				catch (Exception e) assert(false, e.msg);
				assert(false, "Supposed to have thrown an InterruptException.");
			} else assertNotThrown(m.lock());
			assert(m.m_impl.m_locked);
			try sleep(20.msecs);
			catch (Exception e) assert(false, e.msg);
			m.unlock();
			assert(!m.m_impl.m_locked);
		});
	}

	// basic lock test
	m.performLocked!({
		assert(m.m_impl.m_locked);
	});
	assert(!m.m_impl.m_locked);

	// basic contention test
	runContendedTasks(false, false);
	auto t3 = runTask({
		assert(t1.running && t2.running);
		assert(m.m_impl.m_locked);
		t1.joinUninterruptible();
		assert(!t1.running && t2.running);
		try yield(); // give t2 a chance to take the lock
		catch (Exception e) assert(false, e.msg);
		assert(m.m_impl.m_locked);
		t2.joinUninterruptible();
		assert(!t2.running);
		assert(!m.m_impl.m_locked);
		exitEventLoop();
	});
	runEventLoop();
	assert(!t3.running);
	assert(!m.m_impl.m_locked);

	// interruption test #1
	runContendedTasks(true, false);
	t3 = runTask({
		assert(t1.running && t2.running);
		assert(m.m_impl.m_locked);
		t1.interrupt();
		t1.joinUninterruptible();
		assert(!t1.running && t2.running);
		try yield(); // give t2 a chance to take the lock
		catch (Exception e) assert(false, e.msg);
		assert(m.m_impl.m_locked);
		t2.joinUninterruptible();
		assert(!t2.running);
		assert(!m.m_impl.m_locked);
		exitEventLoop();
	});
	runEventLoop();
	assert(!t3.running);
	assert(!m.m_impl.m_locked);

	// interruption test #2
	runContendedTasks(false, true);
	t3 = runTask({
		assert(t1.running && t2.running);
		assert(m.m_impl.m_locked);
		t2.interrupt();
		t2.joinUninterruptible();
		assert(!t2.running);
		// only checked for the interruptible types: t2 has finished while
		// t1 is still running and holding the mutex
		static if (is(M == InterruptibleTaskMutex) || is (M == InterruptibleRecursiveTaskMutex))
			assert(t1.running && m.m_impl.m_locked);
		t1.joinUninterruptible();
		assert(!t1.running);
		assert(!m.m_impl.m_locked);
		exitEventLoop();
	});
	runEventLoop();
	assert(!t3.running);
	assert(!m.m_impl.m_locked);
}
664
665
/**
	Event loop based condition variable or "event" implementation.

	This class can be used in exchange for a $(D core.sync.condition.Condition)
	to avoid blocking the event loop when waiting.

	Notice:
		Because this class is annotated nothrow, it cannot be interrupted
		using $(D vibe.core.task.Task.interrupt()). The corresponding
		$(D InterruptException) will be deferred until the next blocking
		operation yields to the event loop.

		Use $(D InterruptibleTaskCondition) as an alternative that can be
		interrupted.

		Note that it is generally not safe to use a `TaskCondition` together with an
		interruptible mutex type.

	See_Also: `InterruptibleTaskCondition`
*/
final class TaskCondition : core.sync.condition.Condition {
	@safe:

	// all operations are forwarded to this implementation struct
	private TaskConditionImpl!(false, Mutex) m_impl;

	/** Creates a condition bound to `mtx`.

		The dynamic type of `mtx` must be exactly `Mutex` or `TaskMutex`;
		this is checked with an assertion.
	*/
	this(core.sync.mutex.Mutex mtx)
	nothrow {
		assert(mtx.classinfo is Mutex.classinfo || mtx.classinfo is TaskMutex.classinfo,
			"TaskCondition can only be used with Mutex or TaskMutex");

		m_impl.setup(mtx);
		super(mtx);
	}
	/// Returns the mutex as reported by the implementation.
	override @property Mutex mutex() nothrow { return m_impl.mutex; }
	/// Forwards to the implementation's `wait`.
	override void wait() nothrow { m_impl.wait(); }
	/// Forwards to the implementation's timed `wait` and passes its result through.
	override bool wait(Duration timeout) nothrow { return m_impl.wait(timeout); }
	/// Forwards to the implementation's `notify`.
	override void notify() nothrow { m_impl.notify(); }
	/// Forwards to the implementation's `notifyAll`.
	override void notifyAll() nothrow { m_impl.notifyAll(); }
}
705
// Construction check for both mutex types accepted by the constructor's assertion.
unittest {
	new TaskCondition(new Mutex);
	new TaskCondition(new TaskMutex);
}
710
711
/** This example shows the typical usage pattern using a `while` loop to make
	sure that the final condition is reached.
*/
unittest {
	import vibe.core.core;
	import vibe.core.log;

	// __gshared: the same instances are used by the main task and all workers
	__gshared Mutex mutex;
	__gshared TaskCondition condition;
	__gshared int workers_still_running = 0;

	// setup the task condition
	mutex = new Mutex;
	condition = new TaskCondition(mutex);

	logDebug("SETTING UP TASKS");

	// start up the workers and count how many are running
	foreach (i; 0 .. 4) {
		workers_still_running++;
		runWorkerTask(() nothrow {
			// simulate some work
			try sleep(100.msecs);
			catch (Exception e) {}

			// notify the waiter that we're finished
			{
				// the counter is only modified while the mutex is held
				auto l = scopedMutexLock(mutex);
				workers_still_running--;
				logDebug("DECREMENT %s", workers_still_running);
			}
			logDebug("NOTIFY");
			condition.notify();
		});
	}

	logDebug("STARTING WAIT LOOP");

	// wait until all tasks have decremented the counter back to zero
	synchronized (mutex) {
		// re-check the counter after every wake-up
		while (workers_still_running > 0) {
			logDebug("STILL running %s", workers_still_running);
			condition.wait();
		}
	}
}
758
759
/**
	Alternative to `TaskCondition` that supports interruption.

	This class supports the use of `vibe.core.task.Task.interrupt()` while
	waiting in the `wait()` method.

	See `TaskCondition` for an example.

	Notice:
		Note that it is generally not safe to use an
		`InterruptibleTaskCondition` together with an interruptible mutex type.

	See_Also: `TaskCondition`
*/
final class InterruptibleTaskCondition {
	@safe:

	// all operations are forwarded to this implementation struct
	private TaskConditionImpl!(true, Lockable) m_impl;

	/** Creates a condition bound to `mutex`.

		Types implementing `Lockable` are passed to the implementation's
		`setup`, all other `Mutex` types to `setupForMutex`.
	*/
	this(M)(M mutex)
		if (is(M : Mutex) || is (M : Lockable))
	{
		static if (is(M : Lockable))
			m_impl.setup(mutex);
		else
			m_impl.setupForMutex(mutex);
	}

	/// Returns the mutex as reported by the implementation.
	@property Lockable mutex() { return m_impl.mutex; }
	/// Forwards to the implementation's `wait`; not `nothrow`.
	void wait() { m_impl.wait(); }
	/// Forwards to the implementation's timed `wait` and passes its result through.
	bool wait(Duration timeout) { return m_impl.wait(timeout); }
	/// Forwards to the implementation's `notify`.
	void notify() nothrow { m_impl.notify(); }
	/// Forwards to the implementation's `notifyAll`.
	void notifyAll() nothrow { m_impl.notifyAll(); }
}
794
// Construction check covering both branches of the templated constructor.
unittest {
	new InterruptibleTaskCondition(new Mutex);
	new InterruptibleTaskCondition(new TaskMutex);
	new InterruptibleTaskCondition(new InterruptibleTaskMutex);
}
800
801
/** A manually triggered single threaded cross-task event.

	Note: the ownership can be shared between multiple fibers of the same thread.
*/
struct LocalManualEvent {
	import core.thread : Thread;
	import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;

	@safe:

	private {
		alias Waiter = ThreadLocalWaiter!false;

		// reference counted waiter state, shared by all copies of this event
		Waiter m_waiter;
	}

	// thread destructor in vibe.core.core will decrement the ref. count
	package static EventID ms_threadEvent;

	private void initialize()
	nothrow {
		import vibe.internal.allocator : Mallocator, makeGCSafe;
		m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } ();
	}

	// copies share the waiter, so each copy takes its own reference
	this(this)
	nothrow {
		if (m_waiter)
			return m_waiter.addRef();
	}

	// drops this copy's reference and frees the waiter with the last one
	~this()
	nothrow {
		import vibe.internal.allocator : Mallocator, disposeGCSafe;
		if (m_waiter) {
			if (!m_waiter.releaseRef()) {
				static if (__VERSION__ < 2087) scope (failure) assert(false);
				() @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } ();
			}
		}
	}

	/// Determines whether the event has been initialized.
	bool opCast (T : bool) () const nothrow { return m_waiter !is null; }

	/// A counter that is increased with every emit() call
	int emitCount()
	const nothrow {
		assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
		return m_waiter.m_emitCount;
	}

	/** Emits the signal, waking up all owners of the signal.

		Returns: The emit count as it was before this call.
	*/
	int emit()
	nothrow {
		assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
		logTrace("unshared emit");
		auto ec = m_waiter.m_emitCount++;
		m_waiter.emit();
		return ec;
	}

	/** Emits the signal, waking up a single owner of the signal.

		Returns: The emit count as it was before this call.
	*/
	int emitSingle()
	nothrow {
		assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");
		logTrace("unshared single emit");
		auto ec = m_waiter.m_emitCount++;
		m_waiter.emitSingle();
		return ec;
	}

	/** Acquires ownership and waits until the signal is emitted.

		Note that in order not to miss any emits it is necessary to use the
		overload taking an integer.

		Throws:
			May throw an $(D InterruptException) if the task gets interrupted
			using $(D Task.interrupt()).
	*/
	int wait() { return wait(this.emitCount); }

	/** Acquires ownership and waits until the signal is emitted and the emit
		count is larger than a given one.

		Throws:
			May throw an $(D InterruptException) if the task gets interrupted
			using $(D Task.interrupt()).
	*/
	int wait(int emit_count) { return doWait!true(Duration.max, emit_count); }
	/// ditto
	int wait(Duration timeout, int emit_count) { return doWait!true(timeout, emit_count); }

	/** Same as $(D wait), but defers throwing any $(D InterruptException).

		This method is annotated $(D nothrow) at the expense that it cannot be
		interrupted.
	*/
	int waitUninterruptible() nothrow { return waitUninterruptible(this.emitCount); }
	/// ditto
	int waitUninterruptible(int emit_count) nothrow { return doWait!false(Duration.max, emit_count); }
	/// ditto
	int waitUninterruptible(Duration timeout, int emit_count) nothrow { return doWait!false(timeout, emit_count); }

	/*
		Waits until the emit count exceeds `emit_count` or, for a finite
		`timeout`, until the timeout has elapsed. Returns the current emit
		count.
	*/
	private int doWait(bool interruptible)(Duration timeout, int emit_count)
	{
		import core.time : MonoTime;

		assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()");

		MonoTime target_timeout, now;
		if (timeout != Duration.max) {
			try now = MonoTime.currTime();
			catch (Exception e) { assert(false, e.msg); }
			target_timeout = now + timeout;
		}

		// the difference is compared instead of the values themselves, so
		// that the check keeps working when the counter overflows
		while (m_waiter.m_emitCount - emit_count <= 0) {
			m_waiter.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max);

			// Only a finite timeout may end the loop early. Without this
			// guard, `target_timeout` would still be `MonoTime.init` for an
			// infinite timeout and the loop would always exit after the
			// first wake-up, regardless of the emit count.
			if (timeout != Duration.max) {
				try now = MonoTime.currTime();
				catch (Exception e) { assert(false, e.msg); }
				if (now >= target_timeout) break;
			}
		}

		return m_waiter.m_emitCount;
	}
}
925
// Both waiters must have finished 100 ms after start: w1 through the emit or
// its 100 ms timeout, w2 through the emit after 50 ms.
unittest {
	import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;

	auto e = createManualEvent();
	auto w1 = runTask({ e.waitUninterruptible(100.msecs, e.emitCount); });
	auto w2 = runTask({ e.waitUninterruptible(500.msecs, e.emitCount); });
	runTask({
		try sleep(50.msecs);
		catch (Exception e) assert(false, e.msg);
		e.emit();
		try sleep(50.msecs);
		catch (Exception e) assert(false, e.msg);
		assert(!w1.running && !w2.running);
		exitEventLoop();
	});
	runEventLoop();
}
943
// Waiting must still work when the emit counter wraps around.
unittest {
	import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;
	auto e = createManualEvent();
	// integer overflow test
	e.m_waiter.m_emitCount = int.max;
	auto w1 = runTask({ e.waitUninterruptible(50.msecs, e.emitCount); });
	runTask({
		try sleep(5.msecs);
		catch (Exception e) assert(false, e.msg);
		// increments the counter past int.max
		e.emit();
		try sleep(50.msecs);
		catch (Exception e) assert(false, e.msg);
		assert(!w1.running);
		exitEventLoop();
	});
	runEventLoop();
}
961
unittest { // ensure that cancelled waiters are properly handled and that a FIFO order is implemented
	import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;

	LocalManualEvent l = createManualEvent();

	Task t2;
	// first waiter: once woken, it interrupts the second waiter
	runTask({
		l.waitUninterruptible();
		t2.interrupt();
		try sleep(20.msecs);
		catch (Exception e) assert(false, e.msg);
		exitEventLoop();
	});
	// second waiter: expects wait() to end with an InterruptException
	t2 = runTask({
		try {
			l.wait();
			assert(false, "Shouldn't reach this.");
		}
		catch (InterruptException e) {}
		catch (Exception e) assert(false, e.msg);
	});
	// emitter
	runTask({
		l.emit();
	});
	runEventLoop();
}
988
unittest { // ensure that LocalManualEvent behaves correctly after being copied
	import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep;

	LocalManualEvent l = createManualEvent();
	runTask(() nothrow {
		// emit through a copy of the event
		auto lc = l;
		try sleep(100.msecs);
		catch (Exception e) assert(false, e.msg);
		lc.emit();
	});
	runTask({
		// the returned emit count must be non-zero after the emit on the copy
		assert(l.waitUninterruptible(1.seconds, l.emitCount));
		exitEventLoop();
	});
	runEventLoop();
}
1005
1006
1007 /** A manually triggered multi threaded cross-task event.
1008
1009 Note: the ownership can be shared between multiple fibers and threads.
1010 */
1011 struct ManualEvent {
1012 import core.thread : Thread;
1013 import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny;
1014 import vibe.internal.list : StackSList;
1015
1016 @safe:
1017
1018 private {
1019 alias ThreadWaiter = ThreadLocalWaiter!true;
1020
1021 int m_emitCount;
1022 static struct Waiters {
1023 StackSList!ThreadWaiter active; // actively waiting
1024 StackSList!ThreadWaiter free; // free-list of reusable waiter structs
1025 }
1026 Monitor!(Waiters, shared(Mutex)) m_waiters;
1027 }
1028
1029 // thread destructor in vibe.core.core will decrement the ref. count
1030 package static EventID ms_threadEvent;
1031
1032 enum EmitMode {
1033 single,
1034 all
1035 }
1036
1037 @disable this(this);
1038
1039 private void initialize()
1040 shared nothrow {
1041 m_waiters = createMonitor!(ManualEvent.Waiters)(new shared Mutex);
1042 }
1043
1044 deprecated("ManualEvent is always non-null!")
1045 bool opCast (T : bool) () const shared nothrow { return true; }
1046
1047 /// A counter that is increased with every emit() call
1048 int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); }
1049
1050 /// Emits the signal, waking up all owners of the signal.
1051 int emit()
1052 shared nothrow @trusted {
1053 import core.atomic : atomicOp, cas;
1054
1055 debug (VibeMutexLog) () @trusted { logTrace("emit shared %s", cast(void*)&this); } ();
1056
1057 auto ec = atomicOp!"+="(m_emitCount, 1);
1058 auto thisthr = Thread.getThis();
1059
1060 ThreadWaiter lw;
1061 auto drv = eventDriver;
1062 m_waiters.lock.active.filter((ThreadWaiter w) {
1063 debug (VibeMutexLog) () @trusted { logTrace("waiter %s", cast(void*)w); } ();
1064 if (w.m_driver is drv) {
1065 lw = w;
1066 lw.addRef();
1067 } else {
1068 try {
1069 assert(w.m_event != EventID.init);
1070 () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
1071 } catch (Exception e) assert(false, e.msg);
1072 }
1073 return true;
1074 });
1075 debug (VibeMutexLog) () @trusted { logTrace("lw %s", cast(void*)lw); } ();
1076 if (lw) {
1077 lw.emit();
1078 releaseWaiter(lw);
1079 }
1080
1081 debug (VibeMutexLog) logTrace("emit shared done");
1082
1083 return ec;
1084 }
1085
1086 /// Emits the signal, waking up at least one waiting task
1087 int emitSingle()
1088 shared nothrow @trusted {
1089 import core.atomic : atomicOp, cas;
1090
1091 () @trusted { logTrace("emit shared single %s", cast(void*)&this); } ();
1092
1093 auto ec = atomicOp!"+="(m_emitCount, 1);
1094 auto thisthr = Thread.getThis();
1095
1096 ThreadWaiter lw;
1097 auto drv = eventDriver;
1098 m_waiters.lock.active.iterate((ThreadWaiter w) {
1099 () @trusted { logTrace("waiter %s", cast(void*)w); } ();
1100 if (w.m_driver is drv) {
1101 if (w.unused) return true;
1102 lw = w;
1103 lw.addRef();
1104 } else {
1105 try {
1106 assert(w.m_event != EventID.invalid);
1107 () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true);
1108 } catch (Exception e) assert(false, e.msg);
1109 }
1110 return false;
1111 });
1112 () @trusted { logTrace("lw %s", cast(void*)lw); } ();
1113 if (lw) {
1114 lw.emitSingle();
1115 releaseWaiter(lw);
1116 }
1117
1118 logTrace("emit shared done");
1119
1120 return ec;
1121 }
1122
1123 /** Acquires ownership and waits until the signal is emitted.
1124
1125 Note that in order not to miss any emits it is necessary to use the
1126 overload taking an integer.
1127
1128 Throws:
1129 May throw an $(D InterruptException) if the task gets interrupted
1130 using $(D Task.interrupt()).
1131 */
1132 int wait() shared { return wait(this.emitCount); }
1133
1134 /** Acquires ownership and waits until the emit count differs from the
1135 given one or until a timeout is reached.
1136
1137 Throws:
1138 May throw an $(D InterruptException) if the task gets interrupted
1139 using $(D Task.interrupt()).
1140 */
1141 int wait(int emit_count) shared { return doWaitShared!true(Duration.max, emit_count); }
1142 /// ditto
1143 int wait(Duration timeout, int emit_count) shared { return doWaitShared!true(timeout, emit_count); }
1144
1145 /** Same as $(D wait), but defers throwing any $(D InterruptException).
1146
1147 This method is annotated $(D nothrow) at the expense that it cannot be
1148 interrupted.
1149 */
1150 int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); }
1151 /// ditto
int waitUninterruptible(int emit_count)
shared nothrow {
	// An unbounded wait is the timeout overload with `Duration.max`.
	return waitUninterruptible(Duration.max, emit_count);
}
1153 /// ditto
int waitUninterruptible(Duration timeout, int emit_count)
shared nothrow {
	// Non-interruptible flavor of the shared wait implementation.
	enum interruptible = false;
	return doWaitShared!interruptible(timeout, emit_count);
}
1155
/* Common implementation of all `wait`/`waitUninterruptible` overloads.

	Waits until the emit count differs from `emit_count` or until `timeout`
	has elapsed (`Duration.max` means "no timeout") and returns the emit
	count that was observed last.
*/
private int doWaitShared(bool interruptible)(Duration timeout, int emit_count)
shared {
	import core.time : MonoTime;

	() @trusted { logTrace("wait shared %s", cast(void*)&this); } ();

	// create the event on first use
	if (ms_threadEvent is EventID.invalid) {
		ms_threadEvent = eventDriver.events.create();
		assert(ms_threadEvent != EventID.invalid, "Failed to create event!");
	}

	const has_timeout = timeout != Duration.max;

	MonoTime deadline, current;
	if (has_timeout) {
		try current = MonoTime.currTime();
		catch (Exception e) { assert(false, e.msg); }
		deadline = current + timeout;
	}

	int observed = this.emitCount;

	acquireThreadWaiter((scope ThreadWaiter w) {
		// the subtraction keeps the comparison meaningful when the counter wraps
		while (observed - emit_count <= 0) {
			auto remaining = has_timeout ? deadline - current : Duration.max;
			w.wait!interruptible(remaining, ms_threadEvent, () => (this.emitCount - emit_count) > 0);
			observed = this.emitCount;

			if (!has_timeout) continue;

			try current = MonoTime.currTime();
			catch (Exception e) { assert(false, e.msg); }
			if (current >= deadline) break;
		}
	});

	return observed;
}
1191
/* Runs `del` with a `ThreadWaiter` whose `m_driver` is the current `eventDriver`.

	The waiter is searched in the `active` list first and in the `free` list
	second. If neither contains a match, a new waiter is allocated using
	`processAllocator`, bound to the current `eventDriver` and
	`ms_threadEvent`, and added to `active`. `releaseWaiter` is called for the
	selected waiter when this function is left.

	Params:
		del = Callable that gets invoked with the selected waiter
*/
private void acquireThreadWaiter(DEL)(scope DEL del)
shared {
	import vibe.internal.allocator : processAllocator, makeGCSafe;

	ThreadWaiter w;
	auto drv = eventDriver;

	// the waiter lists are only touched while the monitor lock is held
	with (m_waiters.lock) {
		// first choice: a waiter of this driver that is already active
		// NOTE(review): `iterate` presumably stops once the callback returns false - confirm
		active.iterate((aw) {
			if (aw.m_driver is drv) {
				w = aw;
				w.addRef();
				return false;
			}
			return true;
		});

		if (!w) {
			// second choice: recycle a waiter of this driver from the free list
			// NOTE(review): `filter` presumably drops the entries for which the callback returns false - confirm
			free.filter((fw) {
				if (fw.m_driver is drv) {
					w = fw;
					w.addRef();
					return false;
				}
				return true;
			});

			if (!w) {
				// last resort: allocate a fresh waiter for this driver
				() @trusted {
					try {
						w = processAllocator.makeGCSafe!ThreadWaiter;
						w.m_driver = drv;
						w.m_event = ms_threadEvent;
					} catch (Exception e) {
						assert(false, "Failed to allocate thread waiter.");
					}
				} ();
			}

			// recycled and new waiters are expected to carry exactly one reference here
			assert(w.m_refCount == 1);
			active.add(w);
		}
	}

	// drop the reference again, no matter how `del` is left
	scope (exit) releaseWaiter(w);

	del(w);
}
1240
/* Drops one reference to `w` and moves the waiter from the `active` list to
	the `free` list once no references are left.
*/
private void releaseWaiter(ThreadWaiter w)
shared nothrow {
	// still referenced elsewhere - keep it in the active list
	if (w.releaseRef()) return;

	assert(w.m_driver is eventDriver, "Waiter was reassigned a different driver!?");
	assert(w.unused, "Waiter still used, but not referenced!?");

	with (m_waiters.lock) {
		auto was_active = active.remove(w);
		assert(was_active, "Waiter not in active queue anymore!?");
		free.add(w);
		// TODO: cap size of m_freeWaiters
	}
}
1254 }
1255
// A single emit must wake up a waiter in the calling thread as well as a
// waiter that runs in a worker thread.
unittest {
	import vibe.core.core : exitEventLoop, runEventLoop, runTask, runWorkerTaskH, sleep;

	static void workerWait(shared(ManualEvent)* ev)
	{
		ev.waitUninterruptible(500.msecs, ev.emitCount);
	}

	auto evt = createSharedManualEvent();

	auto local_waiter = runTask({
		evt.waitUninterruptible(100.msecs, evt.emitCount);
	});
	auto worker_waiter = runWorkerTaskH(&workerWait, &evt);

	runTask({
		// give both waiters time to start waiting
		try sleep(50.msecs);
		catch (Exception ex) assert(false, ex.msg);

		evt.emit();

		// give both waiters time to finish
		try sleep(50.msecs);
		catch (Exception ex) assert(false, ex.msg);

		assert(!local_waiter.running && !worker_waiter.running);
		exitEventLoop();
	});

	runEventLoop();
}
1274
// Stress test: 100 tasks distributed over a task pool of four threads
// randomly emit and wait on a single shared event until the emit count
// reaches 5000.
unittest {
	import vibe.core.core : runTask, runWorkerTaskH, setTimer, sleep;
	import vibe.core.taskpool : TaskPool;
	import core.time : msecs, usecs;
	import std.concurrency : send, receiveOnly;
	import std.random : uniform;

	auto tpool = new shared TaskPool(4);
	scope (exit) tpool.terminate();

	static void test(shared(ManualEvent)* evt, Task owner)
	nothrow {
		// report the own task handle to the owner, so that it can join later
		try owner.tid.send(Task.getThis());
		catch (Exception e) assert(false, e.msg);

		int ec = evt.emitCount;
		auto thist = Task.getThis();
		auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
		scope (exit) tm.stop();
		while (ec < 5_000) {
			tm.rearm(500.msecs);
			try sleep(uniform(0, 10_000).usecs);
			catch (Exception e) assert(false, e.msg);
			// emit in roughly one out of ten iterations
			try if (uniform(0, 10) == 0) evt.emit();
			catch (Exception e) assert(false, e.msg);
			auto ecn = evt.waitUninterruptible(ec);
			assert(ecn > ec);
			ec = ecn;
		}
	}

	auto watchdog = setTimer(30.seconds, { assert(false, "ManualEvent test has hung."); });
	scope (exit) watchdog.stop();

	auto e = createSharedManualEvent();
	Task[] tasks;

	runTask(() nothrow {
		auto thist = Task.getThis();

		// start 25 tasks in each thread
		foreach (i; 0 .. 25) tpool.runTaskDist(&test, &e, thist);
		// collect all task handles
		try foreach (i; 0 .. 4*25) tasks ~= receiveOnly!Task;
		catch (Exception e) assert(false, e.msg);

		auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog
		scope (exit) tm.stop();
		// keep emitting, so that no worker task can stay blocked
		while (e.emitCount < 5_000) {
			tm.rearm(500.msecs);
			try sleep(50.usecs);
			catch (Exception e) assert(false, e.msg);
			e.emit();
		}

		// wait for all worker tasks to finish
		foreach (t; tasks) t.joinUninterruptible();
	}).join();
}
1335
/* Couples a value of type `T` with a mutex of type `M`.

	The value can only be reached through the `Locked` handle returned by
	`lock`. The mutex is locked before the handle is returned and unlocked
	by the handle's destructor.
*/
package shared struct Monitor(T, M)
{
	alias Mutex = M;
	alias Data = T;

	private {
		Mutex mutex;
		Data data;
	}

	/// Handle to the protected data; unlocks the mutex when destroyed.
	static struct Locked {
		shared(Monitor)* m;

		@disable this(this);

		~this()
		{
			() @trusted {
				// prefer the nothrow variant if the mutex type offers one
				static if (is(typeof(Mutex.init.unlock_nothrow()))) {
					(cast(Mutex)m.mutex).unlock_nothrow();
				} else {
					(cast(Mutex)m.mutex).unlock();
				}
			} ();
		}

		ref inout(Data) get()
		inout @trusted {
			return *cast(inout(Data)*)&m.data;
		}

		alias get this;
	}

	/// Locks the mutex and returns a handle to the protected data.
	Locked lock()
	{
		acquireMutex();
		auto self = () @trusted { return &this; } ();
		return Locked(self);
	}

	/// ditto
	const(Locked) lock()
	const {
		acquireMutex();
		auto self = () @trusted { return &this; } ();
		return const(Locked)(self);
	}

	// Locks the mutex, preferring the nothrow variant if the mutex type offers one.
	private void acquireMutex()
	const @trusted {
		static if (is(typeof(Mutex.init.lock_nothrow()))) {
			(cast(Mutex)mutex).lock_nothrow();
		} else {
			(cast(Mutex)mutex).lock();
		}
	}
}
1377
1378
/* Creates a `Monitor` that guards a default initialized `T` using `mutex`.
*/
package shared(Monitor!(T, M)) createMonitor(T, M)(M mutex)
@trusted {
	alias Result = shared(Monitor!(T, M));

	Result monitor;
	monitor.mutex = cast(shared)mutex;
	return monitor;
}
1385
1386
/* Queue of tasks that wait for a notification.

	Each waiting task links a stack allocated `TaskWaiter` into `m_waiters`
	and is resumed when its notifier delegate gets called by `emit` or
	`emitSingle`.

	With `EVENT_TRIGGERED` set, the waiter additionally carries an event
	driver and an event handle. The handle is released by the destructor.
	Without it, the waiter carries an emit counter instead.
*/
private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
	import vibe.internal.list : CircularDList;

	private {
		// List node that represents a single waiting task.
		static struct TaskWaiter {
			TaskWaiter* prev, next;
			void delegate() @safe nothrow notifier;

			void wait(void delegate() @safe nothrow del) @safe nothrow {
				assert(notifier is null, "Local waiter is used twice!");
				notifier = del;
			}
			void cancel() @safe nothrow { notifier = null; }
			// clears the notifier before calling it, so that it can fire only once
			void emit() @safe nothrow { auto n = notifier; notifier = null; n(); }
		}

		static if (EVENT_TRIGGERED) {
			package(vibe) ThreadLocalWaiter next; // queue of other waiters of the same thread
			NativeEventDriver m_driver;
			EventID m_event = EventID.invalid;
		} else {
			int m_emitCount = 0;
		}
		int m_refCount = 1;
		TaskWaiter m_pivot;
		// marker node that `emit` inserts into the queue while it is running
		TaskWaiter m_emitPivot;
		CircularDList!(TaskWaiter*) m_waiters;
	}

	this()
	{
		m_waiters = CircularDList!(TaskWaiter*)(() @trusted { return &m_pivot; } ());
	}

	static if (EVENT_TRIGGERED) {
		~this()
		{
			import vibe.core.internal.release : releaseHandle;

			if (m_event != EventID.invalid)
				releaseHandle!"events"(m_event, () @trusted { return cast(shared)m_driver; } ());
		}
	}

	/// Determines whether no task is currently waiting.
	@property bool unused() const @safe nothrow { return m_waiters.empty; }

	/// Increments the reference count.
	void addRef() @safe nothrow { assert(m_refCount >= 0); m_refCount++; }
	/// Decrements the reference count; returns `true` as long as references remain.
	bool releaseRef() @safe nothrow { assert(m_refCount > 0); return --m_refCount > 0; }

	/** Suspends the calling task until it gets notified, until `evt` fires or
		until `timeout` has elapsed.

		Params:
			timeout = Maximum time to wait, passed on to `asyncAwaitAny`
			evt = Optional event to wait on in addition to the notification
			exit_condition = Optional condition that is evaluated right
				after the wait on `evt` has been started; if it yields
				`true`, the event wait completes immediately

		Returns:
			`true` if the task was woken up through `emit`/`emitSingle`,
			`false` if the wait on the notification was cancelled.
	*/
	bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null)
	@safe {
		import vibe.internal.async : Waitable, asyncAwaitAny;

		TaskWaiter waiter_store;
		TaskWaiter* waiter = () @trusted { return &waiter_store; } ();

		m_waiters.insertBack(waiter);
		assert(waiter.next !is null);
		// the node lives on this stack frame, so it must not stay linked
		scope (exit)
			if (waiter.next !is null) {
				m_waiters.remove(waiter);
				assert(!waiter.next);
			}

		bool cancelled;

		alias waitable = Waitable!(typeof(TaskWaiter.notifier),
			(cb) { waiter.wait(cb); },
			(cb) { cancelled = true; waiter.cancel(); },
			() {}
		);

		alias ewaitable = Waitable!(EventCallback,
			(cb) {
				eventDriver.events.wait(evt, cb);
				// check for exit condition *after* starting to wait for the event
				// to avoid a race condition
				// (`exit_condition` defaults to null, so it must be checked first)
				if (exit_condition !is null && exit_condition()) {
					eventDriver.events.cancelWait(evt, cb);
					cb(evt);
				}
			},
			(cb) { eventDriver.events.cancelWait(evt, cb); },
			(EventID) {}
		);

		if (evt != EventID.invalid) {
			asyncAwaitAny!(interruptible, waitable, ewaitable)(timeout);
		} else {
			asyncAwaitAny!(interruptible, waitable)(timeout);
		}

		if (cancelled) {
			assert(waiter.next !is null, "Cancelled waiter not in queue anymore!?");
			return false;
		} else {
			assert(waiter.next is null, "Triggered waiter still in queue!?");
			return true;
		}
	}

	/** Notifies the tasks in the queue.

		If another `emit` call is still in progress, its pivot gets moved to
		the end of the queue instead, so that the call in progress also
		processes the waiters that have been added in the meantime.
	*/
	void emit()
	@safe nothrow {
		if (m_waiters.empty) return;

		TaskWaiter* pivot = () @trusted { return &m_emitPivot; } ();

		if (pivot.next) { // another emit in progress?
			// shift pivot to the end, so that the other emit call will process all pending waiters
			if (pivot !is m_waiters.back) {
				m_waiters.remove(pivot);
				m_waiters.insertBack(pivot);
			}
			return;
		}

		// the pivot marks the end of the waiters that this call has to process
		m_waiters.insertBack(pivot);
		scope (exit) m_waiters.remove(pivot);

		foreach (w; m_waiters) {
			if (w is pivot) break;
			emitWaiter(w);
		}
	}

	/** Notifies the task at the front of the queue.

		If an `emit` call is in progress, its pivot gets shifted by one
		position instead, so that the call in progress processes one
		additional waiter.

		Returns:
			`false` if the queue is empty, `true` otherwise.
	*/
	bool emitSingle()
	@safe nothrow {
		if (m_waiters.empty) return false;

		TaskWaiter* pivot = () @trusted { return &m_emitPivot; } ();

		if (pivot.next) { // another emit in progress?
			// shift pivot to the right, so that the other emit call will process another waiter
			if (pivot !is m_waiters.back) {
				auto n = pivot.next;
				m_waiters.remove(pivot);
				m_waiters.insertAfter(pivot, n);
			}
			return true;
		}

		emitWaiter(m_waiters.front);
		return true;
	}

	// Unlinks `w` from the queue and calls its notifier, if one is set.
	private void emitWaiter(TaskWaiter* w)
	@safe nothrow {
		m_waiters.remove(w);

		if (w.notifier !is null) {
			logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr);
			w.emit();
		} else logTrace("notify callback is null");
	}
}
1553
/* Implementation of a non-recursive, task aware mutex.

	The lock state is a single atomic flag. Tasks that fail to acquire it
	register themselves in `m_waiters` and wait on `m_signal`, which gets
	emitted by `unlock` whenever waiters are registered.
*/
private struct TaskMutexImpl(bool INTERRUPTIBLE) {
	private {
		shared(bool) m_locked = false;
		shared(uint) m_waiters = 0;
		shared(ManualEvent) m_signal;
		debug Task m_owner;
	}

	/// Initializes the signal; has to be called before the mutex is used.
	void setup()
	{
		m_signal.initialize();
	}

	/** Tries to acquire the mutex without waiting.

		Returns: `true` if the mutex was acquired, `false` if it is locked.
	*/
	@trusted bool tryLock()
	nothrow {
		if (cas(&m_locked, false, true)) {
			debug m_owner = Task.getThis();
			debug(VibeMutexLog) logTrace("mutex %s lock %s", cast(void*)&this, atomicLoad(m_waiters));
			return true;
		}
		return false;
	}

	/** Acquires the mutex, waiting for at most `timeout`.

		Params:
			timeout = Maximum time to wait; `Duration.max` waits indefinitely

		Returns:
			`true` if the mutex was acquired, `false` if the timeout has
			elapsed first.
	*/
	@trusted bool lock(Duration timeout = Duration.max)
	{
		if (tryLock()) return true;
		debug assert(m_owner == Task() || m_owner != Task.getThis(), "Recursive mutex lock.");
		atomicOp!"+="(m_waiters, 1);
		debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
		scope(exit) atomicOp!"-="(m_waiters, 1);
		auto ecnt = m_signal.emitCount();

		// Only compute a deadline for finite timeouts - adding `Duration.max`
		// to the current time would silently overflow.
		const has_timeout = timeout != Duration.max;
		MonoTime target;
		if (has_timeout) target = MonoTime.currTime + timeout;

		while (!tryLock()) {
			auto remaining = Duration.max;
			if (has_timeout) {
				auto now = MonoTime.currTime;
				if (now >= target)
					return false;
				remaining = target - now;
			}

			static if (INTERRUPTIBLE) ecnt = m_signal.wait(remaining, ecnt);
			else ecnt = m_signal.waitUninterruptible(remaining, ecnt);
		}
		return true;
	}

	/// Releases the mutex and wakes up waiting tasks, if there are any.
	@trusted void unlock()
	{
		assert(m_locked);
		debug {
			assert(m_owner == Task.getThis());
			m_owner = Task();
		}
		atomicStore!(MemoryOrder.rel)(m_locked, false);
		debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
		if (atomicLoad(m_waiters) > 0)
			m_signal.emit();
	}
}
1611
/* Implementation of a recursive, task aware mutex.

	The owner and the recursion count are protected by a regular mutex that
	is only held for the duration of the bookkeeping. Tasks that fail to
	acquire the lock register themselves in `m_waiters` and wait on
	`m_signal`.
*/
private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
	private {
		core.sync.mutex.Mutex m_mutex;
		Task m_owner;
		size_t m_recCount = 0;
		shared(uint) m_waiters = 0;
		shared(ManualEvent) m_signal;
		@property bool m_locked() const { return m_recCount > 0; }
	}

	/// Creates the internal mutex and initializes the signal.
	void setup()
	{
		m_mutex = new core.sync.mutex.Mutex;
		m_signal.initialize();
	}

	/** Tries to acquire the mutex without waiting.

		Returns:
			`true` if the mutex was free or is already owned by the calling
			task, `false` if it is owned by another task.
	*/
	@trusted bool tryLock()
	nothrow {
		auto self = Task.getThis();
		return m_mutex.performLocked!({
			if (!m_owner) {
				assert(m_recCount == 0);
				m_recCount = 1;
				m_owner = self;
				return true;
			} else if (m_owner == self) {
				m_recCount++;
				return true;
			}
			return false;
		});
	}

	/// Acquires the mutex, waiting until it becomes available.
	@trusted void lock()
	{
		if (tryLock()) return;
		atomicOp!"+="(m_waiters, 1);
		debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
		scope(exit) atomicOp!"-="(m_waiters, 1);
		auto ecnt = m_signal.emitCount();
		while (!tryLock()) {
			static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt);
			else ecnt = m_signal.waitUninterruptible(ecnt);
		}
	}

	/** Undoes one `lock`/`tryLock` call of the owning task.

		Waiting tasks are only woken up once the recursion count has dropped
		to zero - as long as the caller still owns the mutex, no waiter
		could acquire it anyway.
	*/
	@trusted void unlock()
	{
		auto self = Task.getThis();
		bool released = false;
		m_mutex.performLocked!({
			assert(m_owner == self);
			assert(m_recCount > 0);
			m_recCount--;
			if (m_recCount == 0) {
				m_owner = Task.init;
				released = true;
			}
		});
		debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
		if (released && atomicLoad(m_waiters) > 0)
			m_signal.emit();
	}
}
1675
/* Implementation of a task aware condition variable.

	Waiting releases the associated mutex, waits for `m_signal` to be
	emitted and re-acquires the mutex before returning.
*/
private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
	private {
		LOCKABLE m_mutex;
		static if (is(LOCKABLE == Mutex))
			TaskMutex m_taskMutex;
		shared(ManualEvent) m_signal;
	}

	static if (is(LOCKABLE == Lockable)) {
		/// Adapts a `core.sync.mutex.Mutex` to the `Lockable` interface.
		final class MutexWrapper : Lockable {
			private core.sync.mutex.Mutex m_mutex;

			this(core.sync.mutex.Mutex mtx)
			{
				m_mutex = mtx;
			}

			@trusted void lock() { m_mutex.lock(); }
			@trusted void unlock() { m_mutex.unlock(); }
			@trusted bool tryLock() { return m_mutex.tryLock(); }
		}

		/// Sets up the condition for use with a `core.sync.mutex.Mutex`.
		void setupForMutex(core.sync.mutex.Mutex mtx)
		{
			auto wrapper = new MutexWrapper(mtx);
			setup(wrapper);
		}
	}

	@disable this(this);

	/// Associates the condition with `mtx` and initializes the signal.
	void setup(LOCKABLE mtx)
	{
		m_mutex = mtx;
		static if (is(typeof(m_taskMutex)))
			m_taskMutex = cast(TaskMutex)mtx;
		m_signal.initialize();
	}

	/// The mutex that is associated with this condition.
	@property LOCKABLE mutex() { return m_mutex; }

	/// Releases the mutex, waits for a notification and re-acquires the mutex.
	@trusted void wait()
	{
		checkLockedByCaller();

		// remember the emit count before unlocking, so that no notification gets lost
		auto refcount = m_signal.emitCount;

		releaseMutex();
		scope (exit) reacquireMutex();

		static if (INTERRUPTIBLE) m_signal.wait(refcount);
		else m_signal.waitUninterruptible(refcount);
	}

	/** Same as the overload without arguments, but waits for at most `timeout`.

		Returns:
			`true` if the emit count of the signal has changed, `false`
			otherwise.
	*/
	@trusted bool wait(Duration timeout)
	{
		assert(!timeout.isNegative());
		checkLockedByCaller();

		// remember the emit count before unlocking, so that no notification gets lost
		auto refcount = m_signal.emitCount;

		releaseMutex();
		scope (exit) reacquireMutex();

		static if (INTERRUPTIBLE) {
			auto observed = m_signal.wait(timeout, refcount);
			return observed != refcount;
		} else {
			auto observed = m_signal.waitUninterruptible(timeout, refcount);
			return observed != refcount;
		}
	}

	/// Emits the signal.
	@trusted void notify()
	{
		m_signal.emit();
	}

	/// Emits the signal.
	@trusted void notifyAll()
	{
		m_signal.emit();
	}

	// Asserts that a `TaskMutex` is locked (and, in debug builds, owned by the caller).
	private void checkLockedByCaller()
	@trusted {
		if (auto tm = cast(TaskMutex)m_mutex) {
			assert(tm.m_impl.m_locked);
			debug assert(tm.m_impl.m_owner == Task.getThis());
		}
	}

	// Unlocks the associated mutex before entering the wait state.
	private void releaseMutex()
	@trusted {
		static if (is(LOCKABLE == Mutex)) {
			if (m_taskMutex) m_taskMutex.unlock();
			else m_mutex.unlock_nothrow();
		} else {
			m_mutex.unlock();
		}
	}

	// Locks the associated mutex again after leaving the wait state.
	private void reacquireMutex()
	@trusted {
		static if (is(LOCKABLE == Mutex)) {
			if (m_taskMutex) m_taskMutex.lock();
			else m_mutex.lock_nothrow();
		} else {
			m_mutex.lock();
		}
	}
}
1771
/** Contains the shared state of a $(D TaskReadWriteMutex).
 *
 * Since a $(D TaskReadWriteMutex) consists of two actual Mutex
 * objects that rely on common memory, this struct implements
 * the actual functionality of their method calls.
 *
 * The method implementations are based on two static parameters
 * ($(D INTERRUPTIBLE) and $(D INTENT)), which are configured through
 * template arguments:
 *
 * - $(D INTERRUPTIBLE) determines whether the mutex implementation
 *   is interruptible by vibe.d's $(D vibe.core.task.Task.interrupt())
 *   method or not.
 *
 * - $(D INTENT) describes the intent, with which a locking operation is
 *   performed (i.e. $(D READ_ONLY) or $(D READ_WRITE)). RO locking allows for
 *   multiple Tasks holding the mutex, whereas RW locking will cause
 *   a "bottleneck" so that only one Task can write to the protected
 *   data at once.
 */
private struct ReadWriteMutexState(bool INTERRUPTIBLE)
{
	/** The policy with which the mutex should operate.
	 *
	 * The policy determines how the acquisition of the locks is
	 * performed and can be used to tune the mutex according to the
	 * underlying algorithm in which it is used.
	 *
	 * According to the provided policy, the mutex will either favor
	 * reading or writing tasks and could potentially starve the
	 * respective opposite.
	 *
	 * cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
	 */
	enum Policy : int
	{
		/** Readers are prioritized, writers may be starved as a result. */
		PREFER_READERS = 0,
		/** Writers are prioritized, readers may be starved as a result. */
		PREFER_WRITERS
	}

	/** The intent with which a locking operation is performed.
	 *
	 * Since both locks share the same underlying algorithms, the actual
	 * intent with which a lock operation is performed (i.e read/write)
	 * is passed as a template parameter to each method.
	 */
	enum LockingIntent : bool
	{
		/** Perform a read lock/unlock operation. Multiple reading locks can be
		 *  active at a time. */
		READ_ONLY = 0,
		/** Perform a write lock/unlock operation. Only a single writer can
		 *  hold a lock at any given time. */
		READ_WRITE = 1
	}

	private {
		//Queue counters
		/** The number of reading tasks waiting for the lock to become available. */
		shared(uint) m_waitingForReadLock = 0;
		/** The number of writing tasks waiting for the lock to become available. */
		shared(uint) m_waitingForWriteLock = 0;

		//Lock counters
		/** The number of reading tasks that currently hold the lock. */
		uint m_activeReadLocks = 0;
		/** The number of writing tasks that currently hold the lock (binary). */
		ubyte m_activeWriteLocks = 0;

		/** The policy determining the lock's behavior. */
		Policy m_policy;

		//Queue Events
		/** The event used to wake reading tasks waiting for the lock while it is blocked. */
		shared(ManualEvent) m_readyForReadLock;
		/** The event used to wake writing tasks waiting for the lock while it is blocked. */
		shared(ManualEvent) m_readyForWriteLock;

		/** The underlying mutex that gates the access to the shared state. */
		Mutex m_counterMutex;
	}

	this(Policy policy)
	{
		m_policy = policy;
		m_counterMutex = new Mutex();
		m_readyForReadLock = createSharedManualEvent();
		m_readyForWriteLock = createSharedManualEvent();
	}

	@disable this(this);

	/** The policy with which the lock has been created. */
	@property policy() const { return m_policy; }

	version(RWMutexPrint)
	{
		/** Print out debug information during lock operations. */
		void printInfo(string OP, LockingIntent INTENT)() nothrow
		{
			import std.string;
			try
			{
				import std.stdio;
				writefln("RWMutex: %s (%s), active: RO: %d, RW: %d; waiting: RO: %d, RW: %d",
					OP.leftJustify(10,' '),
					INTENT == LockingIntent.READ_ONLY ? "RO" : "RW",
					m_activeReadLocks, m_activeWriteLocks,
					m_waitingForReadLock, m_waitingForWriteLock
					);
			}
			catch (Throwable t){}
		}
	}

	/** An internal shortcut method to determine the queue event for a given intent. */
	@property ref auto queueEvent(LockingIntent INTENT)()
	{
		static if (INTENT == LockingIntent.READ_ONLY)
			return m_readyForReadLock;
		else
			return m_readyForWriteLock;
	}

	/** An internal shortcut method to determine the queue counter for a given intent. */
	@property ref auto queueCounter(LockingIntent INTENT)()
	{
		static if (INTENT == LockingIntent.READ_ONLY)
			return m_waitingForReadLock;
		else
			return m_waitingForWriteLock;
	}

	/** An internal shortcut method to determine the current emitCount of the queue event for a given intent. */
	int emitCount(LockingIntent INTENT)()
	{
		return queueEvent!INTENT.emitCount();
	}

	/** An internal shortcut method to determine the active counter for a given intent. */
	@property ref auto activeCounter(LockingIntent INTENT)()
	{
		static if (INTENT == LockingIntent.READ_ONLY)
			return m_activeReadLocks;
		else
			return m_activeWriteLocks;
	}

	/** An internal shortcut method to wait for the queue event for a given intent.
	 *
	 * This method is used during the `lock()` operation, after a
	 * `tryLock()` operation has been unsuccessfully finished.
	 * The active fiber will yield and be suspended until the queue event
	 * for the given intent will be fired.
	 */
	int wait(LockingIntent INTENT)(int count)
	{
		static if (INTERRUPTIBLE)
			return queueEvent!INTENT.wait(count);
		else
			return queueEvent!INTENT.waitUninterruptible(count);
	}

	/** An internal shortcut method to notify tasks waiting for the lock to become available again.
	 *
	 * This method is called whenever the number of owners of the mutex hits
	 * zero; this is basically the counterpart to `wait()`.
	 * It wakes any Task currently waiting for the mutex to be released.
	 */
	@trusted void notify(LockingIntent INTENT)()
	{
		static if (INTENT == LockingIntent.READ_ONLY)
		{ //If the last reader unlocks the mutex, notify all waiting writers
			if (atomicLoad(m_waitingForWriteLock) > 0)
				m_readyForWriteLock.emit();
		}
		else
		{ //If a writer unlocks the mutex, notify both readers and writers
			if (atomicLoad(m_waitingForReadLock) > 0)
				m_readyForReadLock.emit();

			if (atomicLoad(m_waitingForWriteLock) > 0)
				m_readyForWriteLock.emit();
		}
	}

	/** An internal method that performs the acquisition attempt in different variations.
	 *
	 * Since both locks rely on a common mutex object which gates the access
	 * to their common data, acquisition attempts for this lock are more complex
	 * than for simple mutex variants. This method will thus be performing the
	 * `tryLock()` operation in two variations, depending on the caller:
	 *
	 * If called from the outside ($(D WAIT_FOR_BLOCKING_MUTEX) = false), the method
	 * will instantly fail if the underlying mutex is locked (i.e. during another
	 * `tryLock()` or `unlock()` operation), in order to guarantee the fastest
	 * possible locking attempt.
	 *
	 * If used internally by the `lock()` method ($(D WAIT_FOR_BLOCKING_MUTEX) = true),
	 * the operation will wait for the mutex to be available before deciding if
	 * the lock can be acquired, since the attempt would anyway be repeated until
	 * it succeeds. This will prevent frequent retries under heavy loads and thus
	 * should ensure better performance.
	 */
	@trusted bool tryLock(LockingIntent INTENT, bool WAIT_FOR_BLOCKING_MUTEX)()
	{
		//Log a debug statement for the attempt
		version(RWMutexPrint)
			printInfo!("tryLock",INTENT)();

		//Try to acquire the lock
		static if (!WAIT_FOR_BLOCKING_MUTEX)
		{
			if (!m_counterMutex.tryLock())
				return false;
		}
		else
			m_counterMutex.lock();

		scope(exit)
			m_counterMutex.unlock();

		//Log a debug statement for the attempt
		version(RWMutexPrint)
			printInfo!("checkCtrs",INTENT)();

		//Check if there's already an active writer
		if (m_activeWriteLocks > 0)
			return false;

		//If writers are preferred over readers, check whether there
		//currently is a writer in the waiting queue and abort if
		//that's the case.
		//Note that the policy has to be compared by value here:
		//`m_policy.PREFER_WRITERS` merely names the enum member and is
		//always non-zero, regardless of the configured policy.
		static if (INTENT == LockingIntent.READ_ONLY)
			if (m_policy == Policy.PREFER_WRITERS && atomicLoad(m_waitingForWriteLock) > 0)
				return false;

		//If we are locking the mutex for writing, make sure that
		//there's no reader active.
		static if (INTENT == LockingIntent.READ_WRITE)
			if (m_activeReadLocks > 0)
				return false;

		//We can successfully acquire the lock!
		//Log a debug statement for the success.
		version(RWMutexPrint)
			printInfo!("lock",INTENT)();

		//Increase the according counter
		//(number of active readers/writers)
		//and return a success code.
		activeCounter!INTENT += 1;
		return true;
	}

	/** Attempt to acquire the lock for a given intent.
	 *
	 * Returns:
	 *  `true`, if the lock was successfully acquired;
	 *  `false` otherwise.
	 */
	@trusted bool tryLock(LockingIntent INTENT)()
	{
		//Try to lock this mutex without waiting for the underlying
		//mutex - fail if it is already blocked.
		return tryLock!(INTENT,false)();
	}

	/** Acquire the lock for the given intent; yield and suspend until the lock has been acquired. */
	@trusted void lock(LockingIntent INTENT)()
	{
		//Prepare a waiting action before the first
		//`tryLock()` call in order to avoid a race
		//condition that could lead to the queue notification
		//not being fired.
		auto count = emitCount!INTENT;
		atomicOp!"+="(queueCounter!INTENT,1);
		scope(exit)
			atomicOp!"-="(queueCounter!INTENT,1);

		//Retry until we successfully acquired the lock
		while (!tryLock!(INTENT,true)())
		{
			version(RWMutexPrint)
				printInfo!("wait",INTENT)();

			count = wait!INTENT(count);
		}
	}

	/** Unlock the mutex after a successful acquisition. */
	@trusted void unlock(LockingIntent INTENT)()
	{
		version(RWMutexPrint)
			printInfo!("unlock",INTENT)();

		debug assert(activeCounter!INTENT > 0);

		synchronized(m_counterMutex)
		{
			//Decrement the counter of active lock holders.
			//If the counter hits zero, notify waiting Tasks
			activeCounter!INTENT -= 1;
			if (activeCounter!INTENT == 0)
			{
				version(RWMutexPrint)
					printInfo!("notify",INTENT)();

				notify!INTENT();
			}
		}
	}
}
2093
/** Read/write mutex for use in fibers.
 *
 * This mutex can be used in exchange for a $(D core.sync.rwmutex.ReadWriteMutex),
 * but does not block the event loop in contention situations. Locking is
 * performed through the `reader` and `writer` members: any number of tasks
 * may hold the `reader` mutex at the same time, whereas the `writer` mutex
 * can only be held by a single task. Locks on `reader` and `writer` are
 * mutually exclusive (i.e. whenever a writer is active, no readers can be
 * active at the same time, and vice versa).
 *
 * Notice:
 *  Mutexes implemented by this class cannot be interrupted
 *  using $(D vibe.core.task.Task.interrupt()). The corresponding
 *  InterruptException will be deferred until the next blocking
 *  operation yields the event loop.
 *
 *  Use $(D InterruptibleTaskReadWriteMutex) as an alternative that can be
 *  interrupted.
 *
 * cf. $(D core.sync.rwmutex.ReadWriteMutex)
 */
final class TaskReadWriteMutex
{
	private {
		alias State = ReadWriteMutexState!false;
		alias LockingIntent = State.LockingIntent;
		alias READ_ONLY = LockingIntent.READ_ONLY;
		alias READ_WRITE = LockingIntent.READ_WRITE;

		/** The state that is shared between the reader and the writer mutex. */
		State m_state;
	}

	/** The policy with which the mutex should operate.
	 *
	 * Depending on the policy, the mutex favors either reading or writing
	 * tasks when both kinds compete for the lock, which may starve the
	 * respective other kind. It can be used to tune the mutex for the
	 * algorithm in which it is used.
	 *
	 * cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
	 */
	alias Policy = State.Policy;

	/** Common implementation of the reader and the writer mutex.
	 *
	 * The $(D INTENT) template argument determines whether the mutex
	 * performs read or write locking on the shared state.
	 */
	final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
	{
		/** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */
		override bool tryLock()
		{
			return m_state.tryLock!INTENT();
		}

		/** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */
		override void lock()
		{
			m_state.lock!INTENT();
		}

		/** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */
		override void unlock()
		{
			m_state.unlock!INTENT();
		}
	}

	alias Reader = Mutex!READ_ONLY;
	alias Writer = Mutex!READ_WRITE;

	Reader reader;
	Writer writer;

	this(Policy policy = Policy.PREFER_WRITERS)
	{
		m_state = State(policy);
		reader = new Reader();
		writer = new Writer();
	}

	/** The policy with which the lock has been created. */
	@property Policy policy()
	const {
		return m_state.policy;
	}
}
2171
/** Variant of $(D TaskReadWriteMutex) that supports interruption.
 *
 * Tasks that wait in the `lock()` method of this class can be interrupted
 * using $(D vibe.core.task.Task.interrupt()).
 *
 * cf. $(D core.sync.rwmutex.ReadWriteMutex)
 */
final class InterruptibleTaskReadWriteMutex
{
	@safe:

	private {
		alias State = ReadWriteMutexState!true;
		alias LockingIntent = State.LockingIntent;
		alias READ_ONLY = LockingIntent.READ_ONLY;
		alias READ_WRITE = LockingIntent.READ_WRITE;

		/** The state that is shared between the reader and the writer mutex. */
		State m_state;
	}

	/** The policy with which the mutex should operate.
	 *
	 * Depending on the policy, the mutex favors either reading or writing
	 * tasks when both kinds compete for the lock, which may starve the
	 * respective other kind. It can be used to tune the mutex for the
	 * algorithm in which it is used.
	 *
	 * cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
	 */
	alias Policy = State.Policy;

	/** Common implementation of the reader and the writer mutex.
	 *
	 * The $(D INTENT) template argument determines whether the mutex
	 * performs read or write locking on the shared state.
	 */
	final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
	{
		/** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */
		override bool tryLock()
		{
			return m_state.tryLock!INTENT();
		}

		/** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */
		override void lock()
		{
			m_state.lock!INTENT();
		}

		/** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */
		override void unlock()
		{
			m_state.unlock!INTENT();
		}
	}

	alias Reader = Mutex!READ_ONLY;
	alias Writer = Mutex!READ_WRITE;

	Reader reader;
	Writer writer;

	this(Policy policy = Policy.PREFER_WRITERS)
	{
		m_state = State(policy);
		reader = new Reader();
		writer = new Writer();
	}

	/** The policy with which the lock has been created. */
	@property Policy policy()
	const {
		return m_state.policy;
	}
}