/**
	Contains interfaces and enums for evented I/O drivers.

	Copyright: © 2012-2020 Sönke Ludwig
	Authors: Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
*/
module vibe.core.task;

import vibe.core.log;
import vibe.core.sync;

import core.atomic : atomicOp, atomicLoad, atomicStore, cas;
import core.thread;
import std.exception;
import std.traits;
import std.typecons;


/** Handle to a single task as started using vibe.core.runTask.

	A handle consists of the fiber that executes the task and the value of
	the fiber's task counter at the time the handle was created. Two handles
	compare equal only if both parts match.

	Note that the Task type is considered weakly isolated and thus can be
	passed between threads using vibe.core.concurrency.send or by passing
	it as a parameter to vibe.core.core.runWorkerTask.
*/
struct Task {
	private {
		shared(TaskFiber) m_fiber;
		size_t m_taskCounter;
		import std.concurrency : ThreadInfo, Tid;
		static ThreadInfo s_tidInfo;
	}

	/// Default value of `TaskSettings.priority`.
	enum basePriority = 0x00010000;

	/// Creates a handle for the given fiber/counter combination.
	private this(TaskFiber fiber, size_t task_counter)
	@safe nothrow {
		m_taskCounter = task_counter;
		() @trusted { m_fiber = cast(shared)fiber; } ();
	}

	/// Creates a copy of another handle.
	this(in Task other)
	@safe nothrow {
		m_taskCounter = other.m_taskCounter;
		m_fiber = () @trusted { return cast(shared(TaskFiber))other.m_fiber; } ();
	}

	/** Returns the Task instance belonging to the calling task.

		`Task.init` is returned if the caller is not running inside of a
		`TaskFiber`.
	*/
	static Task getThis() @safe nothrow
	{
		// In 2067, synchronized statements were annotated nothrow.
		// DMD#4115, Druntime#1013, Druntime#1021, Phobos#2704
		// However, they were "logically" nothrow before.
		static if (__VERSION__ <= 2066)
			scope (failure) assert(0, "Internal error: function should be nothrow");

		auto current = () @trusted { return Fiber.getThis(); } ();
		// casting a null reference yields null, so this also covers "no fiber"
		if (auto tf = cast(TaskFiber)current) {
			// FIXME: returning a non-.init handle for a finished task might break some layered logic
			return Task(tf, tf.getTaskStatusFromOwnerThread().counter);
		}
		return Task.init;
	}

	nothrow {
		/// The fiber behind this handle, with the `shared` qualifier cast away.
		package @property inout(TaskFiber) taskFiber()
		inout @system {
			return cast(inout(TaskFiber))m_fiber;
		}

		/// The fiber behind this handle, typed as a plain `Fiber`.
		@property inout(Fiber) fiber()
		inout @system {
			return this.taskFiber;
		}

		/// The task counter value stored in this handle.
		@property size_t taskCounter()
		const @safe {
			return m_taskCounter;
		}

		/// The thread of the underlying fiber, or `null` for an empty handle.
		@property inout(Thread) thread()
		inout @trusted {
			return m_fiber ? this.taskFiber.thread : null;
		}

		/** Determines if the task is still running or scheduled to be run.
		*/
		@property bool running()
		const @trusted {
			assert(m_fiber !is null, "Invalid task handle");

			// a terminated fiber cannot be running any task
			try {
				if (this.taskFiber.state == Fiber.State.TERM)
					return false;
			} catch (Throwable) {}

			// the fiber may already be executing a different task
			auto status = m_fiber.getTaskStatus();
			return status.counter == m_taskCounter && status.initialized;
		}

		package @property ref ThreadInfo tidInfo()
		@system {
			// FIXME: this is not thread safe!
			if (m_fiber) return taskFiber.tidInfo;
			return s_tidInfo;
		}

		package @property ref const(ThreadInfo) tidInfo()
		const @system {
			// FIXME: this is not thread safe!
			if (m_fiber) return taskFiber.tidInfo;
			return s_tidInfo;
		}

		/** Gets the `Tid` associated with this task for use with
			`std.concurrency`.
		*/
		@property Tid tid()
		@trusted {
			return tidInfo.ident;
		}
		/// ditto
		@property const(Tid) tid()
		const @trusted {
			return tidInfo.ident;
		}
	}

	/// Converts to `true` if the handle references a fiber.
	T opCast(T)()
	const @safe nothrow if (is(T == bool))
	{
		return m_fiber !is null;
	}

	/// Waits (interruptibly) for the task to end; no-op for an empty handle.
	void join()
	@trusted {
		if (!m_fiber) return;
		m_fiber.join!true(m_taskCounter);
	}

	/// Waits for the task to end without being interruptible; no-op for an empty handle.
	void joinUninterruptible()
	@trusted nothrow {
		if (!m_fiber) return;
		m_fiber.join!false(m_taskCounter);
	}

	/// Requests an interrupt of the task, if it is still running.
	void interrupt()
	@trusted nothrow {
		if (!m_fiber || !this.running) return;
		m_fiber.interrupt(m_taskCounter);
	}

	unittest { // regression test for bogus "task cannot interrupt itself"
		import vibe.core.core : runTask;
		auto finished = runTask({});
		finished.join();
		runTask({ finished.interrupt(); }).join();
	}

	/// Formats the handle as "<fiber address>:<task counter>".
	string toString()
	const @safe {
		import std.string;
		auto addr = () @trusted { return cast(void*)m_fiber; } ();
		return format("%s:%s", addr, m_taskCounter);
	}

	/** Writes a short identifier for this task to an output range.

		The identifier is the Base64 encoding of the first three bytes of an
		MD5 hash over the fiber address and the task counter. "-fin" is
		appended for tasks that are not running anymore and "----" is written
		for empty handles.
	*/
	void getDebugID(R)(ref R dst)
	{
		import std.digest.md : MD5;
		import std.bitmanip : nativeToLittleEndian;
		import std.base64 : Base64;

		if (!m_fiber) {
			dst.put("----");
			return;
		}

		auto fiber_addr = () @trusted { return cast(size_t)cast(void*)m_fiber; } ();

		MD5 md;
		md.start();
		md.put(nativeToLittleEndian(fiber_addr));
		md.put(nativeToLittleEndian(m_taskCounter));
		auto digest = md.finish();
		Base64.encode(digest[0 .. 3], dst);

		if (!this.running) dst.put("-fin");
	}
	/// ditto
	string getDebugID()
	@trusted {
		import std.array : appender;
		auto buf = appender!string;
		getDebugID(buf);
		return buf.data;
	}

	// Remove me when `-preview=in` becomes the default
	static if (is(typeof(mixin(q{(in ref int a) => a}))))
		mixin(q{
			bool opEquals(in ref Task other) const @safe nothrow {
				return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter;
			}});
	/// Two handles are equal if they share both the fiber and the task counter.
	bool opEquals(in Task other)
	const @safe nothrow {
		return m_taskCounter == other.m_taskCounter && m_fiber is other.m_fiber;
	}
}

/** Settings to control the behavior of newly started tasks.
*/
struct TaskSettings {
	/** Scheduling priority of the task

		The priority of a task is roughly proportional to the amount of
		times it gets scheduled in comparison to competing tasks. For
		example, a task with priority 100 will be scheduled every 10 rounds
		when competing against a task with priority 1000.

		The default priority is defined by `Task.basePriority` and has a
		value of 65536 (0x00010000). Priorities should be computed relative
		to `Task.basePriority`.

		A task with a priority of zero will only be executed if no other
		non-zero task is competing.
	*/
	uint priority = Task.basePriority;
}


/**
	Implements a task local storage variable.

	Task local variables, similar to thread local variables, exist separately
	in each task. Consequently, they do not need any form of synchronization
	when accessing them.

	Note, however, that each TaskLocal variable will increase the memory footprint
	of any task that uses task local storage. There is also an overhead to access
	TaskLocal variables, higher than for thread local variables, but generally
	still O(1) (since actual storage acquisition is done lazily the first access
	can require a memory allocation with unknown computational costs).

	Notice:
		TaskLocal instances MUST be declared as static/global thread-local
		variables. Defining them as a temporary/stack variable will cause
		crashes or data corruption!
185 186 Examples: 187 --- 188 TaskLocal!string s_myString = "world"; 189 190 void taskFunc() 191 { 192 assert(s_myString == "world"); 193 s_myString = "hello"; 194 assert(s_myString == "hello"); 195 } 196 197 shared static this() 198 { 199 // both tasks will get independent storage for s_myString 200 runTask(&taskFunc); 201 runTask(&taskFunc); 202 } 203 --- 204 */ 205 struct TaskLocal(T) 206 { 207 private { 208 size_t m_offset = size_t.max; 209 size_t m_id; 210 T m_initValue; 211 bool m_hasInitValue = false; 212 } 213 214 this(T init_val) { m_initValue = init_val; m_hasInitValue = true; } 215 216 @disable this(this); 217 218 void opAssign(T value) { this.storage = value; } 219 220 @property ref T storage() 221 @safe { 222 import std.conv : emplace; 223 224 auto fiber = TaskFiber.getThis(); 225 226 // lazily register in FLS storage 227 if (m_offset == size_t.max) { 228 static assert(T.alignof <= 8, "Unsupported alignment for type "~T.stringof); 229 assert(TaskFiber.ms_flsFill % 8 == 0, "Misaligned fiber local storage pool."); 230 m_offset = TaskFiber.ms_flsFill; 231 m_id = TaskFiber.ms_flsCounter++; 232 233 234 TaskFiber.ms_flsFill += T.sizeof; 235 while (TaskFiber.ms_flsFill % 8 != 0) 236 TaskFiber.ms_flsFill++; 237 } 238 239 // make sure the current fiber has enough FLS storage 240 if (fiber.m_fls.length < TaskFiber.ms_flsFill) { 241 fiber.m_fls.length = TaskFiber.ms_flsFill + 128; 242 () @trusted { fiber.m_flsInit.length = TaskFiber.ms_flsCounter + 64; } (); 243 } 244 245 // return (possibly default initialized) value 246 auto data = () @trusted { return fiber.m_fls.ptr[m_offset .. 
m_offset+T.sizeof]; } (); 247 if (!() @trusted { return fiber.m_flsInit[m_id]; } ()) { 248 () @trusted { fiber.m_flsInit[m_id] = true; } (); 249 import std.traits : hasElaborateDestructor, hasAliasing; 250 static if (hasElaborateDestructor!T || hasAliasing!T) { 251 void function(void[], size_t) destructor = (void[] fls, size_t offset){ 252 static if (hasElaborateDestructor!T) { 253 auto obj = cast(T*)&fls[offset]; 254 // call the destructor on the object if a custom one is known declared 255 obj.destroy(); 256 } 257 else static if (hasAliasing!T) { 258 // zero the memory to avoid false pointers 259 foreach (size_t i; offset .. offset + T.sizeof) { 260 ubyte* u = cast(ubyte*)&fls[i]; 261 *u = 0; 262 } 263 } 264 }; 265 FLSInfo fls_info; 266 fls_info.fct = destructor; 267 fls_info.offset = m_offset; 268 269 // make sure flsInfo has enough space 270 if (TaskFiber.ms_flsInfo.length <= m_id) 271 TaskFiber.ms_flsInfo.length = m_id + 64; 272 273 TaskFiber.ms_flsInfo[m_id] = fls_info; 274 } 275 276 if (m_hasInitValue) { 277 static if (__traits(compiles, () @trusted { emplace!T(data, m_initValue); } ())) 278 () @trusted { emplace!T(data, m_initValue); } (); 279 else assert(false, "Cannot emplace initialization value for type "~T.stringof); 280 } else () @trusted { emplace!T(data); } (); 281 } 282 return *() @trusted { return cast(T*)data.ptr; } (); 283 } 284 285 alias storage this; 286 } 287 288 289 /** Exception that is thrown by Task.interrupt. 
*/
class InterruptException : Exception {
	/// Constructs the exception with the fixed message "Task interrupted.".
	this()
	@safe nothrow {
		super("Task interrupted.");
	}
}

/**
	High level state change events for a Task
*/
enum TaskEvent {
	preStart,  /// Just about to invoke the fiber which starts execution
	postStart, /// After the fiber has returned for the first time (by yield or exit)
	start,     /// Just about to start execution
	yield,     /// Temporarily paused
	resume,    /// Resumed from a prior yield
	end,       /// Ended normally
	fail       /// Ended with an exception
}

/** Information record that is passed by reference to a `TaskCreationCallback`.
*/
struct TaskCreationInfo {
	/// Handle of the task
	Task handle;
	/// NOTE(review): presumably the address of the function executed by the task - confirm against the code that fills this in
	const(void)* functionPointer;
}

/// Signature of a callback that receives a `TaskEvent` together with the affected `Task`.
alias TaskEventCallback = void function(TaskEvent, Task) nothrow;
/// Signature of a callback that receives a `TaskCreationInfo` by reference.
alias TaskCreationCallback = void function(ref TaskCreationInfo) nothrow @safe;

/**
	The maximum combined size of all parameters passed to a task delegate

	See_Also: runTask
*/
enum maxTaskParameterSize = 128;


/** The base class for a task aka Fiber.

	This class represents a single task that is executed concurrently
	with other tasks. Each task is owned by a single thread.
*/
final package class TaskFiber : Fiber {
	import std.concurrency : Tid, thisTid;

	static if ((void*).sizeof >= 8) enum defaultTaskStackSize = 16*1024*1024;
	else enum defaultTaskStackSize = 512*1024;

	// Layout of m_taskCounterAndFlags: the lowest `shiftAmount` bits hold the
	// flags, the remaining upper bits hold the task counter.
	private enum Flags {
		running = 1UL << 0,
		initialized = 1UL << 1,
		interrupt = 1UL << 2,

		shiftAmount = 3,
		flagsMask = (1<<shiftAmount) - 1
	}

	private {
		import std.concurrency : ThreadInfo;
		import std.bitmanip : BitArray;

		// task queue management (TaskScheduler.m_taskQueue)
		TaskFiber m_prev, m_next;
		TaskFiberQueue* m_queue;

		Thread m_thread;
		ThreadInfo m_tidInfo;
		uint m_staticPriority, m_dynamicPriority;
		shared ulong m_taskCounterAndFlags = 0; // bits 0-Flags.shiftAmount are flags

		bool m_shutdown = false;
		shared(bool) m_tidUsed = false;

		shared(ManualEvent) m_onExit;

		// task local storage
		BitArray m_flsInit;
		void[] m_fls;

		package int m_yieldLockCount;

		static TaskFiber ms_globalDummyFiber;
		static FLSInfo[] ms_flsInfo;
		static size_t ms_flsFill = 0; // thread-local
		static size_t ms_flsCounter = 0;
	}

	package {
		TaskFuncInfo m_taskFunc;
		__gshared size_t ms_taskStackSize = defaultTaskStackSize;
		__gshared debug TaskEventCallback ms_taskEventCallback;
		__gshared debug TaskCreationCallback ms_taskCreationCallback;

		debug (VibeRunningTasks)
			static string[TaskFiber] s_runningTasks;
	}

	/// Creates a fiber with a stack of `ms_taskStackSize` bytes that is bound to the calling thread.
	this()
	@trusted nothrow {
		super(&run, ms_taskStackSize);
		m_onExit = createSharedManualEvent();
		m_thread = Thread.getThis();
	}

	/** Returns the `TaskFiber` of the caller.

		If the caller does not run inside of a `TaskFiber`, a lazily
		created dummy fiber (`ms_globalDummyFiber`) is returned instead.
	*/
	static TaskFiber getThis()
	@safe nothrow {
		auto f = () @trusted nothrow { return Fiber.getThis(); } ();
		if (auto tf = cast(TaskFiber)f) return tf;
		if (!ms_globalDummyFiber) ms_globalDummyFiber = new TaskFiber;
		return ms_globalDummyFiber;
	}

	// expose Fiber.state as @safe on older DMD versions
	static if (!__traits(compiles, () @safe { return Fiber.init.state; } ()))
		@property State state() @trusted const nothrow { return super.state; }

	/** Main loop of the fiber.

		Repeatedly waits for a task function to be assigned to
		`m_taskFunc`, executes it, notifies joining tasks, cleans up the
		task local storage and hands the fiber to `recycleFiber`. The
		loop ends once `m_shutdown` is set while no task is assigned.
	*/
	private void run()
	nothrow {
		import std.algorithm.mutation : swap;
		import std.encoding : sanitize;
		import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield, yieldLock;

		version (VibeDebugCatchAll) alias UncaughtException = Throwable;
		else alias UncaughtException = Exception;
		try {
			// force creation of a message box/Tid
			try thisTid;
			catch (Exception e) assert(false, e.msg);

			while (true) {
				while (!m_taskFunc.func) {
					try {
						debug (VibeTaskLog) logTrace("putting fiber to sleep waiting for new task...");
						static if (__VERSION__ >= 2090) {
							import core.memory : GC;
							assert(!GC.inFinalizer, "Yiedling within finalizer - this would most likely result in a dead-lock!");
						}
						Fiber.yield();
					} catch (Exception e) {
						e.logException!(LogLevel.warn)(
							"CoreTaskFiber was resumed with exception but without active task");
					}
					if (m_shutdown) return;
				}

				debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");

				TaskFuncInfo task;
				swap(task, m_taskFunc);
				m_dynamicPriority = m_staticPriority = task.settings.priority;
				Task handle = this.task;
				try {
					atomicOp!"|="(m_taskCounterAndFlags, Flags.running); // set running
					scope(exit) atomicOp!"&="(m_taskCounterAndFlags, ~Flags.flagsMask); // clear running/initialized

					debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle);
					if (!isEventLoopRunning) {
						debug (VibeTaskLog) logTrace("Event loop not running at task start - yielding.");
						taskScheduler.yieldUninterruptible();
						debug (VibeTaskLog) logTrace("Initial resume of task.");
					}

					debug (VibeRunningTasks) s_runningTasks[this] = "initial";

					task.call();
					debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle);

					debug if (() @trusted { return (cast(shared)this); } ().getTaskStatus().interrupt)
						logDebugV("Task exited while an interrupt was in flight.");
				} catch (Exception e) {
					debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle);
					e.logException!(LogLevel.critical)("Task terminated with uncaught exception");
				}

				debug (VibeRunningTasks) s_runningTasks.remove(this);

				debug assert(Thread.getThis() is m_thread, "Fiber moved?");

				// re-create message box if it has been used to guarantee a
				// unique Tid per task
				if (atomicLoad(m_tidUsed)) {
					m_tidInfo.cleanup();
					m_tidInfo.ident = Tid.init;
					try thisTid;
					catch (Exception e) assert(false, e.msg);
					atomicStore(m_tidUsed, false);
				}

				debug (VibeTaskLog) logTrace("Notifying joining tasks.");

				// Issue #161: This fiber won't be resumed before the next task
				// is assigned, because it is already marked as de-initialized.
				// Since ManualEvent.emit() will need to switch tasks, this
				// would mean that only the first waiter is notified before
				// this fiber gets a new task assigned.
				// Using a yield lock forces all corresponding tasks to be
				// enqueued into the schedule queue and resumed in sequence
				// at the end of the scope.
				auto l = yieldLock();

				m_onExit.emit();

				// make sure that the task does not get left behind in the yielder queue if terminated during yield()
				if (m_queue) m_queue.remove(this);

				// reset the fls initialization BitArray for memory safety,
				// invoking the registered cleanup function of each
				// initialized slot
				foreach (size_t i, ref bool b; m_flsInit) {
					if (b) {
						// the index must be strictly smaller than the length,
						// since ms_flsInfo[i] is accessed
						if (ms_flsInfo !is null && ms_flsInfo.length > i && ms_flsInfo[i] != FLSInfo.init)
							ms_flsInfo[i].destroy(m_fls);
						b = false;
					}
				}

				assert(!m_queue, "Fiber done but still scheduled to be resumed!?");

				debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");

				// make the fiber available for the next task
				recycleFiber(this);
			}
		} catch (UncaughtException th) {
			th.logException("CoreTaskFiber was terminated unexpectedly");
		} catch (Throwable th) {
			import std.stdio : stderr, writeln;
			import core.stdc.stdlib : abort;
			try stderr.writeln(th);
			catch (Exception e) {
				try stderr.writeln(th.msg);
				catch (Exception e) {}
			}
			abort();
		}
	}


	/** Returns the thread that owns this task.
	*/
	@property inout(Thread) thread() inout @safe nothrow { return m_thread; }

	/** Returns the handle of the current Task running on this fiber.

		`Task.init` is returned if the fiber is not in the initialized state.
	*/
	@property Task task()
	@safe nothrow {
		auto ts = getTaskStatusFromOwnerThread();
		if (!ts.initialized) return Task.init;
		return Task(this, ts.counter);
	}

	/// Returns the `ThreadInfo` of this fiber and marks it as used (see `m_tidUsed`).
	@property ref inout(ThreadInfo) tidInfo() inout @trusted nothrow { atomicStore((cast()this).m_tidUsed, true); return m_tidInfo; }

	/** Shuts down the task handler loop.

		Must not be called while a task is assigned (initialized state).
	*/
	void shutdown()
	@safe nothrow {
		debug assert(Thread.getThis() is m_thread);

		assert(!() @trusted { return cast(shared)this; } ().getTaskStatus().initialized);

		m_shutdown = true;
		while (state != Fiber.State.TERM)
			() @trusted {
				try call(Fiber.Rethrow.no);
				catch (Exception e) assert(false, e.msg);
			} ();
	}

	/** Blocks until the task has ended.

		Returns as soon as the fiber is not initialized anymore or its
		task counter differs from `task_counter`.
	*/
	void join(bool interruptiple)(size_t task_counter)
	shared @trusted {
		auto cnt = m_onExit.emitCount;
		while (true) {
			auto st = getTaskStatus();
			if (!st.initialized || st.counter != task_counter)
				break;
			static if (interruptiple)
				cnt = m_onExit.wait(cnt);
			else
				cnt = m_onExit.waitUninterruptible(cnt);
		}
	}

	/** Throws an InterruptExeption within the task as soon as it calls an interruptible function.

		Does nothing if the fiber is not initialized, already has an
		interrupt pending, or is running a task other than `task_counter`.
	*/
	void interrupt(size_t task_counter)
	shared @safe nothrow {
		import vibe.core.core : taskScheduler;

		auto caller = () @trusted { return cast(shared)TaskFiber.getThis(); } ();

		assert(caller !is this, "A task cannot interrupt itself.");

		// atomically set the interrupt flag, retrying on concurrent modification
		while (true) {
			auto tcf = atomicLoad(m_taskCounterAndFlags);
			auto st = getTaskStatus(tcf);
			if (!st.initialized || st.interrupt || st.counter != task_counter)
				return;
			auto tcf_int = tcf | Flags.interrupt;
			if (cas(&m_taskCounterAndFlags, tcf, tcf_int))
				break;
		}

		if (caller.m_thread is m_thread) {
			auto thisus = () @trusted { return cast()this; } ();
			debug (VibeTaskLog) logTrace("Resuming task with interrupt flag.");
			auto defer = caller.m_yieldLockCount > 0;
			taskScheduler.switchTo(thisus.task, defer ? TaskSwitchPriority.prioritized : TaskSwitchPriority.immediate);
		} else {
			debug (VibeTaskLog) logTrace("Set interrupt flag on task without resuming.");
		}
	}

	/** Sets the fiber to initialized state and increments the task counter.

		Note that the task information needs to be set up first.
	*/
	void bumpTaskCounter()
	@safe nothrow {
		debug {
			auto ts = atomicLoad(m_taskCounterAndFlags);
			assert((ts & Flags.flagsMask) == 0, "bumpTaskCounter() called on fiber with non-zero flags");
			assert(m_taskFunc.func !is null, "bumpTaskCounter() called without initializing the task function");
		}

		() @trusted { atomicOp!"+="(m_taskCounterAndFlags, (1 << Flags.shiftAmount) + Flags.initialized); } ();
	}

	/// Atomically reads and decodes the current counter/flags word.
	private auto getTaskStatus()
	shared const @safe nothrow {
		return getTaskStatus(atomicLoad(m_taskCounterAndFlags));
	}

	/// Same as `getTaskStatus`, but for use on the non-shared fiber from its own thread.
	private auto getTaskStatusFromOwnerThread()
	const @safe nothrow {
		debug assert(Thread.getThis() is m_thread);
		return getTaskStatus(atomicLoad(m_taskCounterAndFlags));
	}

	/// Splits a counter/flags word into the task counter and the individual flags.
	private static auto getTaskStatus(ulong counter_and_flags)
	@safe nothrow {
		static struct S {
			size_t counter;
			bool running;
			bool initialized;
			bool interrupt;
		}
		S ret;
		ret.counter = cast(size_t)(counter_and_flags >> Flags.shiftAmount);
		ret.running = (counter_and_flags & Flags.running) != 0;
		ret.initialized = (counter_and_flags & Flags.initialized) != 0;
		ret.interrupt = (counter_and_flags & Flags.interrupt) != 0;
		return ret;
	}

	/** Handles a pending interrupt by invoking a callback.

		If the interrupt flag is set and `on_interrupt` is not `null`, the
		flag is cleared and `on_interrupt` is called.
	*/
	package void handleInterrupt(scope void delegate() @safe nothrow on_interrupt)
	@safe nothrow {
		assert(() @trusted { return Task.getThis().fiber; } () is this,
			"Handling interrupt outside of the corresponding fiber.");
		if (getTaskStatusFromOwnerThread().interrupt && on_interrupt) {
			debug (VibeTaskLog) logTrace("Handling interrupt flag.");
			clearInterruptFlag();
			on_interrupt();
		}
	}

	/** Handles a pending interrupt by throwing.

		If the interrupt flag is set, it is cleared and an
		`InterruptException` is thrown.
	*/
	package void handleInterrupt()
	@safe {
		assert(() @trusted { return Task.getThis().fiber; } () is this,
			"Handling interrupt outside of the corresponding fiber.");
		if (getTaskStatusFromOwnerThread().interrupt) {
			clearInterruptFlag();
			throw new InterruptException;
		}
	}

	/// Atomically clears the interrupt flag, leaving counter and other flags untouched.
	private void clearInterruptFlag()
	@safe nothrow {
		while (true) {
			// The value must be re-read on each iteration. Otherwise a
			// failed `cas` would retry with the same stale value forever.
			auto tcf = atomicLoad(m_taskCounterAndFlags);
			auto st = getTaskStatus(tcf);
			assert(st.initialized);
			if (!st.interrupt) break;
			auto tcf_int = tcf & ~Flags.interrupt;
			if (cas(&m_taskCounterAndFlags, tcf, tcf_int))
				break;
		}
	}
}


/** Controls the priority to use for switching execution to a task.
*/
enum TaskSwitchPriority {
	/** Rescheduled according to the tasks priority
	*/
	normal,

	/** Rescheduled with maximum priority.

		The task will resume as soon as the current task yields.
	*/
	prioritized,

	/** Switch to the task immediately.
	*/
	immediate
}

/** Type erased storage for a task callable together with its arguments.

	`set` copies/moves the callable and the arguments into the fixed size
	`callable` and `args` buffers and stores a matching trampoline in `func`.
	`call` invokes that trampoline, which moves the data back out, resets
	`func` to `null` and performs the actual call.
*/
package struct TaskFuncInfo {
	void function(ref TaskFuncInfo) func;
	void[2*size_t.sizeof] callable;
	void[maxTaskParameterSize] args;
	debug ulong functionPointer;
	TaskSettings settings;

	/** Stores a callable and its arguments.

		The callable must fit into `callable` and the combined arguments
		must not exceed `maxTaskParameterSize` bytes; both limits are
		checked at compile time.
	*/
	void set(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args)
	{
		assert(!func, "Setting TaskFuncInfo that is already set.");

		import std.algorithm : move;
		import std.traits : hasElaborateAssign;
		import std.conv : to;

		static struct TARGS { ARGS expand; }

		// NOTE: the sizes need to be converted to strings explicitly and the
		// buffer must be referenced through the type, because the `callable`
		// parameter shadows the `callable` field within this function
		static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length,
			"Storage required for task callable is too large ("~CALLABLE.sizeof.to!string~" vs max "~TaskFuncInfo.callable.length.to!string~"): "~CALLABLE.stringof);
		static assert(TARGS.sizeof <= maxTaskParameterSize,
			"The arguments passed to run(Worker)Task must not exceed "~
			maxTaskParameterSize.to!string~" bytes in total size: "~TARGS.sizeof.stringof~" bytes");

		debug functionPointer = callPointer(callable);

		static void callDelegate(ref TaskFuncInfo tfi) {
			assert(tfi.func is &callDelegate, "Wrong callDelegate called!?");

			// copy original call data to stack
			CALLABLE c;
			TARGS args;
			move(*(cast(CALLABLE*)tfi.callable.ptr), c);
			move(*(cast(TARGS*)tfi.args.ptr), args);

			// reset the info
			tfi.func = null;

			// make the call
			mixin(callWithMove!ARGS("c", "args.expand"));
		}

		func = &callDelegate;

		() @trusted {
			// types with custom assignment need a valid target to assign to
			static if (hasElaborateAssign!CALLABLE) initCallable!CALLABLE();
			static if (hasElaborateAssign!TARGS) initArgs!TARGS();
			typedCallable!CALLABLE = callable;
			foreach (i, A; ARGS) {
				static if (needsMove!A) args[i].move(typedArgs!TARGS.expand[i]);
				else typedArgs!TARGS.expand[i] = args[i];
			}
		} ();
	}

	/// Invokes the stored trampoline (`func`) on this instance.
	void call()
	{
		this.func(this);
	}

	/// Reinterprets the `callable` buffer as a value of type `C`.
	@property ref C typedCallable(C)()
	{
		static assert(C.sizeof <= callable.sizeof);
		return *cast(C*)callable.ptr;
	}

	/// Reinterprets the `args` buffer as a value of type `A`.
	@property ref A typedArgs(A)()
	{
		static assert(A.sizeof <= args.sizeof);
		return *cast(A*)args.ptr;
	}

	/// Fills the start of the `callable` buffer with the bytes of a default initialized `C`.
	void initCallable(C)()
	nothrow {
		static const C cinit;
		this.callable[0 .. C.sizeof] = cast(void[])(&cinit)[0 .. 1];
	}

	/// Fills the start of the `args` buffer with the bytes of a default initialized `A`.
	void initArgs(A)()
	nothrow {
		static const A ainit;
		this.args[0 .. A.sizeof] = cast(void[])(&ainit)[0 .. 1];
	}
}

/** Determines a numeric code address for a callable.

	Function pointers yield their own value, delegates their `funcptr` and
	types with an `opCall` the address of that `opCall`. For anything else,
	the address of the passed variable itself is returned.
*/
private ulong callPointer(C)(ref C callable)
@trusted nothrow @nogc {
	alias IP = ulong;
	static if (is(C == function)) return cast(IP)cast(void*)callable;
	else static if (is(C == delegate)) return cast(IP)callable.funcptr;
	else static if (is(typeof(&callable.opCall) == function)) return cast(IP)cast(void*)&callable.opCall;
	else static if (is(typeof(&callable.opCall) == delegate)) return cast(IP)(&callable.opCall).funcptr;
	else return cast(IP)&callable;
}

package struct TaskScheduler {
	import eventcore.driver : ExitReason;
	import eventcore.core : eventDriver;

	private {
		TaskFiberQueue m_taskQueue;
	}

	@safe:

	@disable this(this);

	@property size_t scheduledTaskCount() const nothrow { return m_taskQueue.length; }

	/** Lets other pending tasks execute before continuing execution.

		This will give other tasks or events a chance to be processed. If
		multiple tasks call this function, they will be processed in a
		first-in-first-out manner.
	*/
	void yield()
	{
		auto t = Task.getThis();
		if (t == Task.init) return; // not really a task -> no-op
		auto tf = () @trusted { return t.taskFiber; } ();
		debug (VibeTaskLog) logTrace("Yielding (interrupt=%s)", () @trusted { return (cast(shared)tf).getTaskStatus().interrupt; } ());
		tf.handleInterrupt();
		if (tf.m_queue !is null) return; // already scheduled to be resumed
		doYieldAndReschedule(t);
		tf.handleInterrupt();
	}

	nothrow:

	/** Performs a single round of scheduling without blocking.

		This will execute scheduled tasks and process events from the
		event queue, as long as possible without having to wait.
831 832 Returns: 833 A reason is returned: 834 $(UL 835 $(LI `ExitReason.exit`: The event loop was exited due to a manual request) 836 $(LI `ExitReason.outOfWaiters`: There are no more scheduled 837 tasks or events, so the application would do nothing from 838 now on) 839 $(LI `ExitReason.idle`: All scheduled tasks and pending events 840 have been processed normally) 841 $(LI `ExitReason.timeout`: Scheduled tasks have been processed, 842 but there were no pending events present.) 843 ) 844 */ 845 ExitReason process() 846 { 847 assert(TaskFiber.getThis().m_yieldLockCount == 0, "May not process events within an active yieldLock()!"); 848 849 bool any_events = false; 850 while (true) { 851 debug (VibeTaskLog) logTrace("Scheduling before peeking new events..."); 852 // process pending tasks 853 bool any_tasks_processed = schedule() != ScheduleStatus.idle; 854 855 debug (VibeTaskLog) logTrace("Processing pending events..."); 856 ExitReason er = eventDriver.core.processEvents(0.seconds); 857 debug (VibeTaskLog) logTrace("Done: %s", er); 858 859 final switch (er) { 860 case ExitReason.exited: return ExitReason.exited; 861 case ExitReason.outOfWaiters: 862 if (!scheduledTaskCount) 863 return ExitReason.outOfWaiters; 864 break; 865 case ExitReason.timeout: 866 if (!scheduledTaskCount) 867 return any_events || any_tasks_processed ? ExitReason.idle : ExitReason.timeout; 868 break; 869 case ExitReason.idle: 870 any_events = true; 871 if (!scheduledTaskCount) 872 return ExitReason.idle; 873 break; 874 } 875 } 876 } 877 878 /** Performs a single round of scheduling, blocking if necessary. 
879 880 Returns: 881 A reason is returned: 882 $(UL 883 $(LI `ExitReason.exit`: The event loop was exited due to a manual request) 884 $(LI `ExitReason.outOfWaiters`: There are no more scheduled 885 tasks or events, so the application would do nothing from 886 now on) 887 $(LI `ExitReason.idle`: All scheduled tasks and pending events 888 have been processed normally) 889 ) 890 */ 891 ExitReason waitAndProcess() 892 { 893 // first, process tasks without blocking 894 auto er = process(); 895 896 final switch (er) { 897 case ExitReason.exited, ExitReason.outOfWaiters: return er; 898 case ExitReason.idle: return ExitReason.idle; 899 case ExitReason.timeout: break; 900 } 901 902 // if the first run didn't process any events, block and 903 // process one chunk 904 debug (VibeTaskLog) logTrace("Wait for new events to process..."); 905 er = eventDriver.core.processEvents(Duration.max); 906 debug (VibeTaskLog) logTrace("Done: %s", er); 907 final switch (er) { 908 case ExitReason.exited: return ExitReason.exited; 909 case ExitReason.outOfWaiters: 910 if (!scheduledTaskCount) 911 return ExitReason.outOfWaiters; 912 break; 913 case ExitReason.timeout: assert(false, "Unexpected return code"); 914 case ExitReason.idle: break; 915 } 916 917 // finally, make sure that all scheduled tasks are run 918 er = process(); 919 if (er == ExitReason.timeout) return ExitReason.idle; 920 else return er; 921 } 922 923 void yieldUninterruptible() 924 { 925 auto t = Task.getThis(); 926 if (t == Task.init) return; // not really a task -> no-op 927 auto tf = () @trusted { return t.taskFiber; } (); 928 if (tf.m_queue !is null) return; // already scheduled to be resumed 929 doYieldAndReschedule(t); 930 } 931 932 /** Holds execution until the task gets explicitly resumed. 
*/
	void hibernate()
	{
		import vibe.core.core : isEventLoopRunning;
		auto thist = Task.getThis();
		if (thist == Task.init) {
			// Not running inside of a task: there is no fiber to yield, so
			// process a single round of events instead.
			assert(!isEventLoopRunning, "Event processing outside of a fiber should only happen before the event loop is running!?");
			static import vibe.core.core;
			vibe.core.core.runEventLoopOnce();
		} else {
			// Yield without re-inserting the task into the queue.
			doYield(thist);
		}
	}

	/** Switches execution to the specified task according to the given priority.

		Nothing happens if `t` is the calling task. If `t` is already
		scheduled, a `normal` priority switch leaves its queue position
		untouched; for any other priority it is removed from the queue and
		re-inserted as described below.

		$(UL
			$(LI `TaskSwitchPriority.normal`: `t` is inserted into the task
				queue according to its current dynamic priority. The caller
				continues to run.)
			$(LI `TaskSwitchPriority.prioritized`: the dynamic priority of `t`
				is raised to `uint.max` before it is inserted into the queue.
				The caller continues to run.)
			$(LI `TaskSwitchPriority.immediate`: when called from within a
				task, the calling task and `t` are both put at the front of
				the queue (`t` first) and the calling task yields. When called
				from outside of a task, `t` is resumed directly and the call
				returns once it finishes or yields.)
		)

		Params:
			t = The task to switch to, must live in the calling thread
			priority = Determines how `t` is scheduled
	*/
	void switchTo(Task t, TaskSwitchPriority priority)
	{
		auto thist = Task.getThis();

		if (t == thist) return;

		auto thisthr = thist ? thist.thread : () @trusted { return Thread.getThis(); } ();
		assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread.");

		auto tf = () @trusted { return t.taskFiber; } ();
		if (tf.m_queue) {
			// don't reset the position of already scheduled tasks
			if (priority == TaskSwitchPriority.normal) return;

			debug (VibeTaskLog) logTrace("Task to switch to is already scheduled. Moving to front of queue.");
			assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue.");
			m_taskQueue.remove(tf);
			assert(!tf.m_queue, "Task removed from queue, but still has one set!?");
		}

		if (thist == Task.init && priority == TaskSwitchPriority.immediate) {
			// called from outside of a task - resume the target directly
			assert(TaskFiber.getThis().m_yieldLockCount == 0, "Cannot yield within an active yieldLock()!");
			debug (VibeTaskLog) logTrace("switch to task from global context");
			resumeTask(t);
			debug (VibeTaskLog) logTrace("task yielded control back to global context");
		} else {
			// NOTE: thistf is null here if called from outside of a task with
			//       a non-immediate priority
			auto thistf = () @trusted { return thist.taskFiber; } ();
			assert(!thistf || !thistf.m_queue, "Calling task is running, but scheduled to be resumed!?");

			debug (VibeTaskLog) logDebugV("Switching tasks (%s already in queue, prio=%s)", m_taskQueue.length, priority);
			final switch (priority) {
				case TaskSwitchPriority.normal:
					reschedule(tf);
					break;
				case TaskSwitchPriority.prioritized:
					tf.m_dynamicPriority = uint.max;
					reschedule(tf);
					break;
				case TaskSwitchPriority.immediate:
					tf.m_dynamicPriority = uint.max;
					// the caller goes first, then the target is put in front
					// of it, so that the queue starts with [tf, thistf, ...]
					m_taskQueue.insertFront(thistf);
					m_taskQueue.insertFront(tf);
					doYield(thist);
					break;
			}
		}
	}

	/** Runs any pending tasks.

		A pending task is a task that is scheduled to be resumed by either `yield` or
		`switchTo`.

		At most as many tasks as were queued when the function was entered
		are resumed during a single call.

		Returns:
			`ScheduleStatus.idle` if the task queue was empty on entry,
			`ScheduleStatus.allProcessed` if the queue is empty after the
			queued tasks have been resumed, and `ScheduleStatus.busy` if
			there are still tasks left in the queue.
	*/
	ScheduleStatus schedule()
	nothrow {
		if (m_taskQueue.empty)
			return ScheduleStatus.idle;

		assert(Task.getThis() == Task.init, "TaskScheduler.schedule() may not be called from a task!");

		// the upper bound is evaluated only once, so tasks that get queued
		// while this loop is running do not extend it
		foreach (i; 0 .. m_taskQueue.length) {
			auto t = m_taskQueue.front;
			m_taskQueue.popFront();

			// reset priority
			t.m_dynamicPriority = t.m_staticPriority;

			debug (VibeTaskLog) logTrace("resuming task");
			auto task = t.task;
			if (task != Task.init)
				resumeTask(task);
			debug (VibeTaskLog) logTrace("task out");

			// the resumed task may have drained the queue
			if (m_taskQueue.empty) break;
		}

		debug (VibeTaskLog) logDebugV("schedule finished - %s tasks left in queue", m_taskQueue.length);

		return m_taskQueue.empty ? ScheduleStatus.allProcessed : ScheduleStatus.busy;
	}

	/** Resumes execution of a yielded task.

		A `Throwable` that escapes the task's fiber is logged. If it is an
		`Error`, it is additionally re-thrown to the caller.
	*/
	private void resumeTask(Task t)
	nothrow {
		assert(t != Task.init, "Resuming null task");

		debug (VibeTaskLog) logTrace("task fiber resume");
		// Rethrow.no: an escaping exception is returned instead of thrown
		auto uncaught_exception = () @trusted nothrow { return t.fiber.call!(Fiber.Rethrow.no)(); } ();
		debug (VibeTaskLog) logTrace("task fiber yielded");

		if (uncaught_exception) {
			auto th = cast(Throwable)uncaught_exception;
			assert(th, "Fiber returned exception object that is not a Throwable!?");

			assert(() @trusted nothrow { return t.fiber.state; } () == Fiber.State.TERM);
			th.logException("Task terminated with unhandled exception");

			// always pass Errors on
			if (auto err = cast(Error)th) throw err;
		}
	}

	/** Inserts a task fiber into the task queue according to its dynamic
		priority.

		Each queued task that gets overtaken has its dynamic priority
		increased by its static priority (saturating at `uint.max`).
	*/
	private void reschedule(TaskFiber tf)
	{
		import std.algorithm.comparison : min;

		// insert according to priority, limited to a priority
		// factor of 1:10 in case of heavy concurrency
		m_taskQueue.insertBackPred(tf, 10, (t) {
			if (t.m_dynamicPriority >= tf.m_dynamicPriority)
				return true;

			// increase dynamic priority each time a task gets overtaken to
			// ensure a fair schedule
			t.m_dynamicPriority += min(t.m_staticPriority, uint.max - t.m_dynamicPriority);
			return false;
		});
	}

	/** Puts the given task back into the task queue and then yields.
	*/
	private void doYieldAndReschedule(Task task)
	{
		auto tf = () @trusted { return task.taskFiber; } ();

		reschedule(tf);

		doYield(task);
	}

	/** Yields the current fiber on behalf of `task`.

		The task is not inserted into the task queue by this function. In
		debug builds, the task event callback (if set) is invoked with
		`TaskEvent.yield` before and `TaskEvent.resume` after the yield.
	*/
	private void doYield(Task task)
	{
		debug (VibeRunningTasks) () nothrow {
			// throw and catch an exception just to obtain a stack trace
			try throw new Exception("");
			catch (Exception e) {
				try TaskFiber.s_runningTasks[task.taskFiber] = e.info.toString;
				catch (Exception) TaskFiber.s_runningTasks[task.taskFiber] = "(failed to get stack)";
			}
		} ();

		assert(() @trusted { return task.taskFiber; } ().m_yieldLockCount == 0, "May not yield while in an active yieldLock()!");
		debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.yield, task); } ();
		static if (__VERSION__ >= 2090) {
			import core.memory : GC;
			assert(!GC.inFinalizer, "Yiedling within finalizer - this would most likely result in a dead-lock!");
		}
		() @trusted { Fiber.yield(); } ();
		debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.resume, task); } ();
		assert(!task.m_fiber.m_queue, "Task is still scheduled after resumption.");
	}
}

/** Result of a call to `TaskScheduler.schedule`.
*/
package enum ScheduleStatus {
	/// The task queue was empty when `schedule` was called
	idle,
	/// The task queue is empty after the queued tasks have been resumed
	allProcessed,
	/// There are still tasks left in the queue
	busy
}

/** Doubly linked list of task fibers.

	The links are stored in the fibers themselves (`m_prev`, `m_next` and
	`m_queue`), so that a fiber can only be part of one queue at a time.
*/
private struct TaskFiberQueue {
	@safe nothrow:

	TaskFiber first, last;
	size_t length;

	// fibers store a pointer to their queue, so copies must not be made
	@disable this(this);

	/// Determines whether the queue contains no fibers.
	@property bool empty() const { return first is null; }

	/// Returns the first fiber of the queue, or `null` if it is empty.
	@property TaskFiber front() { return first; }

	/// Inserts a fiber that is not part of any queue at the front.
	void insertFront(TaskFiber task) @trusted
	{
		assert(task.m_queue is null, "Task is already scheduled to be resumed!");
		assert(task.m_prev is null, "Task has m_prev set without being in a queue!?");
		assert(task.m_next is null, "Task has m_next set without being in a queue!?");
		task.m_queue = &this;
		if (empty) {
			first = task;
			last = task;
		} else {
			first.m_prev = task;
			task.m_next = first;
			first = task;
		}
		length++;
	}

	/// Inserts a fiber that is not part of any queue at the back.
	void insertBack(TaskFiber task) @trusted
	{
		assert(task.m_queue is null, "Task is already scheduled to be resumed!");
		assert(task.m_prev is null, "Task has m_prev set without being in a queue!?");
		assert(task.m_next is null, "Task has m_next set without being in a queue!?");
		task.m_queue = &this;
		if (empty) {
			first = task;
			last = task;
		} else {
			last.m_next = task;
			task.m_prev = last;
			last = task;
		}
		length++;
	}

	// inserts a task after the first task for which the predicate yields `true`,
	// starting from the back. a maximum of max_skip tasks will be skipped
	// before the task is inserted regardless of the predicate. if no
	// insertion point is found, the task is inserted at the front.
	void insertBackPred(TaskFiber task, size_t max_skip,
		scope bool delegate(TaskFiber) @safe nothrow pred) @trusted
	{
		assert(task.m_queue is null, "Task is already scheduled to be resumed!");
		assert(task.m_prev is null, "Task has m_prev set without being in a queue!?");
		assert(task.m_next is null, "Task has m_next set without being in a queue!?");

		for (auto t = last; t; t = t.m_prev) {
			// the predicate is not evaluated anymore once max_skip has been
			// used up
			if (!max_skip-- || pred(t)) {
				// link in directly after t
				task.m_queue = &this;
				task.m_next = t.m_next;
				if (task.m_next) task.m_next.m_prev = task;
				t.m_next = task;
				task.m_prev = t;
				if (!task.m_next) last = task;
				length++;
				return;
			}
		}

		insertFront(task);
	}

	/// Removes the first fiber from the queue, which must not be empty.
	void popFront()
	{
		assert(first && first.m_queue == &this, "Popping from empty or mismatching queue");
		if (first is last) last = null;
		auto next = first.m_next;
		if (next) next.m_prev = null;
		first.m_next = null;
		first.m_queue = null;
		first = next;
		length--;
	}

	/// Removes a fiber from the queue, which must currently contain it.
	void remove(TaskFiber task)
	{
		assert(task.m_queue is &this, "Task is not contained in task queue.");
		if (task.m_prev) task.m_prev.m_next = task.m_next;
		else first = task.m_next;
		if (task.m_next) task.m_next.m_prev = task.m_prev;
		else last = task.m_prev;
		task.m_queue = null;
		task.m_prev = null;
		task.m_next = null;
		length--;
	}
}

// basic insertion/removal and length bookkeeping
unittest {
	auto f1 = new TaskFiber;
	auto f2 = new TaskFiber;

	TaskFiberQueue q;
	assert(q.empty && q.length == 0);
	q.insertFront(f1);
	assert(!q.empty && q.length == 1);
	q.insertFront(f2);
	assert(!q.empty && q.length == 2);
	q.popFront();
	assert(!q.empty && q.length == 1);
	q.popFront();
	assert(q.empty && q.length == 0);
	q.insertFront(f1);
	q.remove(f1);
	assert(q.empty && q.length == 0);
}

// insertBackPred insertion positions and link consistency
unittest {
	auto f1 = new TaskFiber;
	auto f2 = new TaskFiber;
	auto f3 = new TaskFiber;
	auto f4 = new TaskFiber;
	auto f5 = new TaskFiber;
	auto f6 = new TaskFiber;
	TaskFiberQueue q;

	// verifies that the forward and backward links agree with each other
	void checkQueue()
	{
		TaskFiber p;
		for (auto t = q.front; t; t = t.m_next) {
			assert(t.m_prev is p);
			assert(t.m_next || t is q.last);
			p = t;
		}

		TaskFiber n;
		for (auto t = q.last; t; t = t.m_prev) {
			assert(t.m_next is n);
			assert(t.m_prev || t is q.first);
			n = t;
		}
	}

	q.insertBackPred(f1, 0, delegate bool(tf) { assert(false); });
	assert(q.first is f1 && q.last is f1);
	checkQueue();

	q.insertBackPred(f2, 0, delegate bool(tf) { assert(false); });
	assert(q.first is f1 && q.last is f2);
	checkQueue();

	q.insertBackPred(f3, 1, (tf) => false);
	assert(q.first is f1 && q.last is f2);
	assert(f1.m_next is f3);
	assert(f3.m_prev is f1);
	checkQueue();

	q.insertBackPred(f4, 10, (tf) => false);
	assert(q.first is f4 && q.last is f2);
	checkQueue();

	q.insertBackPred(f5, 10, (tf) => true);
	assert(q.first is f4 && q.last is f5);
	checkQueue();

	q.insertBackPred(f6, 10, (tf) => tf is f4);
	assert(q.first is f4 && q.last is f5);
	assert(f4.m_next is f6);
	checkQueue();
}

/** Pairs a destructor function with an offset into a fiber local storage
	buffer.
*/
private struct FLSInfo {
	void function(void[], size_t) fct;
	size_t offset;

	/// Invokes `fct` on the given storage buffer, passing `offset`.
	void destroy(void[] fls) {
		fct(fls, offset);
	}
}

// mixin string helper to call a function with arguments that potentially have
// to be moved
//
// returns a string of the form "func(args[0], args[1].move, ...);", where
// ".move" is appended for each argument type that `needsMove`
package string callWithMove(ARGS...)(string func, string args)
{
	import std.string;
	string ret = func ~ "(";
	foreach (i, T; ARGS) {
		if (i > 0) ret ~= ", ";
		ret ~= format("%s[%s]", args, i);
		static if (needsMove!T) ret ~= ".move";
	}
	return ret ~ ");";
}

// evaluates to `true` if T cannot be copied. such a type is required to
// provide a `.move` property - instantiation fails with a static assertion
// otherwise
private template needsMove(T)
{
	template isCopyable(T)
	{
		enum isCopyable = __traits(compiles, (T a) { return a; });
	}

	template isMoveable(T)
	{
		enum isMoveable = __traits(compiles, (T a) { return a.move; });
	}

	enum needsMove = !isCopyable!T;

	static assert(isCopyable!T || isMoveable!T,
		"Non-copyable type "~T.stringof~" must be movable with a .move property.");
}

unittest {
	enum E { a, move }
	static struct S {
		@disable this(this);
		@property S move() { return S.init; }
	}
	static struct T { @property T move() { return T.init; } }
	static struct U { }
	static struct V {
		@disable this();
		@disable this(this);
		@property V move() { return V.init; }
	}
	static struct W { @disable this(); }

	static assert(needsMove!S);
	static assert(!needsMove!int);
	static assert(!needsMove!string);
	static assert(!needsMove!E);
	static assert(!needsMove!T);
	static assert(!needsMove!U);
	static assert(needsMove!V);
	static assert(!needsMove!W);
}