1 /** 2 Event loop compatible task synchronization facilities. 3 4 This module provides replacement primitives for the modules in `core.sync` 5 that do not block vibe.d's event loop in their wait states. These should 6 always be preferred over the ones in Druntime under usual circumstances. 7 8 Using a standard `Mutex` is possible as long as it is ensured that no event 9 loop based functionality (I/O, task interaction or anything that implicitly 10 calls `vibe.core.core.yield`) is executed within a section of code that is 11 protected by the mutex. $(B Failure to do so may result in dead-locks and 12 high-level race-conditions!) 13 14 Copyright: © 2012-2019 Sönke Ludwig 15 Authors: Leonid Kramer, Sönke Ludwig, Manuel Frischknecht 16 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 17 */ 18 module vibe.core.sync; 19 20 import vibe.core.log : logDebugV, logTrace, logInfo; 21 import vibe.core.task; 22 23 import core.atomic; 24 import core.sync.mutex; 25 import core.sync.condition; 26 import eventcore.core; 27 import std.exception; 28 import std.stdio; 29 import std.traits : ReturnType; 30 31 32 /** Creates a new signal that can be shared between fibers. 33 */ 34 LocalManualEvent createManualEvent() 35 @safe nothrow { 36 LocalManualEvent ret; 37 ret.initialize(); 38 return ret; 39 } 40 /// ditto 41 shared(ManualEvent) createSharedManualEvent() 42 @trusted nothrow { 43 shared(ManualEvent) ret; 44 ret.initialize(); 45 return ret; 46 } 47 48 49 /** Performs RAII based locking/unlocking of a mutex. 50 51 Note that while `TaskMutex` can be used with D's built-in `synchronized` 52 statement, `InterruptibleTaskMutex` cannot. This function provides a 53 library based alternative that is suitable for use with all mutex types. 54 */ 55 ScopedMutexLock!M scopedMutexLock(M)(M mutex, LockMode mode = LockMode.lock) 56 if (is(M : Mutex) || is(M : Lockable)) 57 { 58 return ScopedMutexLock!M(mutex, mode); 59 } 60 61 /// 62 unittest { 63 import vibe.core.core : runWorkerTaskH; 64 65 __gshared int counter; 66 __gshared InterruptibleTaskMutex mutex; 67 68 mutex = new InterruptibleTaskMutex; 69 70 Task[] tasks; 71 72 foreach (i; 0 .. 100) { 73 tasks ~= runWorkerTaskH({ 74 auto l = scopedMutexLock(mutex); 75 counter++; 76 }); 77 } 78 79 foreach (t; tasks) t.join(); 80 81 assert(counter == 100); 82 } 83 84 unittest { 85 scopedMutexLock(new Mutex); 86 scopedMutexLock(new TaskMutex); 87 scopedMutexLock(new InterruptibleTaskMutex); 88 } 89 90 enum LockMode { 91 lock, 92 tryLock, 93 defer 94 } 95 96 interface Lockable { 97 @safe: 98 void lock(); 99 void unlock(); 100 bool tryLock(); 101 } 102 103 /** RAII lock for the Mutex class. 104 */ 105 struct ScopedMutexLock(M) 106 if (is(M : Mutex) || is(M : Lockable)) 107 { 108 @disable this(this); 109 private { 110 M m_mutex; 111 bool m_locked; 112 LockMode m_mode; 113 } 114 115 this(M mutex, LockMode mode = LockMode.lock) { 116 assert(mutex !is null); 117 m_mutex = mutex; 118 119 final switch (mode) { 120 case LockMode.lock: lock(); break; 121 case LockMode.tryLock: tryLock(); break; 122 case LockMode.defer: break; 123 } 124 } 125 126 ~this() 127 { 128 if( m_locked ) 129 m_mutex.unlock(); 130 } 131 132 @property bool locked() const { return m_locked; } 133 134 void unlock() 135 { 136 enforce(m_locked); 137 m_mutex.unlock(); 138 m_locked = false; 139 } 140 141 bool tryLock() 142 { 143 enforce(!m_locked); 144 return m_locked = m_mutex.tryLock(); 145 } 146 147 void lock() 148 { 149 enforce(!m_locked); 150 m_locked = true; 151 m_mutex.lock(); 152 } 153 } 154 155 156 /* 157 Only for internal use: 158 Ensures that a mutex is locked while executing the given procedure. 159 160 This function works for all kinds of mutexes, in particular for 161 $(D core.sync.mutex.Mutex), $(D TaskMutex) and $(D InterruptibleTaskMutex). 162 163 Returns: 164 Returns the value returned from $(D PROC), if any. 165 */ 166 /// private 167 ReturnType!PROC performLocked(alias PROC, MUTEX)(MUTEX mutex) 168 { 169 mutex.lock(); 170 scope (exit) mutex.unlock(); 171 return PROC(); 172 } 173 174 /// 175 unittest { 176 int protected_var = 0; 177 auto mtx = new TaskMutex; 178 mtx.performLocked!({ 179 protected_var++; 180 }); 181 } 182 183 184 /** 185 Thread-local semaphore implementation for tasks. 186 187 When the semaphore runs out of concurrent locks, it will suspend. This class 188 is used in `vibe.core.connectionpool` to limit the number of concurrent 189 connections. 190 */ 191 final class LocalTaskSemaphore 192 { 193 @safe: 194 195 // requires a queue 196 import std.container.binaryheap; 197 import std.container.array; 198 //import vibe.utils.memory; 199 200 private { 201 static struct ThreadWaiter { 202 ubyte priority; 203 uint seq; 204 } 205 206 BinaryHeap!(Array!ThreadWaiter, asc) m_waiters; 207 uint m_maxLocks; 208 uint m_locks; 209 uint m_seq; 210 LocalManualEvent m_signal; 211 } 212 213 this(uint max_locks) 214 { 215 m_maxLocks = max_locks; 216 m_signal = createManualEvent(); 217 } 218 219 /// Maximum number of concurrent locks 220 @property void maxLocks(uint max_locks) { m_maxLocks = max_locks; } 221 /// ditto 222 @property uint maxLocks() const { return m_maxLocks; } 223 224 /// Number of concurrent locks still available 225 @property uint available() const { return m_maxLocks - m_locks; } 226 227 /** Try to acquire a lock. 228 229 If a lock cannot be acquired immediately, returns `false` and leaves the 230 semaphore in its previous state. 231 232 Returns: 233 `true` is returned $(I iff) the number of available locks is greater 234 than one. 235 */ 236 bool tryLock() 237 { 238 if (available > 0) 239 { 240 m_locks++; 241 return true; 242 } 243 return false; 244 } 245 246 /** Acquires a lock. 247 248 Once the limit of concurrent locks is reached, this method will block 249 until the number of locks drops below the limit. 250 */ 251 void lock(ubyte priority = 0) 252 { 253 import std.algorithm.comparison : min; 254 255 if (tryLock()) 256 return; 257 258 ThreadWaiter w; 259 w.priority = priority; 260 w.seq = min(0, m_seq - w.priority); 261 if (++m_seq == uint.max) 262 rewindSeq(); 263 264 () @trusted { m_waiters.insert(w); } (); 265 266 while (true) { 267 m_signal.waitUninterruptible(); 268 if (m_waiters.front.seq == w.seq && tryLock()) { 269 return; 270 } 271 } 272 } 273 274 /** Gives up an existing lock. 275 */ 276 void unlock() 277 { 278 assert(m_locks >= 1); 279 m_locks--; 280 if (m_waiters.length > 0) 281 m_signal.emit(); // resume one 282 } 283 284 // if true, a goes after b. ie. b comes out front() 285 /// private 286 static bool asc(ref ThreadWaiter a, ref ThreadWaiter b) 287 { 288 if (a.priority != b.priority) 289 return a.priority < b.priority; 290 return a.seq > b.seq; 291 } 292 293 private void rewindSeq() 294 @trusted { 295 Array!ThreadWaiter waiters = m_waiters.release(); 296 ushort min_seq; 297 import std.algorithm : min; 298 foreach (ref waiter; waiters[]) 299 min_seq = min(waiter.seq, min_seq); 300 foreach (ref waiter; waiters[]) 301 waiter.seq -= min_seq; 302 m_waiters.assume(waiters); 303 } 304 } 305 306 307 /** 308 Mutex implementation for fibers. 309 310 This mutex type can be used in exchange for a core.sync.mutex.Mutex, but 311 does not block the event loop when contention happens. Note that this 312 mutex does not allow recursive locking. 313 314 Notice: 315 Because this class is annotated nothrow, it cannot be interrupted 316 using $(D vibe.core.task.Task.interrupt()). The corresponding 317 $(D InterruptException) will be deferred until the next blocking 318 operation yields the event loop. 319 320 Use $(D InterruptibleTaskMutex) as an alternative that can be 321 interrupted. 322 323 See_Also: InterruptibleTaskMutex, RecursiveTaskMutex, core.sync.mutex.Mutex 324 */ 325 final class TaskMutex : core.sync.mutex.Mutex, Lockable { 326 @safe: 327 private TaskMutexImpl!false m_impl; 328 329 this(Object o) { m_impl.setup(); super(o); } 330 this() { m_impl.setup(); } 331 332 override bool tryLock() nothrow { return m_impl.tryLock(); } 333 override void lock() nothrow { m_impl.lock(); } 334 override void unlock() nothrow { m_impl.unlock(); } 335 } 336 337 unittest { 338 auto mutex = new TaskMutex; 339 340 { 341 auto lock = scopedMutexLock(mutex); 342 assert(lock.locked); 343 assert(mutex.m_impl.m_locked); 344 345 auto lock2 = scopedMutexLock(mutex, LockMode.tryLock); 346 assert(!lock2.locked); 347 } 348 assert(!mutex.m_impl.m_locked); 349 350 auto lock = scopedMutexLock(mutex, LockMode.tryLock); 351 assert(lock.locked); 352 lock.unlock(); 353 assert(!lock.locked); 354 355 synchronized(mutex){ 356 assert(mutex.m_impl.m_locked); 357 } 358 assert(!mutex.m_impl.m_locked); 359 360 mutex.performLocked!({ 361 assert(mutex.m_impl.m_locked); 362 }); 363 assert(!mutex.m_impl.m_locked); 364 365 static if (__VERSION__ >= 2067) { 366 with(mutex.scopedMutexLock) { 367 assert(mutex.m_impl.m_locked); 368 } 369 } 370 } 371 372 unittest { // test deferred throwing 373 import vibe.core.core; 374 375 auto mutex = new TaskMutex; 376 auto t1 = runTask({ 377 scope (failure) assert(false, "No exception expected in first task!"); 378 mutex.lock(); 379 scope (exit) mutex.unlock(); 380 sleep(20.msecs); 381 }); 382 383 auto t2 = runTask({ 384 mutex.lock(); 385 scope (exit) mutex.unlock(); 386 try { 387 yield(); 388 assert(false, "Yield is supposed to have thrown an InterruptException."); 389 } catch (InterruptException) { 390 // as expected! 391 } catch (Exception) { 392 assert(false, "Only InterruptException supposed to be thrown!"); 393 } 394 }); 395 396 runTask({ 397 // mutex is now locked in first task for 20 ms 398 // the second tasks is waiting in lock() 399 t2.interrupt(); 400 t1.join(); 401 t2.join(); 402 assert(!mutex.m_impl.m_locked); // ensure that the scope(exit) has been executed 403 exitEventLoop(); 404 }); 405 406 runEventLoop(); 407 } 408 409 unittest { 410 runMutexUnitTests!TaskMutex(); 411 } 412 413 414 /** 415 Alternative to $(D TaskMutex) that supports interruption. 416 417 This class supports the use of $(D vibe.core.task.Task.interrupt()) while 418 waiting in the $(D lock()) method. However, because the interface is not 419 $(D nothrow), it cannot be used as an object monitor. 420 421 See_Also: $(D TaskMutex), $(D InterruptibleRecursiveTaskMutex) 422 */ 423 final class InterruptibleTaskMutex : Lockable { 424 @safe: 425 426 private TaskMutexImpl!true m_impl; 427 428 this() 429 { 430 m_impl.setup(); 431 432 // detects invalid usage within synchronized(...) 433 () @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } (); 434 } 435 436 bool tryLock() nothrow { return m_impl.tryLock(); } 437 void lock() { m_impl.lock(); } 438 void unlock() nothrow { m_impl.unlock(); } 439 } 440 441 unittest { 442 runMutexUnitTests!InterruptibleTaskMutex(); 443 } 444 445 446 447 /** 448 Recursive mutex implementation for tasks. 449 450 This mutex type can be used in exchange for a `core.sync.mutex.Mutex`, but 451 does not block the event loop when contention happens. 452 453 Notice: 454 Because this class is annotated `nothrow`, it cannot be interrupted 455 using $(D vibe.core.task.Task.interrupt()). The corresponding 456 $(D InterruptException) will be deferred until the next blocking 457 operation yields the event loop. 458 459 Use $(D InterruptibleRecursiveTaskMutex) as an alternative that can be 460 interrupted. 461 462 See_Also: `TaskMutex`, `core.sync.mutex.Mutex` 463 */ 464 final class RecursiveTaskMutex : core.sync.mutex.Mutex, Lockable { 465 @safe: 466 467 private RecursiveTaskMutexImpl!false m_impl; 468 469 this(Object o) { m_impl.setup(); super(o); } 470 this() { m_impl.setup(); } 471 472 override bool tryLock() { return m_impl.tryLock(); } 473 override void lock() { m_impl.lock(); } 474 override void unlock() { m_impl.unlock(); } 475 } 476 477 unittest { 478 runMutexUnitTests!RecursiveTaskMutex(); 479 } 480 481 482 /** 483 Alternative to $(D RecursiveTaskMutex) that supports interruption. 484 485 This class supports the use of $(D vibe.core.task.Task.interrupt()) while 486 waiting in the $(D lock()) method. However, because the interface is not 487 $(D nothrow), it cannot be used as an object monitor. 488 489 See_Also: $(D RecursiveTaskMutex), $(D InterruptibleTaskMutex) 490 */ 491 final class InterruptibleRecursiveTaskMutex : Lockable { 492 @safe: 493 private RecursiveTaskMutexImpl!true m_impl; 494 495 this() 496 { 497 m_impl.setup(); 498 499 // detects invalid usage within synchronized(...) 500 () @trusted { this.__monitor = cast(void*)&NoUseMonitor.instance(); } (); 501 } 502 503 bool tryLock() { return m_impl.tryLock(); } 504 void lock() { m_impl.lock(); } 505 void unlock() { m_impl.unlock(); } 506 } 507 508 unittest { 509 runMutexUnitTests!InterruptibleRecursiveTaskMutex(); 510 } 511 512 513 // Helper class to ensure that the non Object.Monitor compatible interruptible 514 // mutex classes are not accidentally used with the `synchronized` statement 515 private final class NoUseMonitor : Object.Monitor { 516 private static shared Proxy st_instance; 517 518 static struct Proxy { 519 Object.Monitor monitor; 520 } 521 522 static @property ref shared(Proxy) instance() 523 @safe nothrow { 524 static shared(Proxy)* inst = null; 525 if (inst) return *inst; 526 527 () @trusted { // synchronized {} not @safe for DMD <= 2.078.3 528 synchronized { 529 if (!st_instance.monitor) 530 st_instance.monitor = new shared NoUseMonitor; 531 inst = &st_instance; 532 } 533 } (); 534 535 return *inst; 536 } 537 538 override void lock() @safe @nogc nothrow { 539 assert(false, "Interruptible task mutexes cannot be used with synchronized(), use scopedMutexLock instead."); 540 } 541 542 override void unlock() @safe @nogc nothrow {} 543 } 544 545 546 private void runMutexUnitTests(M)() 547 { 548 import vibe.core.core; 549 550 auto m = new M; 551 Task t1, t2; 552 void runContendedTasks(bool interrupt_t1, bool interrupt_t2) { 553 assert(!m.m_impl.m_locked); 554 555 // t1 starts first and acquires the mutex for 20 ms 556 // t2 starts second and has to wait in m.lock() 557 t1 = runTask({ 558 assert(!m.m_impl.m_locked); 559 m.lock(); 560 assert(m.m_impl.m_locked); 561 if (interrupt_t1) assertThrown!InterruptException(sleep(100.msecs)); 562 else assertNotThrown(sleep(20.msecs)); 563 m.unlock(); 564 }); 565 t2 = runTask({ 566 assert(!m.tryLock()); 567 if (interrupt_t2) { 568 try m.lock(); 569 catch (InterruptException) return; 570 try yield(); // rethrows any deferred exceptions 571 catch (InterruptException) { 572 m.unlock(); 573 return; 574 } 575 assert(false, "Supposed to have thrown an InterruptException."); 576 } else assertNotThrown(m.lock()); 577 assert(m.m_impl.m_locked); 578 sleep(20.msecs); 579 m.unlock(); 580 assert(!m.m_impl.m_locked); 581 }); 582 } 583 584 // basic lock test 585 m.performLocked!({ 586 assert(m.m_impl.m_locked); 587 }); 588 assert(!m.m_impl.m_locked); 589 590 // basic contention test 591 runContendedTasks(false, false); 592 auto t3 = runTask({ 593 assert(t1.running && t2.running); 594 assert(m.m_impl.m_locked); 595 t1.join(); 596 assert(!t1.running && t2.running); 597 yield(); // give t2 a chance to take the lock 598 assert(m.m_impl.m_locked); 599 t2.join(); 600 assert(!t2.running); 601 assert(!m.m_impl.m_locked); 602 exitEventLoop(); 603 }); 604 runEventLoop(); 605 assert(!t3.running); 606 assert(!m.m_impl.m_locked); 607 608 // interruption test #1 609 runContendedTasks(true, false); 610 t3 = runTask({ 611 assert(t1.running && t2.running); 612 assert(m.m_impl.m_locked); 613 t1.interrupt(); 614 t1.join(); 615 assert(!t1.running && t2.running); 616 yield(); // give t2 a chance to take the lock 617 assert(m.m_impl.m_locked); 618 t2.join(); 619 assert(!t2.running); 620 assert(!m.m_impl.m_locked); 621 exitEventLoop(); 622 }); 623 runEventLoop(); 624 assert(!t3.running); 625 assert(!m.m_impl.m_locked); 626 627 // interruption test #2 628 runContendedTasks(false, true); 629 t3 = runTask({ 630 assert(t1.running && t2.running); 631 assert(m.m_impl.m_locked); 632 t2.interrupt(); 633 t2.join(); 634 assert(!t2.running); 635 static if (is(M == InterruptibleTaskMutex) || is (M == InterruptibleRecursiveTaskMutex)) 636 assert(t1.running && m.m_impl.m_locked); 637 t1.join(); 638 assert(!t1.running); 639 assert(!m.m_impl.m_locked); 640 exitEventLoop(); 641 }); 642 runEventLoop(); 643 assert(!t3.running); 644 assert(!m.m_impl.m_locked); 645 } 646 647 648 /** 649 Event loop based condition variable or "event" implementation. 650 651 This class can be used in exchange for a $(D core.sync.condition.Condition) 652 to avoid blocking the event loop when waiting. 653 654 Notice: 655 Because this class is annotated nothrow, it cannot be interrupted 656 using $(D vibe.core.task.Task.interrupt()). The corresponding 657 $(D InterruptException) will be deferred until the next blocking 658 operation yields to the event loop. 659 660 Use $(D InterruptibleTaskCondition) as an alternative that can be 661 interrupted. 662 663 Note that it is generally not safe to use a `TaskCondition` together with an 664 interruptible mutex type. 665 666 See_Also: `InterruptibleTaskCondition` 667 */ 668 final class TaskCondition : core.sync.condition.Condition { 669 @safe: 670 671 private TaskConditionImpl!(false, Mutex) m_impl; 672 673 this(core.sync.mutex.Mutex mtx) 674 { 675 assert(mtx.classinfo is Mutex.classinfo || mtx.classinfo is TaskMutex.classinfo, 676 "TaskCondition can only be used with Mutex or TaskMutex"); 677 678 m_impl.setup(mtx); 679 super(mtx); 680 } 681 override @property Mutex mutex() nothrow { return m_impl.mutex; } 682 override void wait() nothrow { m_impl.wait(); } 683 override bool wait(Duration timeout) nothrow { return m_impl.wait(timeout); } 684 override void notify() nothrow { m_impl.notify(); } 685 override void notifyAll() nothrow { m_impl.notifyAll(); } 686 } 687 688 unittest { 689 new TaskCondition(new Mutex); 690 new TaskCondition(new TaskMutex); 691 } 692 693 694 /** This example shows the typical usage pattern using a `while` loop to make 695 sure that the final condition is reached. 696 */ 697 unittest { 698 import vibe.core.core; 699 import vibe.core.log; 700 701 __gshared Mutex mutex; 702 __gshared TaskCondition condition; 703 __gshared int workers_still_running = 0; 704 705 // setup the task condition 706 mutex = new Mutex; 707 condition = new TaskCondition(mutex); 708 709 logDebug("SETTING UP TASKS"); 710 711 // start up the workers and count how many are running 712 foreach (i; 0 .. 4) { 713 workers_still_running++; 714 runWorkerTask({ 715 // simulate some work 716 sleep(100.msecs); 717 718 // notify the waiter that we're finished 719 synchronized (mutex) { 720 workers_still_running--; 721 logDebug("DECREMENT %s", workers_still_running); 722 } 723 logDebug("NOTIFY"); 724 condition.notify(); 725 }); 726 } 727 728 logDebug("STARTING WAIT LOOP"); 729 730 // wait until all tasks have decremented the counter back to zero 731 synchronized (mutex) { 732 while (workers_still_running > 0) { 733 logDebug("STILL running %s", workers_still_running); 734 condition.wait(); 735 } 736 } 737 } 738 739 740 /** 741 Alternative to `TaskCondition` that supports interruption. 742 743 This class supports the use of `vibe.core.task.Task.interrupt()` while 744 waiting in the `wait()` method. 745 746 See `TaskCondition` for an example. 747 748 Notice: 749 Note that it is generally not safe to use an 750 `InterruptibleTaskCondition` together with an interruptible mutex type. 751 752 See_Also: `TaskCondition` 753 */ 754 final class InterruptibleTaskCondition { 755 @safe: 756 757 private TaskConditionImpl!(true, Lockable) m_impl; 758 759 this(M)(M mutex) 760 if (is(M : Mutex) || is (M : Lockable)) 761 { 762 static if (is(M : Lockable)) 763 m_impl.setup(mutex); 764 else 765 m_impl.setupForMutex(mutex); 766 } 767 768 @property Lockable mutex() { return m_impl.mutex; } 769 void wait() { m_impl.wait(); } 770 bool wait(Duration timeout) { return m_impl.wait(timeout); } 771 void notify() { m_impl.notify(); } 772 void notifyAll() { m_impl.notifyAll(); } 773 } 774 775 unittest { 776 new InterruptibleTaskCondition(new Mutex); 777 new InterruptibleTaskCondition(new TaskMutex); 778 new InterruptibleTaskCondition(new InterruptibleTaskMutex); 779 } 780 781 782 /** A manually triggered single threaded cross-task event. 783 784 Note: the ownership can be shared between multiple fibers of the same thread. 785 */ 786 struct LocalManualEvent { 787 import core.thread : Thread; 788 import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny; 789 790 @safe: 791 792 private { 793 alias Waiter = ThreadLocalWaiter!false; 794 795 Waiter m_waiter; 796 } 797 798 // thread destructor in vibe.core.core will decrement the ref. count 799 package static EventID ms_threadEvent; 800 801 private void initialize() 802 nothrow { 803 import vibe.internal.allocator : Mallocator, makeGCSafe; 804 m_waiter = () @trusted { return Mallocator.instance.makeGCSafe!Waiter; } (); 805 } 806 807 this(this) 808 { 809 if (m_waiter) 810 return m_waiter.addRef(); 811 } 812 813 ~this() 814 nothrow { 815 import vibe.internal.allocator : Mallocator, disposeGCSafe; 816 if (m_waiter) { 817 if (!m_waiter.releaseRef()) { 818 static if (__VERSION__ < 2087) scope (failure) assert(false); 819 () @trusted { Mallocator.instance.disposeGCSafe(m_waiter); } (); 820 } 821 } 822 } 823 824 bool opCast() const nothrow { return m_waiter !is null; } 825 826 /// A counter that is increased with every emit() call 827 int emitCount() const nothrow { return m_waiter.m_emitCount; } 828 829 /// Emits the signal, waking up all owners of the signal. 830 int emit() 831 nothrow { 832 assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()"); 833 logTrace("unshared emit"); 834 auto ec = m_waiter.m_emitCount++; 835 m_waiter.emit(); 836 return ec; 837 } 838 839 /// Emits the signal, waking up a single owners of the signal. 840 int emitSingle() 841 nothrow { 842 assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()"); 843 logTrace("unshared single emit"); 844 auto ec = m_waiter.m_emitCount++; 845 m_waiter.emitSingle(); 846 return ec; 847 } 848 849 /** Acquires ownership and waits until the signal is emitted. 850 851 Note that in order not to miss any emits it is necessary to use the 852 overload taking an integer. 853 854 Throws: 855 May throw an $(D InterruptException) if the task gets interrupted 856 using $(D Task.interrupt()). 857 */ 858 int wait() { return wait(this.emitCount); } 859 860 /** Acquires ownership and waits until the signal is emitted and the emit 861 count is larger than a given one. 862 863 Throws: 864 May throw an $(D InterruptException) if the task gets interrupted 865 using $(D Task.interrupt()). 866 */ 867 int wait(int emit_count) { return doWait!true(Duration.max, emit_count); } 868 /// ditto 869 int wait(Duration timeout, int emit_count) { return doWait!true(timeout, emit_count); } 870 871 /** Same as $(D wait), but defers throwing any $(D InterruptException). 872 873 This method is annotated $(D nothrow) at the expense that it cannot be 874 interrupted. 875 */ 876 int waitUninterruptible() nothrow { return waitUninterruptible(this.emitCount); } 877 /// ditto 878 int waitUninterruptible(int emit_count) nothrow { return doWait!false(Duration.max, emit_count); } 879 /// ditto 880 int waitUninterruptible(Duration timeout, int emit_count) nothrow { return doWait!false(timeout, emit_count); } 881 882 private int doWait(bool interruptible)(Duration timeout, int emit_count) 883 { 884 import core.time : MonoTime; 885 886 assert(m_waiter !is null, "LocalManualEvent is not initialized - use createManualEvent()"); 887 888 MonoTime target_timeout, now; 889 if (timeout != Duration.max) { 890 try now = MonoTime.currTime(); 891 catch (Exception e) { assert(false, e.msg); } 892 target_timeout = now + timeout; 893 } 894 895 while (m_waiter.m_emitCount - emit_count <= 0) { 896 m_waiter.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max); 897 try now = MonoTime.currTime(); 898 catch (Exception e) { assert(false, e.msg); } 899 if (now >= target_timeout) break; 900 } 901 902 return m_waiter.m_emitCount; 903 } 904 } 905 906 unittest { 907 import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; 908 909 auto e = createManualEvent(); 910 auto w1 = runTask({ e.wait(100.msecs, e.emitCount); }); 911 auto w2 = runTask({ e.wait(500.msecs, e.emitCount); }); 912 runTask({ 913 sleep(50.msecs); 914 e.emit(); 915 sleep(50.msecs); 916 assert(!w1.running && !w2.running); 917 exitEventLoop(); 918 }); 919 runEventLoop(); 920 } 921 922 unittest { 923 import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; 924 auto e = createManualEvent(); 925 // integer overflow test 926 e.m_waiter.m_emitCount = int.max; 927 auto w1 = runTask({ e.wait(50.msecs, e.emitCount); }); 928 runTask({ 929 sleep(5.msecs); 930 e.emit(); 931 sleep(50.msecs); 932 assert(!w1.running); 933 exitEventLoop(); 934 }); 935 runEventLoop(); 936 } 937 938 unittest { // ensure that cancelled waiters are properly handled and that a FIFO order is implemented 939 import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; 940 941 LocalManualEvent l = createManualEvent(); 942 943 Task t2; 944 runTask({ 945 l.wait(); 946 t2.interrupt(); 947 sleep(20.msecs); 948 exitEventLoop(); 949 }); 950 t2 = runTask({ 951 try { 952 l.wait(); 953 assert(false, "Shouldn't reach this."); 954 } catch (InterruptException e) {} 955 }); 956 runTask({ 957 l.emit(); 958 }); 959 runEventLoop(); 960 } 961 962 unittest { // ensure that LocalManualEvent behaves correctly after being copied 963 import vibe.core.core : exitEventLoop, runEventLoop, runTask, sleep; 964 965 LocalManualEvent l = createManualEvent(); 966 runTask({ 967 auto lc = l; 968 sleep(100.msecs); 969 lc.emit(); 970 }); 971 runTask({ 972 assert(l.wait(1.seconds, l.emitCount)); 973 exitEventLoop(); 974 }); 975 runEventLoop(); 976 } 977 978 979 /** A manually triggered multi threaded cross-task event. 980 981 Note: the ownership can be shared between multiple fibers and threads. 982 */ 983 struct ManualEvent { 984 import core.thread : Thread; 985 import vibe.internal.async : Waitable, asyncAwait, asyncAwaitUninterruptible, asyncAwaitAny; 986 import vibe.internal.list : StackSList; 987 988 @safe: 989 990 private { 991 alias ThreadWaiter = ThreadLocalWaiter!true; 992 993 int m_emitCount; 994 static struct Waiters { 995 StackSList!ThreadWaiter active; // actively waiting 996 StackSList!ThreadWaiter free; // free-list of reusable waiter structs 997 } 998 Monitor!(Waiters, shared(Mutex)) m_waiters; 999 } 1000 1001 // thread destructor in vibe.core.core will decrement the ref. count 1002 package static EventID ms_threadEvent; 1003 1004 enum EmitMode { 1005 single, 1006 all 1007 } 1008 1009 @disable this(this); 1010 1011 private void initialize() 1012 shared nothrow { 1013 m_waiters = createMonitor!(ManualEvent.Waiters)(new shared Mutex); 1014 } 1015 1016 deprecated("ManualEvent is always non-null!") 1017 bool opCast() const shared nothrow { return true; } 1018 1019 /// A counter that is increased with every emit() call 1020 int emitCount() const shared nothrow @trusted { return atomicLoad(m_emitCount); } 1021 1022 /// Emits the signal, waking up all owners of the signal. 1023 int emit() 1024 shared nothrow @trusted { 1025 import core.atomic : atomicOp, cas; 1026 1027 () @trusted { logTrace("emit shared %s", cast(void*)&this); } (); 1028 1029 auto ec = atomicOp!"+="(m_emitCount, 1); 1030 auto thisthr = Thread.getThis(); 1031 1032 ThreadWaiter lw; 1033 auto drv = eventDriver; 1034 m_waiters.lock.active.filter((ThreadWaiter w) { 1035 () @trusted { logTrace("waiter %s", cast(void*)w); } (); 1036 if (w.m_driver is drv) { 1037 lw = w; 1038 lw.addRef(); 1039 } else { 1040 try { 1041 assert(w.m_event != EventID.init); 1042 () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true); 1043 } catch (Exception e) assert(false, e.msg); 1044 } 1045 return true; 1046 }); 1047 () @trusted { logTrace("lw %s", cast(void*)lw); } (); 1048 if (lw) { 1049 lw.emit(); 1050 releaseWaiter(lw); 1051 } 1052 1053 logTrace("emit shared done"); 1054 1055 return ec; 1056 } 1057 1058 /// Emits the signal, waking up at least one waiting task 1059 int emitSingle() 1060 shared nothrow @trusted { 1061 import core.atomic : atomicOp, cas; 1062 1063 () @trusted { logTrace("emit shared single %s", cast(void*)&this); } (); 1064 1065 auto ec = atomicOp!"+="(m_emitCount, 1); 1066 auto thisthr = Thread.getThis(); 1067 1068 ThreadWaiter lw; 1069 auto drv = eventDriver; 1070 m_waiters.lock.active.iterate((ThreadWaiter w) { 1071 () @trusted { logTrace("waiter %s", cast(void*)w); } (); 1072 if (w.m_driver is drv) { 1073 if (w.unused) return true; 1074 lw = w; 1075 lw.addRef(); 1076 } else { 1077 try { 1078 assert(w.m_event != EventID.invalid); 1079 () @trusted { return cast(shared)w.m_driver; } ().events.trigger(w.m_event, true); 1080 } catch (Exception e) assert(false, e.msg); 1081 } 1082 return false; 1083 }); 1084 () @trusted { logTrace("lw %s", cast(void*)lw); } (); 1085 if (lw) { 1086 lw.emitSingle(); 1087 releaseWaiter(lw); 1088 } 1089 1090 logTrace("emit shared done"); 1091 1092 return ec; 1093 } 1094 1095 /** Acquires ownership and waits until the signal is emitted. 1096 1097 Note that in order not to miss any emits it is necessary to use the 1098 overload taking an integer. 1099 1100 Throws: 1101 May throw an $(D InterruptException) if the task gets interrupted 1102 using $(D Task.interrupt()). 1103 */ 1104 int wait() shared { return wait(this.emitCount); } 1105 1106 /** Acquires ownership and waits until the emit count differs from the 1107 given one or until a timeout is reached. 1108 1109 Throws: 1110 May throw an $(D InterruptException) if the task gets interrupted 1111 using $(D Task.interrupt()). 1112 */ 1113 int wait(int emit_count) shared { return doWaitShared!true(Duration.max, emit_count); } 1114 /// ditto 1115 int wait(Duration timeout, int emit_count) shared { return doWaitShared!true(timeout, emit_count); } 1116 1117 /** Same as $(D wait), but defers throwing any $(D InterruptException). 1118 1119 This method is annotated $(D nothrow) at the expense that it cannot be 1120 interrupted. 1121 */ 1122 int waitUninterruptible() shared nothrow { return waitUninterruptible(this.emitCount); } 1123 /// ditto 1124 int waitUninterruptible(int emit_count) shared nothrow { return doWaitShared!false(Duration.max, emit_count); } 1125 /// ditto 1126 int waitUninterruptible(Duration timeout, int emit_count) shared nothrow { return doWaitShared!false(timeout, emit_count); } 1127 1128 private int doWaitShared(bool interruptible)(Duration timeout, int emit_count) 1129 shared { 1130 import core.time : MonoTime; 1131 1132 () @trusted { logTrace("wait shared %s", cast(void*)&this); } (); 1133 1134 if (ms_threadEvent is EventID.invalid) { 1135 ms_threadEvent = eventDriver.events.create(); 1136 assert(ms_threadEvent != EventID.invalid, "Failed to create event!"); 1137 } 1138 1139 MonoTime target_timeout, now; 1140 if (timeout != Duration.max) { 1141 try now = MonoTime.currTime(); 1142 catch (Exception e) { assert(false, e.msg); } 1143 target_timeout = now + timeout; 1144 } 1145 1146 int ec = this.emitCount; 1147 1148 acquireThreadWaiter((scope ThreadWaiter w) { 1149 while (ec - emit_count <= 0) { 1150 w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => (this.emitCount - emit_count) > 0); 1151 ec = this.emitCount; 1152 1153 if (timeout != Duration.max) { 1154 try now = MonoTime.currTime(); 1155 catch (Exception e) { assert(false, e.msg); } 1156 if (now >= target_timeout) break; 1157 } 1158 } 1159 }); 1160 1161 return ec; 1162 } 1163 1164 private void acquireThreadWaiter(DEL)(scope DEL del) 1165 shared { 1166 import vibe.internal.allocator : processAllocator, makeGCSafe; 1167 1168 ThreadWaiter w; 1169 auto drv = eventDriver; 1170 1171 with (m_waiters.lock) { 1172 active.iterate((aw) { 1173 if (aw.m_driver is drv) { 1174 w = aw; 1175 w.addRef(); 1176 return false; 1177 } 1178 return true; 1179 }); 1180 1181 if (!w) { 1182 free.filter((fw) { 1183 if (fw.m_driver is drv) { 1184 w = fw; 1185 w.addRef(); 1186 return false; 1187 } 1188 return true; 1189 }); 1190 1191 if (!w) { 1192 () @trusted { 1193 try { 1194 w = processAllocator.makeGCSafe!ThreadWaiter; 1195 w.m_driver = drv; 1196 w.m_event = ms_threadEvent; 1197 } catch (Exception e) { 1198 assert(false, "Failed to allocate thread waiter."); 1199 } 1200 } (); 1201 } 1202 1203 assert(w.m_refCount == 1); 1204 active.add(w); 1205 } 1206 } 1207 1208 scope (exit) releaseWaiter(w); 1209 1210 del(w); 1211 } 1212 1213 private void releaseWaiter(ThreadWaiter w) 1214 shared nothrow { 1215 if (!w.releaseRef()) { 1216 assert(w.m_driver is eventDriver, "Waiter was reassigned a different driver!?"); 1217 assert(w.unused, "Waiter still used, but not referenced!?"); 1218 with (m_waiters.lock) { 1219 auto rmvd = active.remove(w); 1220 assert(rmvd, "Waiter not in active queue anymore!?"); 1221 free.add(w); 1222 // TODO: cap size of m_freeWaiters 1223 } 1224 } 1225 } 1226 } 1227 1228 unittest { 1229 import vibe.core.core : exitEventLoop, runEventLoop, runTask, runWorkerTaskH, sleep; 1230 1231 auto e = createSharedManualEvent(); 1232 auto w1 = runTask({ e.wait(100.msecs, e.emitCount); }); 1233 static void w(shared(ManualEvent)* e) { e.wait(500.msecs, e.emitCount); } 1234 auto w2 = runWorkerTaskH(&w, &e); 1235 runTask({ 1236 sleep(50.msecs); 1237 e.emit(); 1238 sleep(50.msecs); 1239 assert(!w1.running && !w2.running); 1240 exitEventLoop(); 1241 }); 1242 runEventLoop(); 1243 } 1244 1245 unittest { 1246 import vibe.core.core : runTask, runWorkerTaskH, setTimer, sleep; 1247 import vibe.core.taskpool : TaskPool; 1248 import core.time : msecs, usecs; 1249 import std.concurrency : send, receiveOnly; 1250 import std.random : uniform; 1251 1252 auto tpool = new shared TaskPool(4); 1253 scope (exit) tpool.terminate(); 1254 1255 static void test(shared(ManualEvent)* evt, Task owner) 1256 { 1257 owner.tid.send(Task.getThis()); 1258 1259 int ec = evt.emitCount; 1260 auto thist = Task.getThis(); 1261 auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog 1262 scope (exit) tm.stop(); 1263 while (ec < 5_000) { 1264 tm.rearm(500.msecs); 1265 sleep(uniform(0, 10_000).usecs); 1266 if (uniform(0, 10) == 0) evt.emit(); 1267 auto ecn = evt.wait(ec); 1268 assert(ecn > ec); 1269 ec = ecn; 1270 } 1271 } 1272 1273 auto watchdog = setTimer(30.seconds, { assert(false, "ManualEvent test has hung."); }); 1274 scope (exit) watchdog.stop(); 1275 1276 auto e = createSharedManualEvent(); 1277 Task[] tasks; 1278 1279 runTask({ 1280 auto thist = Task.getThis(); 1281 1282 // start 25 tasks in each thread 1283 foreach (i; 0 .. 25) tpool.runTaskDist(&test, &e, thist); 1284 // collect all task handles 1285 foreach (i; 0 .. 4*25) tasks ~= receiveOnly!Task; 1286 1287 auto tm = setTimer(500.msecs, { thist.interrupt(); }); // watchdog 1288 scope (exit) tm.stop(); 1289 int pec = 0; 1290 while (e.emitCount < 5_000) { 1291 tm.rearm(500.msecs); 1292 sleep(50.usecs); 1293 e.emit(); 1294 } 1295 1296 // wait for all worker tasks to finish 1297 foreach (t; tasks) t.join(); 1298 }).join(); 1299 } 1300 1301 package shared struct Monitor(T, M) 1302 { 1303 alias Mutex = M; 1304 alias Data = T; 1305 private { 1306 Mutex mutex; 1307 Data data; 1308 } 1309 1310 static struct Locked { 1311 shared(Monitor)* m; 1312 @disable this(this); 1313 ~this() { 1314 () @trusted { 1315 static if (is(typeof(Mutex.init.unlock_nothrow()))) 1316 (cast(Mutex)m.mutex).unlock_nothrow(); 1317 else (cast(Mutex)m.mutex).unlock(); 1318 } (); 1319 } 1320 ref inout(Data) get() inout @trusted { return *cast(inout(Data)*)&m.data; } 1321 alias get this; 1322 } 1323 1324 Locked lock() { 1325 () @trusted { 1326 static if (is(typeof(Mutex.init.lock_nothrow()))) 1327 (cast(Mutex)mutex).lock_nothrow(); 1328 else (cast(Mutex)mutex).lock(); 1329 } (); 1330 return Locked(() @trusted { return &this; } ()); 1331 } 1332 1333 const(Locked) lock() const { 1334 () @trusted { 1335 static if (is(typeof(Mutex.init.lock_nothrow()))) 1336 (cast(Mutex)mutex).lock_nothrow(); 1337 else (cast(Mutex)mutex).lock(); 1338 } (); 1339 return const(Locked)(() @trusted { return &this; } ()); 1340 } 1341 } 1342 1343 1344 package shared(Monitor!(T, M)) createMonitor(T, M)(M mutex) 1345 @trusted { 1346 shared(Monitor!(T, M)) ret; 1347 ret.mutex = cast(shared)mutex; 1348 return ret; 1349 } 1350 1351 1352 private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { 1353 import vibe.internal.list : CircularDList; 1354 1355 private { 1356 static struct TaskWaiter { 1357 TaskWaiter* prev, next; 1358 void delegate() @safe nothrow notifier; 1359 1360 void wait(void delegate() @safe nothrow del) @safe nothrow { 1361 assert(notifier is null, "Local waiter is used twice!"); 1362 notifier = del; 1363 } 1364 void cancel() @safe nothrow { notifier = null; } 1365 void emit() @safe nothrow { auto n = notifier; notifier = null; n(); } 1366 } 1367 1368 static if (EVENT_TRIGGERED) { 1369 package(vibe) ThreadLocalWaiter next; // queue of other waiters of the same thread 1370 NativeEventDriver m_driver; 1371 EventID m_event = EventID.invalid; 1372 } else { 1373 int m_emitCount = 0; 1374 } 1375 int m_refCount = 1; 1376 TaskWaiter m_pivot; 1377 TaskWaiter m_emitPivot; 1378 CircularDList!(TaskWaiter*) m_waiters; 1379 } 1380 1381 this() 1382 { 1383 m_waiters = CircularDList!(TaskWaiter*)(() @trusted { return &m_pivot; } ()); 1384 } 1385 1386 static if (EVENT_TRIGGERED) { 1387 ~this() 1388 { 1389 import vibe.core.internal.release : releaseHandle; 1390 1391 if (m_event != EventID.invalid) 1392 releaseHandle!"events"(m_event, () @trusted { return cast(shared)m_driver; } ()); 1393 } 1394 } 1395 1396 @property bool unused() const @safe nothrow { return m_waiters.empty; } 1397 1398 void addRef() @safe nothrow { assert(m_refCount >= 0); m_refCount++; } 1399 bool releaseRef() @safe nothrow { assert(m_refCount > 0); return --m_refCount > 0; } 1400 1401 bool wait(bool interruptible)(Duration timeout, EventID evt = EventID.invalid, scope bool delegate() @safe nothrow exit_condition = null) 1402 @safe { 1403 import core.time : MonoTime; 1404 import vibe.internal.async : Waitable, asyncAwaitAny; 1405 1406 TaskWaiter waiter_store; 1407 TaskWaiter* waiter = () @trusted { return &waiter_store; } (); 1408 1409 m_waiters.insertBack(waiter); 1410 assert(waiter.next !is null); 1411 scope (exit) 1412 if (waiter.next !is null) { 1413 m_waiters.remove(waiter); 1414 assert(!waiter.next); 1415 } 1416 1417 MonoTime target_timeout, now; 1418 if (timeout != Duration.max) { 1419 try now = MonoTime.currTime(); 1420 catch (Exception e) { assert(false, e.msg); } 1421 target_timeout = now + timeout; 1422 } 1423 1424 bool cancelled; 1425 1426 alias waitable = Waitable!(typeof(TaskWaiter.notifier), 1427 (cb) { waiter.wait(cb); }, 1428 (cb) { cancelled = true; waiter.cancel(); }, 1429 () {} 1430 ); 1431 1432 alias ewaitable = Waitable!(EventCallback, 1433 (cb) { 1434 eventDriver.events.wait(evt, cb); 1435 // check for exit condition *after* starting to wait for the event 1436 // to avoid a race condition 1437 if (exit_condition()) { 1438 eventDriver.events.cancelWait(evt, cb); 1439 cb(evt); 1440 } 1441 }, 1442 (cb) { eventDriver.events.cancelWait(evt, cb); }, 1443 (EventID) {} 1444 ); 1445 1446 if (evt != EventID.invalid) { 1447 asyncAwaitAny!(interruptible, waitable, ewaitable)(timeout); 1448 } else { 1449 asyncAwaitAny!(interruptible, waitable)(timeout); 1450 } 1451 1452 if (cancelled) { 1453 assert(waiter.next !is null, "Cancelled waiter not in queue anymore!?"); 1454 return false; 1455 } else { 1456 assert(waiter.next is null, "Triggered waiter still in queue!?"); 1457 return true; 1458 } 1459 } 1460 1461 void emit() 1462 @safe nothrow { 1463 import std.algorithm.mutation : swap; 1464 import vibe.core.core : yield; 1465 1466 if (m_waiters.empty) return; 1467 1468 TaskWaiter* pivot = () @trusted { return &m_emitPivot; } (); 1469 1470 if (pivot.next) { // another emit in progress? 1471 // shift pivot to the end, so that the other emit call will process all pending waiters 1472 if (pivot !is m_waiters.back) { 1473 m_waiters.remove(pivot); 1474 m_waiters.insertBack(pivot); 1475 } 1476 return; 1477 } 1478 1479 m_waiters.insertBack(pivot); 1480 scope (exit) m_waiters.remove(pivot); 1481 1482 foreach (w; m_waiters) { 1483 if (w is pivot) break; 1484 emitWaiter(w); 1485 } 1486 } 1487 1488 bool emitSingle() 1489 @safe nothrow { 1490 if (m_waiters.empty) return false; 1491 1492 TaskWaiter* pivot = () @trusted { return &m_emitPivot; } (); 1493 1494 if (pivot.next) { // another emit in progress? 1495 // shift pivot to the right, so that the other emit call will process another waiter 1496 if (pivot !is m_waiters.back) { 1497 auto n = pivot.next; 1498 m_waiters.remove(pivot); 1499 m_waiters.insertAfter(pivot, n); 1500 } 1501 return true; 1502 } 1503 1504 emitWaiter(m_waiters.front); 1505 return true; 1506 } 1507 1508 private void emitWaiter(TaskWaiter* w) 1509 @safe nothrow { 1510 m_waiters.remove(w); 1511 1512 if (w.notifier !is null) { 1513 logTrace("notify task %s %s %s", cast(void*)w, () @trusted { return cast(void*)w.notifier.funcptr; } (), w.notifier.ptr); 1514 w.emit(); 1515 } else logTrace("notify callback is null"); 1516 } 1517 } 1518 1519 private struct TaskMutexImpl(bool INTERRUPTIBLE) { 1520 private { 1521 shared(bool) m_locked = false; 1522 shared(uint) m_waiters = 0; 1523 shared(ManualEvent) m_signal; 1524 debug Task m_owner; 1525 } 1526 1527 void setup() 1528 { 1529 m_signal.initialize(); 1530 } 1531 1532 @trusted bool tryLock() 1533 { 1534 if (cas(&m_locked, false, true)) { 1535 debug m_owner = Task.getThis(); 1536 debug(VibeMutexLog) logTrace("mutex %s lock %s", cast(void*)&this, atomicLoad(m_waiters)); 1537 return true; 1538 } 1539 return false; 1540 } 1541 1542 @trusted void lock() 1543 { 1544 if (tryLock()) return; 1545 debug assert(m_owner == Task() || m_owner != Task.getThis(), "Recursive mutex lock."); 1546 atomicOp!"+="(m_waiters, 1); 1547 debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters)); 1548 scope(exit) atomicOp!"-="(m_waiters, 1); 1549 auto ecnt = m_signal.emitCount(); 1550 while (!tryLock()) { 1551 static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt); 1552 else ecnt = m_signal.waitUninterruptible(ecnt); 1553 } 1554 } 1555 1556 @trusted void unlock() 1557 { 1558 assert(m_locked); 1559 debug { 1560 assert(m_owner == Task.getThis()); 1561 m_owner = Task(); 1562 } 1563 atomicStore!(MemoryOrder.rel)(m_locked, false); 1564 debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters)); 1565 if (atomicLoad(m_waiters) > 0) 1566 m_signal.emit(); 1567 } 1568 } 1569 1570 private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) { 1571 import std.stdio; 1572 private { 1573 core.sync.mutex.Mutex m_mutex; 1574 Task m_owner; 1575 size_t m_recCount = 0; 1576 shared(uint) m_waiters = 0; 1577 shared(ManualEvent) m_signal; 1578 @property bool m_locked() const { return m_recCount > 0; } 1579 } 1580 1581 void setup() 1582 { 1583 m_mutex = new core.sync.mutex.Mutex; 1584 m_signal.initialize(); 1585 } 1586 1587 @trusted bool tryLock() 1588 { 1589 auto self = Task.getThis(); 1590 return m_mutex.performLocked!({ 1591 if (!m_owner) { 1592 assert(m_recCount == 0); 1593 m_recCount = 1; 1594 m_owner = self; 1595 return true; 1596 } else if (m_owner == self) { 1597 m_recCount++; 1598 return true; 1599 } 1600 return false; 1601 }); 1602 } 1603 1604 @trusted void lock() 1605 { 1606 if (tryLock()) return; 1607 atomicOp!"+="(m_waiters, 1); 1608 debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters)); 1609 scope(exit) atomicOp!"-="(m_waiters, 1); 1610 auto ecnt = m_signal.emitCount(); 1611 while (!tryLock()) { 1612 static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt); 1613 else ecnt = m_signal.waitUninterruptible(ecnt); 1614 } 1615 } 1616 1617 @trusted void unlock() 1618 { 1619 auto self = Task.getThis(); 1620 m_mutex.performLocked!({ 1621 assert(m_owner == self); 1622 assert(m_recCount > 0); 1623 m_recCount--; 1624 if (m_recCount == 0) { 1625 m_owner = Task.init; 1626 } 1627 }); 1628 debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters)); 1629 if (atomicLoad(m_waiters) > 0) 1630 m_signal.emit(); 1631 } 1632 } 1633 1634 private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { 1635 private { 1636 LOCKABLE m_mutex; 1637 static if (is(LOCKABLE == Mutex)) 1638 TaskMutex m_taskMutex; 1639 shared(ManualEvent) m_signal; 1640 } 1641 1642 static if (is(LOCKABLE == Lockable)) { 1643 final class MutexWrapper : Lockable { 1644 private core.sync.mutex.Mutex m_mutex; 1645 this(core.sync.mutex.Mutex mtx) { m_mutex = mtx; } 1646 @trusted void lock() { m_mutex.lock(); } 1647 @trusted void unlock() { m_mutex.unlock(); } 1648 @trusted bool tryLock() { return m_mutex.tryLock(); } 1649 } 1650 1651 void setupForMutex(core.sync.mutex.Mutex mtx) 1652 { 1653 setup(new MutexWrapper(mtx)); 1654 } 1655 } 1656 1657 void setup(LOCKABLE mtx) 1658 { 1659 m_mutex = mtx; 1660 static if (is(typeof(m_taskMutex))) 1661 m_taskMutex = cast(TaskMutex)mtx; 1662 m_signal.initialize(); 1663 } 1664 1665 @property LOCKABLE mutex() { return m_mutex; } 1666 1667 @trusted void wait() 1668 { 1669 if (auto tm = cast(TaskMutex)m_mutex) { 1670 assert(tm.m_impl.m_locked); 1671 debug assert(tm.m_impl.m_owner == Task.getThis()); 1672 } 1673 1674 auto refcount = m_signal.emitCount; 1675 1676 static if (is(LOCKABLE == Mutex)) { 1677 if (m_taskMutex) m_taskMutex.unlock(); 1678 else m_mutex.unlock_nothrow(); 1679 } else m_mutex.unlock(); 1680 1681 scope(exit) { 1682 static if (is(LOCKABLE == Mutex)) { 1683 if (m_taskMutex) m_taskMutex.lock(); 1684 else m_mutex.lock_nothrow(); 1685 } else m_mutex.lock(); 1686 } 1687 static if (INTERRUPTIBLE) m_signal.wait(refcount); 1688 else m_signal.waitUninterruptible(refcount); 1689 } 1690 1691 @trusted bool wait(Duration timeout) 1692 { 1693 assert(!timeout.isNegative()); 1694 if (auto tm = cast(TaskMutex)m_mutex) { 1695 assert(tm.m_impl.m_locked); 1696 debug assert(tm.m_impl.m_owner == Task.getThis()); 1697 } 1698 1699 auto refcount = m_signal.emitCount; 1700 1701 static if (is(LOCKABLE == Mutex)) { 1702 if (m_taskMutex) m_taskMutex.unlock(); 1703 else m_mutex.unlock_nothrow(); 1704 } else m_mutex.unlock(); 1705 1706 scope(exit) { 1707 static if (is(LOCKABLE == Mutex)) { 1708 if (m_taskMutex) m_taskMutex.lock(); 1709 else m_mutex.lock_nothrow(); 1710 } else m_mutex.lock(); 1711 } 1712 1713 static if (INTERRUPTIBLE) return m_signal.wait(timeout, refcount) != refcount; 1714 else return m_signal.waitUninterruptible(timeout, refcount) != refcount; 1715 } 1716 1717 @trusted void notify() 1718 { 1719 m_signal.emit(); 1720 } 1721 1722 @trusted void notifyAll() 1723 { 1724 m_signal.emit(); 1725 } 1726 } 1727 1728 /** Contains the shared state of a $(D TaskReadWriteMutex). 1729 * 1730 * Since a $(D TaskReadWriteMutex) consists of two actual Mutex 1731 * objects that rely on common memory, this class implements 1732 * the actual functionality of their method calls. 1733 * 1734 * The method implementations are based on two static parameters 1735 * ($(D INTERRUPTIBLE) and $(D INTENT)), which are configured through 1736 * template arguments: 1737 * 1738 * - $(D INTERRUPTIBLE) determines whether the mutex implementation 1739 * are interruptible by vibe.d's $(D vibe.core.task.Task.interrupt()) 1740 * method or not. 1741 * 1742 * - $(D INTENT) describes the intent, with which a locking operation is 1743 * performed (i.e. $(D READ_ONLY) or $(D READ_WRITE)). RO locking allows for 1744 * multiple Tasks holding the mutex, whereas RW locking will cause 1745 * a "bottleneck" so that only one Task can write to the protected 1746 * data at once. 1747 */ 1748 private struct ReadWriteMutexState(bool INTERRUPTIBLE) 1749 { 1750 /** The policy with which the mutex should operate. 1751 * 1752 * The policy determines how the acquisition of the locks is 1753 * performed and can be used to tune the mutex according to the 1754 * underlying algorithm in which it is used. 1755 * 1756 * According to the provided policy, the mutex will either favor 1757 * reading or writing tasks and could potentially starve the 1758 * respective opposite. 1759 * 1760 * cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy) 1761 */ 1762 enum Policy : int 1763 { 1764 /** Readers are prioritized, writers may be starved as a result. */ 1765 PREFER_READERS = 0, 1766 /** Writers are prioritized, readers may be starved as a result. */ 1767 PREFER_WRITERS 1768 } 1769 1770 /** The intent with which a locking operation is performed. 1771 * 1772 * Since both locks share the same underlying algorithms, the actual 1773 * intent with which a lock operation is performed (i.e read/write) 1774 * are passed as a template parameter to each method. 1775 */ 1776 enum LockingIntent : bool 1777 { 1778 /** Perform a read lock/unlock operation. Multiple reading locks can be 1779 * active at a time. */ 1780 READ_ONLY = 0, 1781 /** Perform a write lock/unlock operation. Only a single writer can 1782 * hold a lock at any given time. */ 1783 READ_WRITE = 1 1784 } 1785 1786 private { 1787 //Queue counters 1788 /** The number of reading tasks waiting for the lock to become available. */ 1789 shared(uint) m_waitingForReadLock = 0; 1790 /** The number of writing tasks waiting for the lock to become available. */ 1791 shared(uint) m_waitingForWriteLock = 0; 1792 1793 //Lock counters 1794 /** The number of reading tasks that currently hold the lock. */ 1795 uint m_activeReadLocks = 0; 1796 /** The number of writing tasks that currently hold the lock (binary). */ 1797 ubyte m_activeWriteLocks = 0; 1798 1799 /** The policy determining the lock's behavior. */ 1800 Policy m_policy; 1801 1802 //Queue Events 1803 /** The event used to wake reading tasks waiting for the lock while it is blocked. */ 1804 shared(ManualEvent) m_readyForReadLock; 1805 /** The event used to wake writing tasks waiting for the lock while it is blocked. */ 1806 shared(ManualEvent) m_readyForWriteLock; 1807 1808 /** The underlying mutex that gates the access to the shared state. */ 1809 Mutex m_counterMutex; 1810 } 1811 1812 this(Policy policy) 1813 { 1814 m_policy = policy; 1815 m_counterMutex = new Mutex(); 1816 m_readyForReadLock = createSharedManualEvent(); 1817 m_readyForWriteLock = createSharedManualEvent(); 1818 } 1819 1820 @disable this(this); 1821 1822 /** The policy with which the lock has been created. */ 1823 @property policy() const { return m_policy; } 1824 1825 version(RWMutexPrint) 1826 { 1827 /** Print out debug information during lock operations. */ 1828 void printInfo(string OP, LockingIntent INTENT)() nothrow 1829 { 1830 import std..string; 1831 try 1832 { 1833 import std.stdio; 1834 writefln("RWMutex: %s (%s), active: RO: %d, RW: %d; waiting: RO: %d, RW: %d", 1835 OP.leftJustify(10,' '), 1836 INTENT == LockingIntent.READ_ONLY ? "RO" : "RW", 1837 m_activeReadLocks, m_activeWriteLocks, 1838 m_waitingForReadLock, m_waitingForWriteLock 1839 ); 1840 } 1841 catch (Throwable t){} 1842 } 1843 } 1844 1845 /** An internal shortcut method to determine the queue event for a given intent. */ 1846 @property ref auto queueEvent(LockingIntent INTENT)() 1847 { 1848 static if (INTENT == LockingIntent.READ_ONLY) 1849 return m_readyForReadLock; 1850 else 1851 return m_readyForWriteLock; 1852 } 1853 1854 /** An internal shortcut method to determine the queue counter for a given intent. */ 1855 @property ref auto queueCounter(LockingIntent INTENT)() 1856 { 1857 static if (INTENT == LockingIntent.READ_ONLY) 1858 return m_waitingForReadLock; 1859 else 1860 return m_waitingForWriteLock; 1861 } 1862 1863 /** An internal shortcut method to determine the current emitCount of the queue counter for a given intent. */ 1864 int emitCount(LockingIntent INTENT)() 1865 { 1866 return queueEvent!INTENT.emitCount(); 1867 } 1868 1869 /** An internal shortcut method to determine the active counter for a given intent. */ 1870 @property ref auto activeCounter(LockingIntent INTENT)() 1871 { 1872 static if (INTENT == LockingIntent.READ_ONLY) 1873 return m_activeReadLocks; 1874 else 1875 return m_activeWriteLocks; 1876 } 1877 1878 /** An internal shortcut method to wait for the queue event for a given intent. 1879 * 1880 * This method is used during the `lock()` operation, after a 1881 * `tryLock()` operation has been unsuccessfully finished. 1882 * The active fiber will yield and be suspended until the queue event 1883 * for the given intent will be fired. 1884 */ 1885 int wait(LockingIntent INTENT)(int count) 1886 { 1887 static if (INTERRUPTIBLE) 1888 return queueEvent!INTENT.wait(count); 1889 else 1890 return queueEvent!INTENT.waitUninterruptible(count); 1891 } 1892 1893 /** An internal shortcut method to notify tasks waiting for the lock to become available again. 1894 * 1895 * This method is called whenever the number of owners of the mutex hits 1896 * zero; this is basically the counterpart to `wait()`. 1897 * It wakes any Task currently waiting for the mutex to be released. 1898 */ 1899 @trusted void notify(LockingIntent INTENT)() 1900 { 1901 static if (INTENT == LockingIntent.READ_ONLY) 1902 { //If the last reader unlocks the mutex, notify all waiting writers 1903 if (atomicLoad(m_waitingForWriteLock) > 0) 1904 m_readyForWriteLock.emit(); 1905 } 1906 else 1907 { //If a writer unlocks the mutex, notify both readers and writers 1908 if (atomicLoad(m_waitingForReadLock) > 0) 1909 m_readyForReadLock.emit(); 1910 1911 if (atomicLoad(m_waitingForWriteLock) > 0) 1912 m_readyForWriteLock.emit(); 1913 } 1914 } 1915 1916 /** An internal method that performs the acquisition attempt in different variations. 1917 * 1918 * Since both locks rely on a common TaskMutex object which gates the access 1919 * to their common data acquisition attempts for this lock are more complex 1920 * than for simple mutex variants. This method will thus be performing the 1921 * `tryLock()` operation in two variations, depending on the callee: 1922 * 1923 * If called from the outside ($(D WAIT_FOR_BLOCKING_MUTEX) = false), the method 1924 * will instantly fail if the underlying mutex is locked (i.e. during another 1925 * `tryLock()` or `unlock()` operation), in order to guarantee the fastest 1926 * possible locking attempt. 1927 * 1928 * If used internally by the `lock()` method ($(D WAIT_FOR_BLOCKING_MUTEX) = true), 1929 * the operation will wait for the mutex to be available before deciding if 1930 * the lock can be acquired, since the attempt would anyway be repeated until 1931 * it succeeds. This will prevent frequent retries under heavy loads and thus 1932 * should ensure better performance. 1933 */ 1934 @trusted bool tryLock(LockingIntent INTENT, bool WAIT_FOR_BLOCKING_MUTEX)() 1935 { 1936 //Log a debug statement for the attempt 1937 version(RWMutexPrint) 1938 printInfo!("tryLock",INTENT)(); 1939 1940 //Try to acquire the lock 1941 static if (!WAIT_FOR_BLOCKING_MUTEX) 1942 { 1943 if (!m_counterMutex.tryLock()) 1944 return false; 1945 } 1946 else 1947 m_counterMutex.lock(); 1948 1949 scope(exit) 1950 m_counterMutex.unlock(); 1951 1952 //Log a debug statement for the attempt 1953 version(RWMutexPrint) 1954 printInfo!("checkCtrs",INTENT)(); 1955 1956 //Check if there's already an active writer 1957 if (m_activeWriteLocks > 0) 1958 return false; 1959 1960 //If writers are preferred over readers, check whether there 1961 //currently is a writer in the waiting queue and abort if 1962 //that's the case. 1963 static if (INTENT == LockingIntent.READ_ONLY) 1964 if (m_policy.PREFER_WRITERS && m_waitingForWriteLock > 0) 1965 return false; 1966 1967 //If we are locking the mutex for writing, make sure that 1968 //there's no reader active. 1969 static if (INTENT == LockingIntent.READ_WRITE) 1970 if (m_activeReadLocks > 0) 1971 return false; 1972 1973 //We can successfully acquire the lock! 1974 //Log a debug statement for the success. 1975 version(RWMutexPrint) 1976 printInfo!("lock",INTENT)(); 1977 1978 //Increase the according counter 1979 //(number of active readers/writers) 1980 //and return a success code. 1981 activeCounter!INTENT += 1; 1982 return true; 1983 } 1984 1985 /** Attempt to acquire the lock for a given intent. 1986 * 1987 * Returns: 1988 * `true`, if the lock was successfully acquired; 1989 * `false` otherwise. 1990 */ 1991 @trusted bool tryLock(LockingIntent INTENT)() 1992 { 1993 //Try to lock this mutex without waiting for the underlying 1994 //TaskMutex - fail if it is already blocked. 1995 return tryLock!(INTENT,false)(); 1996 } 1997 1998 /** Acquire the lock for the given intent; yield and suspend until the lock has been acquired. */ 1999 @trusted void lock(LockingIntent INTENT)() 2000 { 2001 //Prepare a waiting action before the first 2002 //`tryLock()` call in order to avoid a race 2003 //condition that could lead to the queue notification 2004 //not being fired. 2005 auto count = emitCount!INTENT; 2006 atomicOp!"+="(queueCounter!INTENT,1); 2007 scope(exit) 2008 atomicOp!"-="(queueCounter!INTENT,1); 2009 2010 //Try to lock the mutex 2011 auto locked = tryLock!(INTENT,true)(); 2012 if (locked) 2013 return; 2014 2015 //Retry until we successfully acquired the lock 2016 while(!locked) 2017 { 2018 version(RWMutexPrint) 2019 printInfo!("wait",INTENT)(); 2020 2021 count = wait!INTENT(count); 2022 locked = tryLock!(INTENT,true)(); 2023 } 2024 } 2025 2026 /** Unlock the mutex after a successful acquisition. */ 2027 @trusted void unlock(LockingIntent INTENT)() 2028 { 2029 version(RWMutexPrint) 2030 printInfo!("unlock",INTENT)(); 2031 2032 debug assert(activeCounter!INTENT > 0); 2033 2034 synchronized(m_counterMutex) 2035 { 2036 //Decrement the counter of active lock holders. 2037 //If the counter hits zero, notify waiting Tasks 2038 activeCounter!INTENT -= 1; 2039 if (activeCounter!INTENT == 0) 2040 { 2041 version(RWMutexPrint) 2042 printInfo!("notify",INTENT)(); 2043 2044 notify!INTENT(); 2045 } 2046 } 2047 } 2048 } 2049 2050 /** A ReadWriteMutex implementation for fibers. 2051 * 2052 * This mutex can be used in exchange for a $(D core.sync.mutex.ReadWriteMutex), 2053 * but does not block the event loop in contention situations. The `reader` and `writer` 2054 * members are used for locking. Locking the `reader` mutex allows access to multiple 2055 * readers at once, while the `writer` mutex only allows a single writer to lock it at 2056 * any given time. Locks on `reader` and `writer` are mutually exclusive (i.e. whenever a 2057 * writer is active, no readers can be active at the same time, and vice versa). 2058 * 2059 * Notice: 2060 * Mutexes implemented by this class cannot be interrupted 2061 * using $(D vibe.core.task.Task.interrupt()). The corresponding 2062 * InterruptException will be deferred until the next blocking 2063 * operation yields the event loop. 2064 * 2065 * Use $(D InterruptibleTaskReadWriteMutex) as an alternative that can be 2066 * interrupted. 2067 * 2068 * cf. $(D core.sync.mutex.ReadWriteMutex) 2069 */ 2070 final class TaskReadWriteMutex 2071 { 2072 private { 2073 alias State = ReadWriteMutexState!false; 2074 alias LockingIntent = State.LockingIntent; 2075 alias READ_ONLY = LockingIntent.READ_ONLY; 2076 alias READ_WRITE = LockingIntent.READ_WRITE; 2077 2078 /** The shared state used by the reader and writer mutexes. */ 2079 State m_state; 2080 } 2081 2082 /** The policy with which the mutex should operate. 2083 * 2084 * The policy determines how the acquisition of the locks is 2085 * performed and can be used to tune the mutex according to the 2086 * underlying algorithm in which it is used. 2087 * 2088 * According to the provided policy, the mutex will either favor 2089 * reading or writing tasks and could potentially starve the 2090 * respective opposite. 2091 * 2092 * cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy) 2093 */ 2094 alias Policy = State.Policy; 2095 2096 /** A common baseclass for both of the provided mutexes. 2097 * 2098 * The intent for the according mutex is specified through the 2099 * $(D INTENT) template argument, which determines if a mutex is 2100 * used for read or write locking. 2101 */ 2102 final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable 2103 { 2104 /** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */ 2105 override bool tryLock() { return m_state.tryLock!INTENT(); } 2106 /** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */ 2107 override void lock() { m_state.lock!INTENT(); } 2108 /** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */ 2109 override void unlock() { m_state.unlock!INTENT(); } 2110 } 2111 alias Reader = Mutex!READ_ONLY; 2112 alias Writer = Mutex!READ_WRITE; 2113 2114 Reader reader; 2115 Writer writer; 2116 2117 this(Policy policy = Policy.PREFER_WRITERS) 2118 { 2119 m_state = State(policy); 2120 reader = new Reader(); 2121 writer = new Writer(); 2122 } 2123 2124 /** The policy with which the lock has been created. */ 2125 @property Policy policy() const { return m_state.policy; } 2126 } 2127 2128 /** Alternative to $(D TaskReadWriteMutex) that supports interruption. 2129 * 2130 * This class supports the use of $(D vibe.core.task.Task.interrupt()) while 2131 * waiting in the `lock()` method. 2132 * 2133 * cf. $(D core.sync.mutex.ReadWriteMutex) 2134 */ 2135 final class InterruptibleTaskReadWriteMutex 2136 { 2137 @safe: 2138 2139 private { 2140 alias State = ReadWriteMutexState!true; 2141 alias LockingIntent = State.LockingIntent; 2142 alias READ_ONLY = LockingIntent.READ_ONLY; 2143 alias READ_WRITE = LockingIntent.READ_WRITE; 2144 2145 /** The shared state used by the reader and writer mutexes. */ 2146 State m_state; 2147 } 2148 2149 /** The policy with which the mutex should operate. 2150 * 2151 * The policy determines how the acquisition of the locks is 2152 * performed and can be used to tune the mutex according to the 2153 * underlying algorithm in which it is used. 2154 * 2155 * According to the provided policy, the mutex will either favor 2156 * reading or writing tasks and could potentially starve the 2157 * respective opposite. 2158 * 2159 * cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy) 2160 */ 2161 alias Policy = State.Policy; 2162 2163 /** A common baseclass for both of the provided mutexes. 2164 * 2165 * The intent for the according mutex is specified through the 2166 * $(D INTENT) template argument, which determines if a mutex is 2167 * used for read or write locking. 2168 * 2169 */ 2170 final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable 2171 { 2172 /** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */ 2173 override bool tryLock() { return m_state.tryLock!INTENT(); } 2174 /** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */ 2175 override void lock() { m_state.lock!INTENT(); } 2176 /** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */ 2177 override void unlock() { m_state.unlock!INTENT(); } 2178 } 2179 alias Reader = Mutex!READ_ONLY; 2180 alias Writer = Mutex!READ_WRITE; 2181 2182 Reader reader; 2183 Writer writer; 2184 2185 this(Policy policy = Policy.PREFER_WRITERS) 2186 { 2187 m_state = State(policy); 2188 reader = new Reader(); 2189 writer = new Writer(); 2190 } 2191 2192 /** The policy with which the lock has been created. */ 2193 @property Policy policy() const { return m_state.policy; } 2194 }