1 /** 2 Event loop compatible task synchronization facilities. 3 4 This module provides replacement primitives for the modules in `core.sync` 5 that do not block vibe.d's event loop in their wait states. These should 6 always be preferred over the ones in Druntime under usual circumstances. 7 8 Using a standard `Mutex` is possible as long as it is ensured that no event 9 loop based functionality (I/O, task interaction or anything that implicitly 10 calls `vibe.core.core.yield`) is executed within a section of code that is 11 protected by the mutex. $(B Failure to do so may result in dead-locks and 12 high-level race-conditions!) 13 14 Copyright: © 2012-2019 Sönke Ludwig 15 Authors: Leonid Kramer, Sönke Ludwig, Manuel Frischknecht 16 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 17 */ 18 module vibe.core.sync; 19 20 import vibe.core.log : logDebugV, logTrace, logInfo; 21 import vibe.core.task; 22 23 import core.atomic; 24 import core.sync.mutex; 25 import core.sync.condition; 26 import eventcore.core; 27 import std.exception; 28 import std.stdio; 29 import std.traits : ReturnType; 30 31 32 /** Creates a new signal that can be shared between fibers. 33 */ 34 LocalManualEvent createManualEvent() 35 @safe nothrow { 36 LocalManualEvent ret; 37 ret.initialize(); 38 return ret; 39 } 40 /// ditto 41 shared(ManualEvent) createSharedManualEvent() 42 @trusted nothrow { 43 shared(ManualEvent) ret; 44 ret.initialize(); 45 return ret; 46 } 47 48 49 /** Performs RAII based locking/unlocking of a mutex. 50 51 Note that while `TaskMutex` can be used with D's built-in `synchronized` 52 statement, `InterruptibleTaskMutex` cannot. This function provides a 53 library based alternative that is suitable for use with all mutex types. 54 */ 55 ScopedMutexLock!M scopedMutexLock(M)(M mutex, LockMode mode = LockMode.lock) 56 if (is(M : Mutex) || is(M : Lockable)) 57 { 58 return ScopedMutexLock!M(mutex, mode); 59 } 60 61 /// 62 unittest { 63 import vibe.core.core : runWorkerTaskH; 64 65 __gshared int counter; 66 __gshared TaskMutex mutex; 67 68 mutex = new TaskMutex; 69 70 Task[] tasks; 71 72 foreach (i; 0 .. 100) { 73 tasks ~= runWorkerTaskH(() nothrow { 74 auto l = scopedMutexLock(mutex); 75 counter++; 76 }); 77 } 78 79 foreach (t; tasks) t.join(); 80 81 assert(counter == 100); 82 } 83 84 unittest { 85 scopedMutexLock(new Mutex); 86 scopedMutexLock(new TaskMutex); 87 scopedMutexLock(new InterruptibleTaskMutex); 88 } 89 90 enum LockMode { 91 lock, 92 tryLock, 93 defer 94 } 95 96 interface Lockable { 97 @safe: 98 void lock(); 99 void unlock(); 100 bool tryLock(); 101 } 102 103 /** RAII lock for the Mutex class. 104 */ 105 struct ScopedMutexLock(M) 106 if (is(M : Mutex) || is(M : Lockable)) 107 { 108 @disable this(this); 109 private { 110 M m_mutex; 111 bool m_locked; 112 LockMode m_mode; 113 } 114 115 this(M mutex, LockMode mode = LockMode.lock) 116 { 117 assert(mutex !is null); 118 m_mutex = mutex; 119 120 final switch (mode) { 121 case LockMode.lock: lock(); break; 122 case LockMode.tryLock: tryLock(); break; 123 case LockMode.defer: break; 124 } 125 } 126 127 ~this() 128 { 129 if (m_locked) unlock(); 130 } 131 132 @property bool locked() const { return m_locked; } 133 134 void unlock() 135 in { assert(this.locked); } 136 do { 137 assert(m_locked, "Unlocking unlocked scoped mutex lock"); 138 static if (is(typeof(m_mutex.unlock_nothrow()))) 139 m_mutex.unlock_nothrow(); 140 else 141 m_mutex.unlock(); 142 m_locked = false; 143 } 144 145 bool tryLock() 146 in { assert(!this.locked); } 147 do { 148 static if (is(typeof(m_mutex.tryLock_nothrow()))) 149 return m_locked = m_mutex.tryLock_nothrow(); 150 else 151 return m_locked = m_mutex.tryLock(); 152 } 153 154 void lock() 155 in { assert(!this.locked); } 156 do { 157 m_locked = true; 158 static if (is(typeof(m_mutex.lock_nothrow()))) 159 m_mutex.lock_nothrow(); 160 else 161 m_mutex.lock(); 162 } 163 } 164 165 166 /* 167 Only for internal use: 168 Ensures that a mutex is locked while executing the given procedure. 169 170 This function works for all kinds of mutexes, in particular for 171 $(D core.sync.mutex.Mutex), $(D TaskMutex) and $(D InterruptibleTaskMutex). 172 173 Returns: 174 Returns the value returned from $(D PROC), if any. 175 */ 176 /// private 177 ReturnType!PROC performLocked(alias PROC, MUTEX)(MUTEX mutex) 178 { 179 auto l = scopedMutexLock(mutex); 180 return PROC(); 181 } 182 183 /// 184 unittest { 185 int protected_var = 0; 186 auto mtx = new TaskMutex; 187 mtx.performLocked!({ 188 protected_var++; 189 }); 190 } 191 192 193 /** 194 Thread-local semaphore implementation for tasks. 195 196 When the semaphore runs out of concurrent locks, it will suspend. This class 197 is used in `vibe.core.connectionpool` to limit the number of concurrent 198 connections. 199 */ 200 final class LocalTaskSemaphore 201 { 202 @safe: 203 204 // requires a queue 205 import std.container.binaryheap; 206 import std.container.array; 207 //import vibe.utils.memory; 208 209 private { 210 static struct ThreadWaiter { 211 ubyte priority; 212 uint seq; 213 } 214 215 BinaryHeap!(Array!ThreadWaiter, asc) m_waiters; 216 uint m_maxLocks; 217 uint m_locks; 218 uint m_seq; 219 LocalManualEvent m_signal; 220 } 221 222 this(uint max_locks) nothrow 223 { 224 m_maxLocks = max_locks; 225 m_signal = createManualEvent(); 226 } 227 228 /// Maximum number of concurrent locks 229 @property void maxLocks(uint max_locks) { m_maxLocks = max_locks; } 230 /// ditto 231 @property uint maxLocks() const nothrow { return m_maxLocks; } 232 233 /// Number of concurrent locks still available 234 @property uint available() const { return m_maxLocks - m_locks; } 235 236 /** Try to acquire a lock. 237 238 If a lock cannot be acquired immediately, returns `false` and leaves the 239 semaphore in its previous state. 240 241 Returns: 242 `true` is returned $(I iff) the number of available locks is greater 243 than one. 244 */ 245 bool tryLock() 246 { 247 if (available > 0) 248 { 249 m_locks++; 250 return true; 251 } 252 return false; 253 } 254 255 /** Acquires a lock. 256 257 Once the limit of concurrent locks is reached, this method will block 258 until the number of locks drops below the limit. 259 */ 260 void lock(ubyte priority = 0) 261 { 262 import std.algorithm.comparison : min; 263 264 if (tryLock()) 265 return; 266 267 ThreadWaiter w; 268 w.priority = priority; 269 w.seq = min(0, m_seq - w.priority); 270 if (++m_seq == uint.max) 271 rewindSeq(); 272 273 () @trusted { m_waiters.insert(w); } (); 274 275 while (true) { 276 m_signal.waitUninterruptible(); 277 if (m_waiters.front.seq == w.seq && tryLock()) { 278 return; 279 } 280 } 281 } 282 283 /** Gives up an existing lock. 284 */ 285 void unlock() 286 { 287 assert(m_locks >= 1); 288 m_locks--; 289 if (m_waiters.length > 0) 290 m_signal.emit(); // resume one 291 } 292 293 // if true, a goes after b. ie. b comes out front() 294 /// private 295 static bool asc(ref ThreadWaiter a, ref ThreadWaiter b) 296 { 297 if (a.priority != b.priority) 298 return a.priority < b.priority; 299 return a.seq > b.seq; 300 } 301 302 private void rewindSeq() 303 @trusted { 304 Array!ThreadWaiter waiters = m_waiters.release(); 305 ushort min_seq; 306 import std.algorithm : min; 307 foreach (ref waiter; waiters[]) 308 min_seq = min(waiter.seq, min_seq); 309 foreach (ref waiter; waiters[]) 310 waiter.seq -= min_seq; 311 m_waiters.assume(waiters); 312 } 313 } 314 315 316 /** 317 Mutex implementation for fibers. 318 319 This mutex type can be used in exchange for a core.sync.mutex.Mutex, but 320 does not block the event loop when contention happens. Note that this 321 mutex does not allow recursive locking. 322 323 Notice: 324 Because this class is annotated nothrow, it cannot be interrupted 325 using $(D vibe.core.task.Task.interrupt()). The corresponding 326 $(D InterruptException) will be deferred until the next blocking 327 operation yields the event loop. 328 329 Use $(D InterruptibleTaskMutex) as an alternative that can be 330 interrupted. 331 332 See_Also: InterruptibleTaskMutex, RecursiveTaskMutex, core.sync.mutex.Mutex 333 */ 334 final class TaskMutex : core.sync.mutex.Mutex, Lockable { 335 @safe: 336 private TaskMutexImpl!false m_impl; 337 338 this(Object o) nothrow { m_impl.setup(); super(o); } 339 this() nothrow { m_impl.setup(); } 340 341 override bool tryLock() nothrow { return m_impl.tryLock(); } 342 override void lock() nothrow { m_impl.lock(); } 343 override void unlock() nothrow { m_impl.unlock(); } 344 bool lock(Duration timeout) nothrow { return m_impl.lock(timeout); } 345 } 346 347 unittest { 348 auto mutex = new TaskMutex; 349 350 { 351 auto lock = scopedMutexLock(mutex); 352 assert(lock.locked); 353 assert(mutex.m_impl.m_locked); 354 355 auto lock2 = scopedMutexLock(mutex, LockMode.tryLock); 356 assert(!lock2.locked); 357 } 358 assert(!mutex.m_impl.m_locked); 359 360 auto lock = scopedMutexLock(mutex, LockMode.tryLock); 361 assert(lock.locked); 362 lock.unlock(); 363 assert(!lock.locked); 364 365 synchronized(mutex){ 366 assert(mutex.m_impl.m_locked); 367 } 368 assert(!mutex.m_impl.m_locked); 369 370 mutex.performLocked!({ 371 assert(mutex.m_impl.m_locked); 372 }); 373 assert(!mutex.m_impl.m_locked); 374 375 static if (__VERSION__ >= 2067) { 376 with(mutex.scopedMutexLock) { 377 assert(mutex.m_impl.m_locked); 378 } 379 } 380 } 381 382 unittest { // test deferred throwing 383 import vibe.core.core; 384 385 auto mutex = new TaskMutex; 386 auto t1 = runTask({ 387 scope (failure) assert(false, "No exception expected in first task!"); 388 mutex.lock(); 389 scope (exit) mutex.unlock(); 390 sleep(20.msecs); 391 }); 392 393 auto t2 = runTask({ 394 mutex.lock(); 395 scope (exit) mutex.unlock(); 396 try { 397 yield(); 398 assert(false, "Yield is supposed to have thrown an InterruptException."); 399 } catch (InterruptException) { 400 // as expected! 401 } catch (Exception) { 402 assert(false, "Only InterruptException supposed to be thrown!"); 403 } 404 }); 405 406 runTask({ 407 // mutex is now locked in first task for 20 ms 408 // the second tasks is waiting in lock() 409 t2.interrupt(); 410 t1.joinUninterruptible(); 411 t2.joinUninterruptible(); 412 assert(!mutex.m_impl.m_locked); // ensure that the scope(exit) has been executed 413 exitEventLoop(); 414 }); 415 416 runEventLoop(); 417 } 418 419 unittest { 420 runMutexUnitTests!TaskMutex(); 421 } 422 423 424 /** 425 Alternative to $(D TaskMutex) that supports interruption. 426 427 This class supports the use of $(D vibe.core.task.Task.interrupt()) while 428 waiting in the $(D lock()) method. However, because the interface is not 429 $(D nothrow), it cannot be used as an object monitor. 430 431 See_Also: $(D TaskMutex), $(D InterruptibleRecursiveTaskMutex) 432 */ 433 final class InterruptibleTaskMutex : Lockable { 434 @safe: 435 436 private TaskMutexImpl!true m_impl; 437 438 this() 439 { 440 m_impl.setup(); 441 442 // detects invalid usage within synchronized(...) 443 () @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } (); 444 } 445 446 bool tryLock() nothrow { return m_impl.tryLock(); } 447 void lock() { m_impl.lock(); } 448 void unlock() nothrow { m_impl.unlock(); } 449 } 450 451 unittest { 452 runMutexUnitTests!InterruptibleTaskMutex(); 453 } 454 455 456 457 /** 458 Recursive mutex implementation for tasks. 459 460 This mutex type can be used in exchange for a `core.sync.mutex.Mutex`, but 461 does not block the event loop when contention happens. 462 463 Notice: 464 Because this class is annotated `nothrow`, it cannot be interrupted 465 using $(D vibe.core.task.Task.interrupt()). The corresponding 466 $(D InterruptException) will be deferred until the next blocking 467 operation yields the event loop. 468 469 Use $(D InterruptibleRecursiveTaskMutex) as an alternative that can be 470 interrupted. 471 472 See_Also: `TaskMutex`, `core.sync.mutex.Mutex` 473 */ 474 final class RecursiveTaskMutex : core.sync.mutex.Mutex, Lockable { 475 @safe: 476 477 private RecursiveTaskMutexImpl!false m_impl; 478 479 this(Object o) { m_impl.setup(); super(o); } 480 this() { m_impl.setup(); } 481 482 override bool tryLock() nothrow { return m_impl.tryLock(); } 483 override void lock() { m_impl.lock(); } 484 override void unlock() nothrow { m_impl.unlock(); } 485 } 486 487 unittest { 488 runMutexUnitTests!RecursiveTaskMutex(); 489 } 490 491 492 /** 493 Alternative to $(D RecursiveTaskMutex) that supports interruption. 494 495 This class supports the use of $(D vibe.core.task.Task.interrupt()) while 496 waiting in the $(D lock()) method. However, because the interface is not 497 $(D nothrow), it cannot be used as an object monitor. 498 499 See_Also: $(D RecursiveTaskMutex), $(D InterruptibleTaskMutex) 500 */ 501 final class InterruptibleRecursiveTaskMutex : Lockable { 502 @safe: 503 private RecursiveTaskMutexImpl!true m_impl; 504 505 this() 506 { 507 m_impl.setup(); 508 509 // detects invalid usage within synchronized(...) 510 () @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } (); 511 } 512 513 bool tryLock() nothrow { return m_impl.tryLock(); } 514 void lock() { m_impl.lock(); } 515 void unlock() nothrow { m_impl.unlock(); } 516 } 517 518 unittest { 519 runMutexUnitTests!InterruptibleRecursiveTaskMutex(); 520 } 521 522 523 // Helper class to ensure that the non Object.Monitor compatible interruptible 524 // mutex classes are not accidentally used with the `synchronized` statement 525 private final class NoUseMonitor : Object.Monitor { 526 private static shared Proxy st_instance; 527 528 static struct Proxy { 529 Object.Monitor monitor; 530 } 531 532 static @property ref shared(Proxy) instance() 533 @safe nothrow { 534 static shared(Proxy)* inst = null; 535 if (inst) return *inst; 536 537 () @trusted { // synchronized {} not @safe for DMD <= 2.078.3 538 synchronized { 539 if (!st_instance.monitor) 540 st_instance.monitor = new shared NoUseMonitor; 541 inst = &st_instance; 542 } 543 } (); 544 545 return *inst; 546 } 547 548 override void lock() @safe @nogc nothrow { 549 assert(false, "Interruptible task mutexes cannot be used with synchronized(), use scopedMutexLock instead."); 550 } 551 552 override void unlock() @safe @nogc nothrow {} 553 } 554 555 556 private void runMutexUnitTests(M)() 557 { 558 import vibe.core.core; 559 560 auto m = new M; 561 Task t1, t2; 562 void runContendedTasks(bool interrupt_t1, bool interrupt_t2) { 563 assert(!m.m_impl.m_locked); 564 565 // t1 starts first and acquires the mutex for 20 ms 566 // t2 starts second and has to wait in m.lock() 567 t1 = runTask(() nothrow { 568 assert(!m.m_impl.m_locked); 569 try m.lock(); 570 catch (Exception e) assert(false, e.msg); 571 assert(m.m_impl.m_locked); 572 if (interrupt_t1) { 573 try assertThrown!InterruptException(sleep(100.msecs)); 574 catch (Exception e) assert(false, e.msg); 575 } else assertNotThrown(sleep(20.msecs)); 576 m.unlock(); 577 }); 578 t2 = runTask(() nothrow { 579 assert(!m.tryLock()); 580 if (interrupt_t2) { 581 try m.lock(); 582 catch (InterruptException) return; 583 catch (Exception e) assert(false, e.msg); 584 try yield(); // rethrows any deferred exceptions 585 catch (InterruptException) { 586 m.unlock(); 587 return; 588 } 589 catch (Exception e) assert(false, e.msg); 590 assert(false, "Supposed to have thrown an InterruptException."); 591 } else assertNotThrown(m.lock()); 592 assert(m.m_impl.m_locked); 593 try sleep(20.msecs); 594 catch (Exception e) assert(false, e.msg); 595 m.unlock(); 596 assert(!m.m_impl.m_locked); 597 }); 598 } 599 600 // basic lock test 601 m.performLocked!({ 602 assert(m.m_impl.m_locked); 603 }); 604 assert(!m.m_impl.m_locked); 605 606 // basic contention test 607 runContendedTasks(false, false); 608 auto t3 = runTask({ 609 assert(t1.running && t2.running); 610 assert(m.m_impl.m_locked); 611 t1.joinUninterruptible(); 612 assert(!t1.running && t2.running); 613 try yield(); // give t2 a chance to take the lock 614 catch (Exception e) assert(false, e.msg); 615 assert(m.m_impl.m_locked); 616 t2.joinUninterruptible(); 617 assert(!t2.running); 618 assert(!m.m_impl.m_locked); 619 exitEventLoop(); 620 }); 621 runEventLoop(); 622 assert(!t3.running); 623 assert(!m.m_impl.m_locked); 624 625 // interruption test #1 626 runContendedTasks(true, false); 627 t3 = runTask({ 628 assert(t1.running && t2.running); 629 assert(m.m_impl.m_locked); 630 t1.interrupt(); 631 t1.joinUninterruptible(); 632 assert(!t1.running && t2.running); 633 try yield(); // give t2 a chance to take the lock 634 catch (Exception e) assert(false, e.msg); 635 assert(m.m_impl.m_locked); 636 t2.joinUninterruptible(); 637 assert(!t2.running); 638 assert(!m.m_impl.m_locked); 639 exitEventLoop(); 640 }); 641 runEventLoop(); 642 assert(!t3.running); 643 assert(!m.m_impl.m_locked); 644 645 // interruption test #2 646 runContendedTasks(false, true); 647 t3 = runTask({ 648 assert(t1.running && t2.running); 649 assert(m.m_impl.m_locked); 650 t2.interrupt(); 651 t2.joinUninterruptible(); 652 assert(!t2.running); 653 static if (is(M == InterruptibleTaskMutex) || is (M == InterruptibleRecursiveTaskMutex)) 654 assert(t1.running && m.m_impl.m_locked); 655 t1.joinUninterruptible(); 656 assert(!t1.running); 657 assert(!m.m_impl.m_locked); 658 exitEventLoop(); 659 }); 660 runEventLoop(); 661 assert(!t3.running); 662 assert(!m.m_impl.m_locked); 663 } 664 665 666 /** 667 Event loop based condition variable or "event" implementation. 668 669 This class can be used in exchange for a $(D core.sync.condition.Condition) 670 to avoid blocking the event loop when waiting. 671 672 Notice: 673 Because this class is annotated nothrow, it cannot be interrupted 674 using $(D vibe.core.task.Task.interrupt()). The corresponding 675 $(D InterruptException) will be deferred until the next blocking 676 operation yields to the event loop. 677 678 Use $(D InterruptibleTaskCondition) as an alternative that can be 679 interrupted. 680 681 Note that it is generally not safe to use a `TaskCondition` together with an 682 interruptible mutex type. 683 684 See_Also: `InterruptibleTaskCondition` 685 */ 686 final class TaskCondition : core.sync.condition.Condition { 687 @safe: 688 689 private TaskConditionImpl!(false, Mutex) m_impl; 690 691 this(core.sync.mutex.Mutex mtx) 692 nothrow { 693 assert(mtx.classinfo is Mutex.classinfo || mtx.classinfo is TaskMutex.classinfo, 694 "TaskCondition can only be used with Mutex or TaskMutex"); 695 696 m_impl.setup(mtx); 697 super(mtx); 698 } 699 override @property Mutex mutex() nothrow { return m_impl.mutex; } 700 override void wait() nothrow { m_impl.wait(); } 701 override bool wait(Duration timeout) nothrow { return m_impl.wait(timeout); } 702 override void notify() nothrow { m_impl.notify(); } 703 override void notifyAll() nothrow { m_impl.notifyAll(); } 704 } 705 706 unittest { 707 new TaskCondition(new Mutex); 708 new TaskCondition(new TaskMutex); 709 } 710 711 712 /** This example shows the typical usage pattern using a `while` loop to make 713 sure that the final condition is reached. 714 */ 715 unittest { 716 import vibe.core.core; 717 import vibe.core.log; 718 719 __gshared Mutex mutex; 720 __gshared TaskCondition condition; 721 __gshared int workers_still_running = 0; 722 723 // setup the task condition 724 mutex = new Mutex; 725 condition = new TaskCondition(mutex); 726 727 logDebug("SETTING UP TASKS"); 728 729 // start up the workers and count how many are running 730 foreach (i; 0 .. 4) { 731 workers_still_running++; 732 runWorkerTask(() nothrow { 733 // simulate some work 734 try sleep(100.msecs); 735 catch (Exception e) {} 736 737 // notify the waiter that we're finished 738 { 739 auto l = scopedMutexLock(mutex); 740 workers_still_running--; 741 logDebug("DECREMENT %s", workers_still_running); 742 } 743 logDebug("NOTIFY"); 744 condition.notify(); 745 }); 746 } 747 748 logDebug("STARTING WAIT LOOP"); 749 750 // wait until all tasks have decremented the counter back to zero 751 synchronized (mutex) { 752 while (workers_still_running > 0) { 753 logDebug("STILL running %s", workers_still_running); 754 condition.wait(); 755 } 756 } 757 } 758 759 760 /** 761 Alternative to `TaskCondition` that supports interruption. 762 763 This class supports the use of `vibe.core.task.Task.interrupt()` while 764 waiting in the `wait()` method. 765 766 See `TaskCondition` for an example. 767 768 Notice: 769 Note that it is generally not safe to use an 770 `InterruptibleTaskCondition` together with an interruptible mutex type. 771 772 See_Also: `TaskCondition` 773 */ 774 final class InterruptibleTaskCondition { 775 @safe: 776 777 private TaskConditionImpl!(true, Lockable) m_impl; 778 779 this(M)(M mutex) 780 if (is(M : Mutex) || is (M : Lockable)) 781 { 782 static if (is(M : Lockable)) 783 m_impl.setup(mutex); 784 else 785 m_impl.setupForMutex(mutex); 786 } 787 788 @property Lockable mutex() { return m_impl.mutex; } 789 void wait() { m_impl.wait(); } 790 bool wait(Duration timeout) { return m_impl.wait(timeout); } 791 void notify() nothrow { m_impl.notify(); } 792 void notifyAll() nothrow { m_impl.notifyAll(); } 793 } 794 795 unittest { 796 new InterruptibleTaskCondition(new Mutex); 797 new InterruptibleTaskCondition(new TaskMutex); 798 new InterruptibleTaskCondition(new InterruptibleTaskMutex); 799 } 800 801 802 /** A manually triggered single threaded cross-task event. 803 804 Note: the ownership can be shared between multiple fibers of the same thread. 805 */ 806 struct LocalManualEvent { 807 import core.thread : Thread; 808 import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny; 809 810 @safe: 811 812 private { 813 alias Waiter = ThreadLocalWaiter!false; 814 815 Waiter m_waiter; 816 } 817 818 // thread destructor in vibe.core.core will decrement the ref. count 819 package static EventID ms_threadEvent; 820 821 private void initialize() 822 nothrow { 823 import vibe.internal.allocator : Mallocator, makeGCSafe; 824 m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } (); 825 } 826 827 this(this) 828 nothrow { 829 if (m_waiter) 830 return m_waiter.addRef(); 831 } 832 833 ~this() 834 nothrow { 835 import vibe.internal.allocator : Mallocator, disposeGCSafe; 836 if (m_waiter) { 837 if (!m_waiter.releaseRef()) { 838 static if (__VERSION__ < 2087) scope (failure) assert(false); 839 () @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } (); 840 } 841 } 842 } 843 844 bool opCast (T : bool) () const nothrow { return m_waiter !is null; } 845 846 /// A counter that is increased with every emit() call 847 int emitCount() const nothrow { return m_waiter.m_emitCount; } 848 849 /// Emits the signal, waking up all owners of the signal. 850 int emit() 851 nothrow { 852 assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()"); 853 logTrace("unshared emit"); 854 auto ec = m_waiter.m_emitCount++; 855 m_waiter.emit(); 856 return ec; 857 } 858 859 /// Emits the signal, waking up a single owners of the signal. 860 int emitSingle() 861 nothrow { 862 assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()"); 863 logTrace("unshared single emit"); 864 auto ec = m_waiter.m_emitCount++; 865 m_waiter.emitSingle(); 866 return ec; 867 } 868 869 /** Acquires ownership and waits until the signal is emitted. 870 871 Note that in order not to miss any emits it is necessary to use the 872 overload taking an integer. 873 874 Throws: 875 May throw an $(D InterruptException) if the task gets interrupted 876 using $(D Task.interrupt()). 877 */ 878 int wait() { return wait(this.emitCount); } 879 880 /** Acquires ownership and waits until the signal is emitted and the emit 881 count is larger than a given one. 882 883 Throws: 884 May throw an $(D InterruptException) if the task gets interrupted 885 using $(D Task.interrupt()). 886 */ 887 int wait(int emit_count) { return doWait!true(Duration.max, emit_count); } 888 /// ditto 889 int wait(Duration timeout, int emit_count) { return doWait!true(timeout, emit_count); } 890 891 /** Same as $(D wait), but defers throwing any $(D InterruptException). 892 893 This method is annotated $(D nothrow) at the expense that it cannot be 894 interrupted. 895 */ 896 int waitUninterruptible() nothrow { return waitUninterruptible(this.emitCount); } 897 /// ditto 898 int waitUninterruptible(int emit_count) nothrow { return doWait!false(Duration.max, emit_count); } 899 /// ditto 900 int waitUninterruptible(Duration timeout, int emit_count) nothrow { return doWait!false(timeout, emit_count); } 901 902 private int doWait(bool interruptible)(Duration timeout, int emit_count) 903 { 904 import core.time : MonoTime; 905 906 assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()"); 907 908 MonoTime target_timeout, now; 909 if (timeout != Duration.max) { 910 try now = MonoTime.currTime(); 911 catch (Exception e) { assert(false, e.msg); } 912 target_timeout = now + timeout; 913 } 914 915 while (m_waiter.m_emitCount - emit_count <= 0) { 916 m_waiter.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max); 917 try now = MonoTime.currTime(); 918 catch (Exception e) { assert(false, e.msg); } 919 if (now >= target_timeout) break; 920 } 921 922 return m_waiter.m_emitCount; 923 } 924 } 925 926 unittest { 927 import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; 928 929 auto e = createManualEvent(); 930 auto w1 = runTask({ e.waitUninterruptible(100.msecs, e.emitCount); }); 931 auto w2 = runTask({ e.waitUninterruptible(500.msecs, e.emitCount); }); 932 runTask({ 933 try sleep(50.msecs); 934 catch (Exception e) assert(false, e.msg); 935 e.emit(); 936 try sleep(50.msecs); 937 catch (Exception e) assert(false, e.msg); 938 assert(!w1.running && !w2.running); 939 exitEventLoop(); 940 }); 941 runEventLoop(); 942 } 943 944 unittest { 945 import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; 946 auto e = createManualEvent(); 947 // integer overflow test 948 e.m_waiter.m_emitCount = int.max; 949 auto w1 = runTask({ e.waitUninterruptible(50.msecs, e.emitCount); }); 950 runTask({ 951 try sleep(5.msecs); 952 catch (Exception e) assert(false, e.msg); 953 e.emit(); 954 try sleep(50.msecs); 955 catch (Exception e) assert(false, e.msg); 956 assert(!w1.running); 957 exitEventLoop(); 958 }); 959 runEventLoop(); 960 } 961 962 unittest { // ensure that cancelled waiters are properly handled and that a FIFO order is implemented 963 import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; 964 965 LocalManualEvent l = createManualEvent(); 966 967 Task t2; 968 runTask({ 969 l.waitUninterruptible(); 970 t2.interrupt(); 971 try sleep(20.msecs); 972 catch (Exception e) assert(false, e.msg); 973 exitEventLoop(); 974 }); 975 t2 = runTask({ 976 try { 977 l.wait(); 978 assert(false, "Shouldn't reach this."); 979 } 980 catch (InterruptException e) {} 981 catch (Exception e) assert(false, e.msg); 982 }); 983 runTask({ 984 l.emit(); 985 }); 986 runEventLoop(); 987 } 988 989 unittest { // ensure that LocalManualEvent behaves correctly after being copied 990 import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; 991 992 LocalManualEvent l = createManualEvent(); 993 runTask(() nothrow { 994 auto lc = l; 995 try sleep(100.msecs); 996 catch (Exception e) assert(false, e.msg); 997 lc.emit(); 998 }); 999 runTask({ 1000 assert(l.waitUninterruptible(1.seconds, l.emitCount)); 1001 exitEventLoop(); 1002 }); 1003 runEventLoop(); 1004 } 1005 1006 1007 /** A manually triggered multi threaded cross-task event. 1008 1009 Note: the ownership can be shared between multiple fibers and threads. 1010 */ 1011 struct ManualEvent { 1012 import core.thread : Thread; 1013 import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny; 1014 import vibe.internal.list : StackSList; 1015 1016 @safe: 1017 1018 private { 1019 alias ThreadWaiter = ThreadLocalWaiter!true; 1020 1021 int m_emitCount; 1022 static struct Waiters { 1023 StackSList!ThreadWaiter active; // actively waiting 1024 StackSList!ThreadWaiter free; // free-list of reusable waiter structs 1025 } 1026 Monitor!(Waiters, shared(Mutex)) m_waiters; 1027 } 1028 1029 // thread destructor in vibe.core.core will decrement the ref. count 1030 package static EventID ms_threadEvent; 1031 1032 enum EmitMode { 1033 single, 1034 all 1035 } 1036 1037 @disable this(this); 1038 1039 private void initialize() 1040 shared nothrow { 1041 m_waiters = createMonitor!(ManualEvent.Waiters)(new shared Mutex); 1042 } 1043 1044 deprecated("ManualEvent is always non-null!") 1045 bool opCast (T : bool) () const shared nothrow { return true; } 1046 1047 /// A counter that is increased with every emit() call 1048 int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); } 1049 1050 /// Emits the signal, waking up all owners of the signal. 1051 int emit() 1052 shared nothrow @trusted { 1053 import core.atomic : atomicOp, cas; 1054 1055 debug (VibeMutexLog) () @trusted { logTrace("emit shared %s", cast(void*)&this); } (); 1056 1057 auto ec = atomicOp!"+="(m_emitCount, 1); 1058 auto thisthr = Thread.getThis(); 1059 1060 ThreadWaiter lw; 1061 auto drv = eventDriver; 1062 m_waiters.lock.active.filter((ThreadWaiter w) { 1063 debug (VibeMutexLog) () @trusted { logTrace("waiter %s", cast(void*)w); } (); 1064 if (w.m_driver is drv) { 1065 lw = w; 1066 lw.addRef(); 1067 } else { 1068 try { 1069 assert(w.m_event != EventID.init); 1070 () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true); 1071 } catch (Exception e) assert(false, e.msg); 1072 } 1073 return true; 1074 }); 1075 debug (VibeMutexLog) () @trusted { logTrace("lw %s", cast(void*)lw); } (); 1076 if (lw) { 1077 lw.emit(); 1078 releaseWaiter(lw); 1079 } 1080 1081 debug (VibeMutexLog) logTrace("emit shared done"); 1082 1083 return ec; 1084 } 1085 1086 /// Emits the signal, waking up at least one waiting task 1087 int emitSingle() 1088 shared nothrow @trusted { 1089 import core.atomic : atomicOp, cas; 1090 1091 () @trusted { logTrace("emit shared single %s", cast(void*)&this); } (); 1092 1093 auto ec = atomicOp!"+="(m_emitCount, 1); 1094 auto thisthr = Thread.getThis(); 1095 1096 ThreadWaiter lw; 1097 auto drv = eventDriver; 1098 m_waiters.lock.active.iterate((ThreadWaiter w) { 1099 () @trusted { logTrace("waiter %s", cast(void*)w); } (); 1100 if (w.m_driver is drv) { 1101 if (w.unused) return true; 1102 lw = w; 1103 lw.addRef(); 1104 } else { 1105 try { 1106 assert(w.m_event != EventID.invalid); 1107 () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true); 1108 } catch (Exception e) assert(false, e.msg); 1109 } 1110 return false; 1111 }); 1112 () @trusted { logTrace("lw %s", cast(void*)lw); } (); 1113 if (lw) { 1114 lw.emitSingle(); 1115 releaseWaiter(lw); 1116 } 1117 1118 logTrace("emit shared done"); 1119 1120 return ec; 1121 } 1122 1123 /** Acquires ownership and waits until the signal is emitted. 1124 1125 Note that in order not to miss any emits it is necessary to use the 1126 overload taking an integer. 1127 1128 Throws: 1129 May throw an $(D InterruptException) if the task gets interrupted 1130 using $(D Task.interrupt()). 1131 */ 1132 int wait() shared { return wait(this.emitCount); } 1133 1134 /** Acquires ownership and waits until the emit count differs from the 1135 given one or until a timeout is reached. 1136 1137 Throws: 1138 May throw an $(D InterruptException) if the task gets interrupted 1139 using $(D Task.interrupt()). 1140 */ 1141 int wait(int emit_count) shared { return doWaitShared!true(Duration.max, emit_count); } 1142 /// ditto 1143 int wait(Duration timeout, int emit_count) shared { return doWaitShared!true(timeout, emit_count); } 1144 1145 /** Same as $(D wait), but defers throwing any $(D InterruptException). 1146 1147 This method is annotated $(D nothrow) at the expense that it cannot be 1148 interrupted. 1149 */ 1150 int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); } 1151 /// ditto 1152 int waitUninterruptible(int emit_count) shared nothrow { return doWaitShared!false(Duration.max, emit_count); } 1153 /// ditto 1154 int waitUninterruptible(Duration timeout, int emit_count) shared nothrow { return doWaitShared!false(timeout, emit_count); } 1155 1156 private int doWaitShared(bool interruptible)(Duration timeout, int emit_count) 1157 shared { 1158 import core.time : MonoTime; 1159 1160 () @trusted { logTrace("wait shared %s", cast(void*)&this); } (); 1161 1162 if (ms_threadEvent is EventID.invalid) { 1163 ms_threadEvent = eventDriver.events.create(); 1164 assert(ms_threadEvent != EventID.invalid, "Failed to create event!"); 1165 } 1166 1167 MonoTime target_timeout, now; 1168 if (timeout != Duration.max) { 1169 try now = MonoTime.currTime(); 1170 catch (Exception e) { assert(false, e.msg); } 1171 target_timeout = now + timeout; 1172 } 1173 1174 int ec = this.emitCount; 1175 1176 acquireThreadWaiter((scope ThreadWaiter w) { 1177 while (ec - emit_count <= 0) { 1178 w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => (this.emitCount - emit_count) > 0); 1179 ec = this.emitCount; 1180 1181 if (timeout != Duration.max) { 1182 try now = MonoTime.currTime(); 1183 catch (Exception e) { assert(false, e.msg); } 1184 if (now >= target_timeout) break; 1185 } 1186 } 1187 }); 1188 1189 return ec; 1190 } 1191 1192 private void acquireThreadWaiter(DEL)(scope DEL del) 1193 shared { 1194 import vibe.internal.allocator : processAllocator, makeGCSafe; 1195 1196 ThreadWaiter w; 1197 auto drv = eventDriver; 1198 1199 with (m_waiters.lock) { 1200 active.iterate((aw) { 1201 if (aw.m_driver is drv) { 1202 w = aw; 1203 w.addRef(); 1204 return false; 1205 } 1206 return true; 1207 }); 1208 1209 if (!w) { 1210 free.filter((fw) { 1211 if (fw.m_driver is drv) { 1212 w = fw; 1213 w.addRef(); 1214 return false; 1215 } 1216 return true; 1217 }); 1218 1219 if (!w) { 1220 () @trusted { 1221 try { 1222 w = processAllocator.makeGCSafe!ThreadWaiter; 1223 w.m_driver = drv; 1224 w.m_event = ms_threadEvent; 1225 } catch (Exception e) { 1226 assert(false, "Failed to allocate thread waiter."); 1227 } 1228 } (); 1229 } 1230 1231 assert(w.m_refCount == 1); 1232 active.add(w); 1233 } 1234 } 1235 1236 scope (exit) releaseWaiter(w); 1237 1238 del(w); 1239 } 1240 1241 private void releaseWaiter(ThreadWaiter w) 1242 shared nothrow { 1243 if (!w.releaseRef()) { 1244 assert(w.m_driver is eventDriver, "Waiter was reassigned a different driver!?"); 1245 assert(w.unused, "Waiter still used, but not referenced!?"); 1246 with (m_waiters.lock) { 1247 auto rmvd = active.remove(w); 1248 assert(rmvd, "Waiter not in active queue anymore!?"); 1249 free.add(w); 1250 // TODO: cap size of m_freeWaiters 1251 } 1252 } 1253 } 1254 } 1255 1256 unittest { 1257 import vibe.core.core : exitEventLoop, runEventLoop, runTask, runWorkerTaskH, sleep; 1258 1259 auto e = createSharedManualEvent(); 1260 auto w1 = runTask({ e.waitUninterruptible(100.msecs, e.emitCount); }); 1261 static void w(shared(ManualEvent)* e) { e.waitUninterruptible(500.msecs, e.emitCount); } 1262 auto w2 = runWorkerTaskH(&w, &e); 1263 runTask({ 1264 try sleep(50.msecs); 1265 catch (Exception e) assert(false, e.msg); 1266 e.emit(); 1267 try sleep(50.msecs); 1268 catch (Exception e) assert(false, e.msg); 1269 assert(!w1.running && !w2.running); 1270 exitEventLoop(); 1271 }); 1272 runEventLoop(); 1273 } 1274 1275 unittest { 1276 import vibe.core.core : runTask, runWorkerTaskH, setTimer, sleep; 1277 import vibe.core.taskpool : TaskPool; 1278 import core.time : msecs, usecs; 1279 import std.concurrency : send, receiveOnly; 1280 import std.random : uniform; 1281 1282 auto tpool = new shared TaskPool(4); 1283 scope (exit) tpool.terminate(); 1284 1285 static void test(shared(ManualEvent)* evt, Task owner) 1286 nothrow { 1287 try owner.tid.send(Task.getThis()); 1288 catch (Exception e) assert(false, e.msg); 1289 1290 int ec = evt.emitCount; 1291 auto thist = Task.getThis(); 1292 auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog 1293 scope (exit) tm.stop(); 1294 while (ec < 5_000) { 1295 tm.rearm(500.msecs); 1296 try sleep(uniform(0, 10_000).usecs); 1297 catch (Exception e) assert(false, e.msg); 1298 try if (uniform(0, 10) == 0) evt.emit(); 1299 catch (Exception e) assert(false, e.msg); 1300 auto ecn = evt.waitUninterruptible(ec); 1301 assert(ecn > ec); 1302 ec = ecn; 1303 } 1304 } 1305 1306 auto watchdog = setTimer(30.seconds, { assert(false, "ManualEvent test has hung."); }); 1307 scope (exit) watchdog.stop(); 1308 1309 auto e = createSharedManualEvent(); 1310 Task[] tasks; 1311 1312 runTask(() nothrow { 1313 auto thist = Task.getThis(); 1314 1315 // start 25 tasks in each thread 1316 foreach (i; 0 .. 25) tpool.runTaskDist(&test, &e, thist); 1317 // collect all task handles 1318 try foreach (i; 0 .. 4*25) tasks ~= receiveOnly!Task; 1319 catch (Exception e) assert(false, e.msg); 1320 1321 auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog 1322 scope (exit) tm.stop(); 1323 int pec = 0; 1324 while (e.emitCount < 5_000) { 1325 tm.rearm(500.msecs); 1326 try sleep(50.usecs); 1327 catch (Exception e) assert(false, e.msg); 1328 e.emit(); 1329 } 1330 1331 // wait for all worker tasks to finish 1332 foreach (t; tasks) t.joinUninterruptible(); 1333 }).join(); 1334 } 1335 1336 package shared struct Monitor(T, M) 1337 { 1338 alias Mutex = M; 1339 alias Data = T; 1340 private { 1341 Mutex mutex; 1342 Data data; 1343 } 1344 1345 static struct Locked { 1346 shared(Monitor)* m; 1347 @disable this(this); 1348 ~this() { 1349 () @trusted { 1350 static if (is(typeof(Mutex.init.unlock_nothrow()))) 1351 (cast(Mutex)m.mutex).unlock_nothrow(); 1352 else (cast(Mutex)m.mutex).unlock(); 1353 } (); 1354 } 1355 ref inout(Data) get() inout @trusted { return *cast(inout(Data)*)&m.data; } 1356 alias get this; 1357 } 1358 1359 Locked lock() { 1360 () @trusted { 1361 static if (is(typeof(Mutex.init.lock_nothrow()))) 1362 (cast(Mutex)mutex).lock_nothrow(); 1363 else (cast(Mutex)mutex).lock(); 1364 } (); 1365 return Locked(() @trusted { return &this; } ()); 1366 } 1367 1368 const(Locked) lock() const { 1369 () @trusted { 1370 static if (is(typeof(Mutex.init.lock_nothrow()))) 1371 (cast(Mutex)mutex).lock_nothrow(); 1372 else (cast(Mutex)mutex).lock(); 1373 } (); 1374 return const(Locked)(() @trusted { return &this; } ()); 1375 } 1376 } 1377 1378 1379 package shared(Monitor!(T, M)) createMonitor(T, M)(M mutex) 1380 @trusted { 1381 shared(Monitor!(T, M)) ret; 1382 ret.mutex = cast(shared)mutex; 1383 return ret; 1384 } 1385 1386 1387 private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { 1388 import vibe.internal.list : CircularDList; 1389 1390 private { 1391 static struct TaskWaiter { 1392 TaskWaiter* prev, next; 1393 void delegate() @safe nothrow notifier; 1394 1395 void wait(void delegate() @safe nothrow del) @safe nothrow { 1396 assert(notifier is null, "Local waiter is used twice!"); 1397 notifier = del; 1398 } 1399 void cancel() @safe nothrow { notifier = null; } 1400 void emit() @safe nothrow { auto n = notifier; notifier = null; n(); } 1401 } 1402 1403 static if (EVENT_TRIGGERED) { 1404 package(vibe) ThreadLocalWaiter next; // queue of other waiters of the same thread 1405 NativeEventDriver m_driver; 1406 EventID m_event = EventID.invalid; 1407 } else { 1408 int m_emitCount = 0; 1409 } 1410 int m_refCount = 1; 1411 TaskWaiter m_pivot; 1412 TaskWaiter m_emitPivot; 1413 CircularDList!(TaskWaiter*) m_waiters; 1414 } 1415 1416 this() 1417 { 1418 m_waiters = CircularDList!(TaskWaiter*)(() @trusted { return &m_pivot; } ()); 1419 } 1420 1421 static if (EVENT_TRIGGERED) { 1422 ~this() 1423 { 1424 import vibe.core.internal.release : releaseHandle; 1425 1426 if (m_event != EventID.invalid) 1427 releaseHandle!"events"(m_event, () @trusted { return cast(shared)m_driver; } ()); 1428 } 1429 } 1430 1431 @property bool unused() const @safe nothrow { return m_waiters.empty; } 1432 1433 void addRef() @safe nothrow { assert(m_refCount >= 0); m_refCount++; } 1434 bool releaseRef() @safe nothrow { assert(m_refCount > 0); return --m_refCount > 0; } 1435 1436 bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null) 1437 @safe { 1438 import core.time : MonoTime; 1439 import vibe.internal.async : Waitable, asyncAwaitAny; 1440 1441 TaskWaiter waiter_store; 1442 TaskWaiter* waiter = () @trusted { return &waiter_store; } (); 1443 1444 m_waiters.insertBack(waiter); 1445 assert(waiter.next !is null); 1446 scope (exit) 1447 if (waiter.next !is null) { 1448 m_waiters.remove(waiter); 1449 assert(!waiter.next); 1450 } 1451 1452 MonoTime target_timeout, now; 1453 if (timeout != Duration.max) { 1454 try now = MonoTime.currTime(); 1455 catch (Exception e) { assert(false, e.msg); } 1456 target_timeout = now + timeout; 1457 } 1458 1459 bool cancelled; 1460 1461 alias waitable = Waitable!(typeof(TaskWaiter.notifier), 1462 (cb) { waiter.wait(cb); }, 1463 (cb) { cancelled = true; waiter.cancel(); }, 1464 () {} 1465 ); 1466 1467 alias ewaitable = Waitable!(EventCallback, 1468 (cb) { 1469 eventDriver.events.wait(evt, cb); 1470 // check for exit condition *after* starting to wait for the event 1471 // to avoid a race condition 1472 if (exit_condition()) { 1473 eventDriver.events.cancelWait(evt, cb); 1474 cb(evt); 1475 } 1476 }, 1477 (cb) { eventDriver.events.cancelWait(evt, cb); }, 1478 (EventID) {} 1479 ); 1480 1481 if (evt != EventID.invalid) { 1482 asyncAwaitAny!(interruptible, waitable, ewaitable)(timeout); 1483 } else { 1484 asyncAwaitAny!(interruptible, waitable)(timeout); 1485 } 1486 1487 if (cancelled) { 1488 assert(waiter.next !is null, "Cancelled waiter not in queue anymore!?"); 1489 return false; 1490 } else { 1491 assert(waiter.next is null, "Triggered waiter still in queue!?"); 1492 return true; 1493 } 1494 } 1495 1496 void emit() 1497 @safe nothrow { 1498 import std.algorithm.mutation : swap; 1499 import vibe.core.core : yield; 1500 1501 if (m_waiters.empty) return; 1502 1503 TaskWaiter* pivot = () @trusted { return &m_emitPivot; } (); 1504 1505 if (pivot.next) { // another emit in progress? 1506 // shift pivot to the end, so that the other emit call will process all pending waiters 1507 if (pivot !is m_waiters.back) { 1508 m_waiters.remove(pivot); 1509 m_waiters.insertBack(pivot); 1510 } 1511 return; 1512 } 1513 1514 m_waiters.insertBack(pivot); 1515 scope (exit) m_waiters.remove(pivot); 1516 1517 foreach (w; m_waiters) { 1518 if (w is pivot) break; 1519 emitWaiter(w); 1520 } 1521 } 1522 1523 bool emitSingle() 1524 @safe nothrow { 1525 if (m_waiters.empty) return false; 1526 1527 TaskWaiter* pivot = () @trusted { return &m_emitPivot; } (); 1528 1529 if (pivot.next) { // another emit in progress? 1530 // shift pivot to the right, so that the other emit call will process another waiter 1531 if (pivot !is m_waiters.back) { 1532 auto n = pivot.next; 1533 m_waiters.remove(pivot); 1534 m_waiters.insertAfter(pivot, n); 1535 } 1536 return true; 1537 } 1538 1539 emitWaiter(m_waiters.front); 1540 return true; 1541 } 1542 1543 private void emitWaiter(TaskWaiter* w) 1544 @safe nothrow { 1545 m_waiters.remove(w); 1546 1547 if (w.notifier !is null) { 1548 logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr); 1549 w.emit(); 1550 } else logTrace("notify callback is null"); 1551 } 1552 } 1553 1554 private struct TaskMutexImpl(bool INTERRUPTIBLE) { 1555 private { 1556 shared(bool) m_locked = false; 1557 shared(uint) m_waiters = 0; 1558 shared(ManualEvent) m_signal; 1559 debug Task m_owner; 1560 } 1561 1562 void setup() 1563 { 1564 m_signal.initialize(); 1565 } 1566 1567 @trusted bool tryLock() 1568 nothrow { 1569 if (cas(&m_locked, false, true)) { 1570 debug m_owner = Task.getThis(); 1571 debug(VibeMutexLog) logTrace("mutex %s lock %s", cast(void*)&this, atomicLoad(m_waiters)); 1572 return true; 1573 } 1574 return false; 1575 } 1576 1577 @trusted bool lock(Duration timeout = Duration.max) 1578 { 1579 if (tryLock()) return true; 1580 debug assert(m_owner == Task() || m_owner != Task.getThis(), "Recursive mutex lock."); 1581 atomicOp!"+="(m_waiters, 1); 1582 debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters)); 1583 scope(exit) atomicOp!"-="(m_waiters, 1); 1584 auto ecnt = m_signal.emitCount(); 1585 MonoTime target = MonoTime.currTime + timeout; 1586 while (!tryLock()) { 1587 auto now = MonoTime.currTime; 1588 if (timeout != Duration.max && now >= target) 1589 return false; 1590 1591 auto remaining = timeout != Duration.max ? target - now : Duration.max; 1592 static if (INTERRUPTIBLE) ecnt = m_signal.wait(remaining, ecnt); 1593 else ecnt = m_signal.waitUninterruptible(remaining, ecnt); 1594 } 1595 return true; 1596 } 1597 1598 @trusted void unlock() 1599 { 1600 assert(m_locked); 1601 debug { 1602 assert(m_owner == Task.getThis()); 1603 m_owner = Task(); 1604 } 1605 atomicStore!(MemoryOrder.rel)(m_locked, false); 1606 debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters)); 1607 if (atomicLoad(m_waiters) > 0) 1608 m_signal.emit(); 1609 } 1610 } 1611 1612 private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) { 1613 import std.stdio; 1614 private { 1615 core.sync.mutex.Mutex m_mutex; 1616 Task m_owner; 1617 size_t m_recCount = 0; 1618 shared(uint) m_waiters = 0; 1619 shared(ManualEvent) m_signal; 1620 @property bool m_locked() const { return m_recCount > 0; } 1621 } 1622 1623 void setup() 1624 { 1625 m_mutex = new core.sync.mutex.Mutex; 1626 m_signal.initialize(); 1627 } 1628 1629 @trusted bool tryLock() 1630 nothrow { 1631 auto self = Task.getThis(); 1632 return m_mutex.performLocked!({ 1633 if (!m_owner) { 1634 assert(m_recCount == 0); 1635 m_recCount = 1; 1636 m_owner = self; 1637 return true; 1638 } else if (m_owner == self) { 1639 m_recCount++; 1640 return true; 1641 } 1642 return false; 1643 }); 1644 } 1645 1646 @trusted void lock() 1647 { 1648 if (tryLock()) return; 1649 atomicOp!"+="(m_waiters, 1); 1650 debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters)); 1651 scope(exit) atomicOp!"-="(m_waiters, 1); 1652 auto ecnt = m_signal.emitCount(); 1653 while (!tryLock()) { 1654 static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt); 1655 else ecnt = m_signal.waitUninterruptible(ecnt); 1656 } 1657 } 1658 1659 @trusted void unlock() 1660 { 1661 auto self = Task.getThis(); 1662 m_mutex.performLocked!({ 1663 assert(m_owner == self); 1664 assert(m_recCount > 0); 1665 m_recCount--; 1666 if (m_recCount == 0) { 1667 m_owner = Task.init; 1668 } 1669 }); 1670 debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters)); 1671 if (atomicLoad(m_waiters) > 0) 1672 m_signal.emit(); 1673 } 1674 } 1675 1676 private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { 1677 private { 1678 LOCKABLE m_mutex; 1679 static if (is(LOCKABLE == Mutex)) 1680 TaskMutex m_taskMutex; 1681 shared(ManualEvent) m_signal; 1682 } 1683 1684 static if (is(LOCKABLE == Lockable)) { 1685 final class MutexWrapper : Lockable { 1686 private core.sync.mutex.Mutex m_mutex; 1687 this(core.sync.mutex.Mutex mtx) { m_mutex = mtx; } 1688 @trusted void lock() { m_mutex.lock(); } 1689 @trusted void unlock() { m_mutex.unlock(); } 1690 @trusted bool tryLock() { return m_mutex.tryLock(); } 1691 } 1692 1693 void setupForMutex(core.sync.mutex.Mutex mtx) 1694 { 1695 setup(new MutexWrapper(mtx)); 1696 } 1697 } 1698 1699 @disable this(this); 1700 1701 void setup(LOCKABLE mtx) 1702 { 1703 m_mutex = mtx; 1704 static if (is(typeof(m_taskMutex))) 1705 m_taskMutex = cast(TaskMutex)mtx; 1706 m_signal.initialize(); 1707 } 1708 1709 @property LOCKABLE mutex() { return m_mutex; } 1710 1711 @trusted void wait() 1712 { 1713 if (auto tm = cast(TaskMutex)m_mutex) { 1714 assert(tm.m_impl.m_locked); 1715 debug assert(tm.m_impl.m_owner == Task.getThis()); 1716 } 1717 1718 auto refcount = m_signal.emitCount; 1719 1720 static if (is(LOCKABLE == Mutex)) { 1721 if (m_taskMutex) m_taskMutex.unlock(); 1722 else m_mutex.unlock_nothrow(); 1723 } else m_mutex.unlock(); 1724 1725 scope(exit) { 1726 static if (is(LOCKABLE == Mutex)) { 1727 if (m_taskMutex) m_taskMutex.lock(); 1728 else m_mutex.lock_nothrow(); 1729 } else m_mutex.lock(); 1730 } 1731 static if (INTERRUPTIBLE) m_signal.wait(refcount); 1732 else m_signal.waitUninterruptible(refcount); 1733 } 1734 1735 @trusted bool wait(Duration timeout) 1736 { 1737 assert(!timeout.isNegative()); 1738 if (auto tm = cast(TaskMutex)m_mutex) { 1739 assert(tm.m_impl.m_locked); 1740 debug assert(tm.m_impl.m_owner == Task.getThis()); 1741 } 1742 1743 auto refcount = m_signal.emitCount; 1744 1745 static if (is(LOCKABLE == Mutex)) { 1746 if (m_taskMutex) m_taskMutex.unlock(); 1747 else m_mutex.unlock_nothrow(); 1748 } else m_mutex.unlock(); 1749 1750 scope(exit) { 1751 static if (is(LOCKABLE == Mutex)) { 1752 if (m_taskMutex) m_taskMutex.lock(); 1753 else m_mutex.lock_nothrow(); 1754 } else m_mutex.lock(); 1755 } 1756 1757 static if (INTERRUPTIBLE) return m_signal.wait(timeout, refcount) != refcount; 1758 else return m_signal.waitUninterruptible(timeout, refcount) != refcount; 1759 } 1760 1761 @trusted void notify() 1762 { 1763 m_signal.emit(); 1764 } 1765 1766 @trusted void notifyAll() 1767 { 1768 m_signal.emit(); 1769 } 1770 } 1771 1772 /** Contains the shared state of a $(D TaskReadWriteMutex). 1773 * 1774 * Since a $(D TaskReadWriteMutex) consists of two actual Mutex 1775 * objects that rely on common memory, this class implements 1776 * the actual functionality of their method calls. 1777 * 1778 * The method implementations are based on two static parameters 1779 * ($(D INTERRUPTIBLE) and $(D INTENT)), which are configured through 1780 * template arguments: 1781 * 1782 * - $(D INTERRUPTIBLE) determines whether the mutex implementation 1783 * are interruptible by vibe.d's $(D vibe.core.task.Task.interrupt()) 1784 * method or not. 1785 * 1786 * - $(D INTENT) describes the intent, with which a locking operation is 1787 * performed (i.e. $(D READ_ONLY) or $(D READ_WRITE)). RO locking allows for 1788 * multiple Tasks holding the mutex, whereas RW locking will cause 1789 * a "bottleneck" so that only one Task can write to the protected 1790 * data at once. 1791 */ 1792 private struct ReadWriteMutexState(bool INTERRUPTIBLE) 1793 { 1794 /** The policy with which the mutex should operate. 1795 * 1796 * The policy determines how the acquisition of the locks is 1797 * performed and can be used to tune the mutex according to the 1798 * underlying algorithm in which it is used. 1799 * 1800 * According to the provided policy, the mutex will either favor 1801 * reading or writing tasks and could potentially starve the 1802 * respective opposite. 1803 * 1804 * cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy) 1805 */ 1806 enum Policy : int 1807 { 1808 /** Readers are prioritized, writers may be starved as a result. */ 1809 PREFER_READERS = 0, 1810 /** Writers are prioritized, readers may be starved as a result. */ 1811 PREFER_WRITERS 1812 } 1813 1814 /** The intent with which a locking operation is performed. 1815 * 1816 * Since both locks share the same underlying algorithms, the actual 1817 * intent with which a lock operation is performed (i.e read/write) 1818 * are passed as a template parameter to each method. 1819 */ 1820 enum LockingIntent : bool 1821 { 1822 /** Perform a read lock/unlock operation. Multiple reading locks can be 1823 * active at a time. */ 1824 READ_ONLY = 0, 1825 /** Perform a write lock/unlock operation. Only a single writer can 1826 * hold a lock at any given time. */ 1827 READ_WRITE = 1 1828 } 1829 1830 private { 1831 //Queue counters 1832 /** The number of reading tasks waiting for the lock to become available. */ 1833 shared(uint) m_waitingForReadLock = 0; 1834 /** The number of writing tasks waiting for the lock to become available. */ 1835 shared(uint) m_waitingForWriteLock = 0; 1836 1837 //Lock counters 1838 /** The number of reading tasks that currently hold the lock. */ 1839 uint m_activeReadLocks = 0; 1840 /** The number of writing tasks that currently hold the lock (binary). */ 1841 ubyte m_activeWriteLocks = 0; 1842 1843 /** The policy determining the lock's behavior. */ 1844 Policy m_policy; 1845 1846 //Queue Events 1847 /** The event used to wake reading tasks waiting for the lock while it is blocked. */ 1848 shared(ManualEvent) m_readyForReadLock; 1849 /** The event used to wake writing tasks waiting for the lock while it is blocked. */ 1850 shared(ManualEvent) m_readyForWriteLock; 1851 1852 /** The underlying mutex that gates the access to the shared state. */ 1853 Mutex m_counterMutex; 1854 } 1855 1856 this(Policy policy) 1857 { 1858 m_policy = policy; 1859 m_counterMutex = new Mutex(); 1860 m_readyForReadLock = createSharedManualEvent(); 1861 m_readyForWriteLock = createSharedManualEvent(); 1862 } 1863 1864 @disable this(this); 1865 1866 /** The policy with which the lock has been created. */ 1867 @property policy() const { return m_policy; } 1868 1869 version(RWMutexPrint) 1870 { 1871 /** Print out debug information during lock operations. */ 1872 void printInfo(string OP, LockingIntent INTENT)() nothrow 1873 { 1874 import std.string; 1875 try 1876 { 1877 import std.stdio; 1878 writefln("RWMutex: %s (%s), active: RO: %d, RW: %d; waiting: RO: %d, RW: %d", 1879 OP.leftJustify(10,' '), 1880 INTENT == LockingIntent.READ_ONLY ? "RO" : "RW", 1881 m_activeReadLocks, m_activeWriteLocks, 1882 m_waitingForReadLock, m_waitingForWriteLock 1883 ); 1884 } 1885 catch (Throwable t){} 1886 } 1887 } 1888 1889 /** An internal shortcut method to determine the queue event for a given intent. */ 1890 @property ref auto queueEvent(LockingIntent INTENT)() 1891 { 1892 static if (INTENT == LockingIntent.READ_ONLY) 1893 return m_readyForReadLock; 1894 else 1895 return m_readyForWriteLock; 1896 } 1897 1898 /** An internal shortcut method to determine the queue counter for a given intent. */ 1899 @property ref auto queueCounter(LockingIntent INTENT)() 1900 { 1901 static if (INTENT == LockingIntent.READ_ONLY) 1902 return m_waitingForReadLock; 1903 else 1904 return m_waitingForWriteLock; 1905 } 1906 1907 /** An internal shortcut method to determine the current emitCount of the queue counter for a given intent. */ 1908 int emitCount(LockingIntent INTENT)() 1909 { 1910 return queueEvent!INTENT.emitCount(); 1911 } 1912 1913 /** An internal shortcut method to determine the active counter for a given intent. */ 1914 @property ref auto activeCounter(LockingIntent INTENT)() 1915 { 1916 static if (INTENT == LockingIntent.READ_ONLY) 1917 return m_activeReadLocks; 1918 else 1919 return m_activeWriteLocks; 1920 } 1921 1922 /** An internal shortcut method to wait for the queue event for a given intent. 1923 * 1924 * This method is used during the `lock()` operation, after a 1925 * `tryLock()` operation has been unsuccessfully finished. 1926 * The active fiber will yield and be suspended until the queue event 1927 * for the given intent will be fired. 1928 */ 1929 int wait(LockingIntent INTENT)(int count) 1930 { 1931 static if (INTERRUPTIBLE) 1932 return queueEvent!INTENT.wait(count); 1933 else 1934 return queueEvent!INTENT.waitUninterruptible(count); 1935 } 1936 1937 /** An internal shortcut method to notify tasks waiting for the lock to become available again. 1938 * 1939 * This method is called whenever the number of owners of the mutex hits 1940 * zero; this is basically the counterpart to `wait()`. 1941 * It wakes any Task currently waiting for the mutex to be released. 1942 */ 1943 @trusted void notify(LockingIntent INTENT)() 1944 { 1945 static if (INTENT == LockingIntent.READ_ONLY) 1946 { //If the last reader unlocks the mutex, notify all waiting writers 1947 if (atomicLoad(m_waitingForWriteLock) > 0) 1948 m_readyForWriteLock.emit(); 1949 } 1950 else 1951 { //If a writer unlocks the mutex, notify both readers and writers 1952 if (atomicLoad(m_waitingForReadLock) > 0) 1953 m_readyForReadLock.emit(); 1954 1955 if (atomicLoad(m_waitingForWriteLock) > 0) 1956 m_readyForWriteLock.emit(); 1957 } 1958 } 1959 1960 /** An internal method that performs the acquisition attempt in different variations. 1961 * 1962 * Since both locks rely on a common TaskMutex object which gates the access 1963 * to their common data acquisition attempts for this lock are more complex 1964 * than for simple mutex variants. This method will thus be performing the 1965 * `tryLock()` operation in two variations, depending on the callee: 1966 * 1967 * If called from the outside ($(D WAIT_FOR_BLOCKING_MUTEX) = false), the method 1968 * will instantly fail if the underlying mutex is locked (i.e. during another 1969 * `tryLock()` or `unlock()` operation), in order to guarantee the fastest 1970 * possible locking attempt. 1971 * 1972 * If used internally by the `lock()` method ($(D WAIT_FOR_BLOCKING_MUTEX) = true), 1973 * the operation will wait for the mutex to be available before deciding if 1974 * the lock can be acquired, since the attempt would anyway be repeated until 1975 * it succeeds. This will prevent frequent retries under heavy loads and thus 1976 * should ensure better performance. 1977 */ 1978 @trusted bool tryLock(LockingIntent INTENT, bool WAIT_FOR_BLOCKING_MUTEX)() 1979 { 1980 //Log a debug statement for the attempt 1981 version(RWMutexPrint) 1982 printInfo!("tryLock",INTENT)(); 1983 1984 //Try to acquire the lock 1985 static if (!WAIT_FOR_BLOCKING_MUTEX) 1986 { 1987 if (!m_counterMutex.tryLock()) 1988 return false; 1989 } 1990 else 1991 m_counterMutex.lock(); 1992 1993 scope(exit) 1994 m_counterMutex.unlock(); 1995 1996 //Log a debug statement for the attempt 1997 version(RWMutexPrint) 1998 printInfo!("checkCtrs",INTENT)(); 1999 2000 //Check if there's already an active writer 2001 if (m_activeWriteLocks > 0) 2002 return false; 2003 2004 //If writers are preferred over readers, check whether there 2005 //currently is a writer in the waiting queue and abort if 2006 //that's the case. 2007 static if (INTENT == LockingIntent.READ_ONLY) 2008 if (m_policy.PREFER_WRITERS && m_waitingForWriteLock > 0) 2009 return false; 2010 2011 //If we are locking the mutex for writing, make sure that 2012 //there's no reader active. 2013 static if (INTENT == LockingIntent.READ_WRITE) 2014 if (m_activeReadLocks > 0) 2015 return false; 2016 2017 //We can successfully acquire the lock! 2018 //Log a debug statement for the success. 2019 version(RWMutexPrint) 2020 printInfo!("lock",INTENT)(); 2021 2022 //Increase the according counter 2023 //(number of active readers/writers) 2024 //and return a success code. 2025 activeCounter!INTENT += 1; 2026 return true; 2027 } 2028 2029 /** Attempt to acquire the lock for a given intent. 2030 * 2031 * Returns: 2032 * `true`, if the lock was successfully acquired; 2033 * `false` otherwise. 2034 */ 2035 @trusted bool tryLock(LockingIntent INTENT)() 2036 { 2037 //Try to lock this mutex without waiting for the underlying 2038 //TaskMutex - fail if it is already blocked. 2039 return tryLock!(INTENT,false)(); 2040 } 2041 2042 /** Acquire the lock for the given intent; yield and suspend until the lock has been acquired. */ 2043 @trusted void lock(LockingIntent INTENT)() 2044 { 2045 //Prepare a waiting action before the first 2046 //`tryLock()` call in order to avoid a race 2047 //condition that could lead to the queue notification 2048 //not being fired. 2049 auto count = emitCount!INTENT; 2050 atomicOp!"+="(queueCounter!INTENT,1); 2051 scope(exit) 2052 atomicOp!"-="(queueCounter!INTENT,1); 2053 2054 //Try to lock the mutex 2055 auto locked = tryLock!(INTENT,true)(); 2056 if (locked) 2057 return; 2058 2059 //Retry until we successfully acquired the lock 2060 while(!locked) 2061 { 2062 version(RWMutexPrint) 2063 printInfo!("wait",INTENT)(); 2064 2065 count = wait!INTENT(count); 2066 locked = tryLock!(INTENT,true)(); 2067 } 2068 } 2069 2070 /** Unlock the mutex after a successful acquisition. */ 2071 @trusted void unlock(LockingIntent INTENT)() 2072 { 2073 version(RWMutexPrint) 2074 printInfo!("unlock",INTENT)(); 2075 2076 debug assert(activeCounter!INTENT > 0); 2077 2078 synchronized(m_counterMutex) 2079 { 2080 //Decrement the counter of active lock holders. 2081 //If the counter hits zero, notify waiting Tasks 2082 activeCounter!INTENT -= 1; 2083 if (activeCounter!INTENT == 0) 2084 { 2085 version(RWMutexPrint) 2086 printInfo!("notify",INTENT)(); 2087 2088 notify!INTENT(); 2089 } 2090 } 2091 } 2092 } 2093 2094 /** A ReadWriteMutex implementation for fibers. 2095 * 2096 * This mutex can be used in exchange for a $(D core.sync.mutex.ReadWriteMutex), 2097 * but does not block the event loop in contention situations. The `reader` and `writer` 2098 * members are used for locking. Locking the `reader` mutex allows access to multiple 2099 * readers at once, while the `writer` mutex only allows a single writer to lock it at 2100 * any given time. Locks on `reader` and `writer` are mutually exclusive (i.e. whenever a 2101 * writer is active, no readers can be active at the same time, and vice versa). 2102 * 2103 * Notice: 2104 * Mutexes implemented by this class cannot be interrupted 2105 * using $(D vibe.core.task.Task.interrupt()). The corresponding 2106 * InterruptException will be deferred until the next blocking 2107 * operation yields the event loop. 2108 * 2109 * Use $(D InterruptibleTaskReadWriteMutex) as an alternative that can be 2110 * interrupted. 2111 * 2112 * cf. $(D core.sync.mutex.ReadWriteMutex) 2113 */ 2114 final class TaskReadWriteMutex 2115 { 2116 private { 2117 alias State = ReadWriteMutexState!false; 2118 alias LockingIntent = State.LockingIntent; 2119 alias READ_ONLY = LockingIntent.READ_ONLY; 2120 alias READ_WRITE = LockingIntent.READ_WRITE; 2121 2122 /** The shared state used by the reader and writer mutexes. */ 2123 State m_state; 2124 } 2125 2126 /** The policy with which the mutex should operate. 2127 * 2128 * The policy determines how the acquisition of the locks is 2129 * performed and can be used to tune the mutex according to the 2130 * underlying algorithm in which it is used. 2131 * 2132 * According to the provided policy, the mutex will either favor 2133 * reading or writing tasks and could potentially starve the 2134 * respective opposite. 2135 * 2136 * cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy) 2137 */ 2138 alias Policy = State.Policy; 2139 2140 /** A common baseclass for both of the provided mutexes. 2141 * 2142 * The intent for the according mutex is specified through the 2143 * $(D INTENT) template argument, which determines if a mutex is 2144 * used for read or write locking. 2145 */ 2146 final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable 2147 { 2148 /** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */ 2149 override bool tryLock() { return m_state.tryLock!INTENT(); } 2150 /** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */ 2151 override void lock() { m_state.lock!INTENT(); } 2152 /** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */ 2153 override void unlock() { m_state.unlock!INTENT(); } 2154 } 2155 alias Reader = Mutex!READ_ONLY; 2156 alias Writer = Mutex!READ_WRITE; 2157 2158 Reader reader; 2159 Writer writer; 2160 2161 this(Policy policy = Policy.PREFER_WRITERS) 2162 { 2163 m_state = State(policy); 2164 reader = new Reader(); 2165 writer = new Writer(); 2166 } 2167 2168 /** The policy with which the lock has been created. */ 2169 @property Policy policy() const { return m_state.policy; } 2170 } 2171 2172 /** Alternative to $(D TaskReadWriteMutex) that supports interruption. 2173 * 2174 * This class supports the use of $(D vibe.core.task.Task.interrupt()) while 2175 * waiting in the `lock()` method. 2176 * 2177 * cf. $(D core.sync.mutex.ReadWriteMutex) 2178 */ 2179 final class InterruptibleTaskReadWriteMutex 2180 { 2181 @safe: 2182 2183 private { 2184 alias State = ReadWriteMutexState!true; 2185 alias LockingIntent = State.LockingIntent; 2186 alias READ_ONLY = LockingIntent.READ_ONLY; 2187 alias READ_WRITE = LockingIntent.READ_WRITE; 2188 2189 /** The shared state used by the reader and writer mutexes. */ 2190 State m_state; 2191 } 2192 2193 /** The policy with which the mutex should operate. 2194 * 2195 * The policy determines how the acquisition of the locks is 2196 * performed and can be used to tune the mutex according to the 2197 * underlying algorithm in which it is used. 2198 * 2199 * According to the provided policy, the mutex will either favor 2200 * reading or writing tasks and could potentially starve the 2201 * respective opposite. 2202 * 2203 * cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy) 2204 */ 2205 alias Policy = State.Policy; 2206 2207 /** A common baseclass for both of the provided mutexes. 2208 * 2209 * The intent for the according mutex is specified through the 2210 * $(D INTENT) template argument, which determines if a mutex is 2211 * used for read or write locking. 2212 * 2213 */ 2214 final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable 2215 { 2216 /** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */ 2217 override bool tryLock() { return m_state.tryLock!INTENT(); } 2218 /** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */ 2219 override void lock() { m_state.lock!INTENT(); } 2220 /** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */ 2221 override void unlock() { m_state.unlock!INTENT(); } 2222 } 2223 alias Reader = Mutex!READ_ONLY; 2224 alias Writer = Mutex!READ_WRITE; 2225 2226 Reader reader; 2227 Writer writer; 2228 2229 this(Policy policy = Policy.PREFER_WRITERS) 2230 { 2231 m_state = State(policy); 2232 reader = new Reader(); 2233 writer = new Writer(); 2234 } 2235 2236 /** The policy with which the lock has been created. */ 2237 @property Policy policy() const { return m_state.policy; } 2238 }