1 /** 2 Contains interfaces and enums for evented I/O drivers. 3 4 Copyright: © 2012-2020 Sönke Ludwig 5 Authors: Sönke Ludwig 6 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 7 */ 8 module vibe.core.task; 9 10 import vibe.core.log; 11 import vibe.core.sync; 12 13 import core.atomic : atomicOp, atomicLoad, cas; 14 import core.thread; 15 import std.exception; 16 import std.traits; 17 import std.typecons; 18 19 20 /** Represents a single task as started using vibe.core.runTask. 21 22 Note that the Task type is considered weakly isolated and thus can be 23 passed between threads using vibe.core.concurrency.send or by passing 24 it as a parameter to vibe.core.core.runWorkerTask. 25 */ 26 struct Task { 27 private { 28 shared(TaskFiber) m_fiber; 29 size_t m_taskCounter; 30 import std.concurrency : ThreadInfo, Tid; 31 static ThreadInfo s_tidInfo; 32 } 33 34 enum basePriority = 0x00010000; 35 36 private this(TaskFiber fiber, size_t task_counter) 37 @safe nothrow { 38 () @trusted { m_fiber = cast(shared)fiber; } (); 39 m_taskCounter = task_counter; 40 } 41 42 this(in Task other) 43 @safe nothrow { 44 m_fiber = () @trusted { return cast(shared(TaskFiber))other.m_fiber; } (); 45 m_taskCounter = other.m_taskCounter; 46 } 47 48 /** Returns the Task instance belonging to the calling task. 49 */ 50 static Task getThis() @safe nothrow 51 { 52 // In 2067, synchronized statements where annotated nothrow. 53 // DMD#4115, Druntime#1013, Druntime#1021, Phobos#2704 54 // However, they were "logically" nothrow before. 55 static if (__VERSION__ <= 2066) 56 scope (failure) assert(0, "Internal error: function should be nothrow"); 57 58 auto fiber = () @trusted { return Fiber.getThis(); } (); 59 if (!fiber) return Task.init; 60 auto tfiber = cast(TaskFiber)fiber; 61 if (!tfiber) return Task.init; 62 // FIXME: returning a non-.init handle for a finished task might break some layered logic 63 return Task(tfiber, tfiber.getTaskStatusFromOwnerThread().counter); 64 } 65 66 nothrow { 67 package @property inout(TaskFiber) taskFiber() inout @system { return cast(inout(TaskFiber))m_fiber; } 68 @property inout(Fiber) fiber() inout @system { return this.taskFiber; } 69 @property size_t taskCounter() const @safe { return m_taskCounter; } 70 @property inout(Thread) thread() inout @trusted { if (m_fiber) return this.taskFiber.thread; return null; } 71 72 /** Determines if the task is still running or scheduled to be run. 73 */ 74 @property bool running() 75 const @trusted { 76 assert(m_fiber !is null, "Invalid task handle"); 77 auto tf = this.taskFiber; 78 try if (tf.state == Fiber.State.TERM) return false; catch (Throwable) {} 79 auto st = m_fiber.getTaskStatus(); 80 if (st.counter != m_taskCounter) 81 return false; 82 return st.initialized; 83 } 84 85 package @property ref ThreadInfo tidInfo() @system { return m_fiber ? taskFiber.tidInfo : s_tidInfo; } // FIXME: this is not thread safe! 86 package @property ref const(ThreadInfo) tidInfo() const @system { return m_fiber ? taskFiber.tidInfo : s_tidInfo; } // FIXME: this is not thread safe! 87 88 /** Gets the `Tid` associated with this task for use with 89 `std.concurrency`. 90 */ 91 @property Tid tid() @trusted { return tidInfo.ident; } 92 /// ditto 93 @property const(Tid) tid() const @trusted { return tidInfo.ident; } 94 } 95 96 T opCast(T)() const @safe nothrow if (is(T == bool)) { return m_fiber !is null; } 97 98 void join() @trusted { if (m_fiber) m_fiber.join!true(m_taskCounter); } 99 void joinUninterruptible() @trusted nothrow { if (m_fiber) m_fiber.join!false(m_taskCounter); } 100 void interrupt() @trusted nothrow { if (m_fiber) m_fiber.interrupt(m_taskCounter); } 101 102 string toString() const @safe { import std..string; return format("%s:%s", () @trusted { return cast(void*)m_fiber; } (), m_taskCounter); } 103 104 void getDebugID(R)(ref R dst) 105 { 106 import std.digest.md : MD5; 107 import std.bitmanip : nativeToLittleEndian; 108 import std.base64 : Base64; 109 110 if (!m_fiber) { 111 dst.put("----"); 112 return; 113 } 114 115 MD5 md; 116 md.start(); 117 md.put(nativeToLittleEndian(() @trusted { return cast(size_t)cast(void*)m_fiber; } ())); 118 md.put(nativeToLittleEndian(m_taskCounter)); 119 Base64.encode(md.finish()[0 .. 3], dst); 120 if (!this.running) dst.put("-fin"); 121 } 122 string getDebugID() 123 @trusted { 124 import std.array : appender; 125 auto app = appender!string; 126 getDebugID(app); 127 return app.data; 128 } 129 130 bool opEquals(in ref Task other) const @safe nothrow { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; } 131 bool opEquals(in Task other) const @safe nothrow { return m_fiber is other.m_fiber && m_taskCounter == other.m_taskCounter; } 132 } 133 134 135 /** Settings to control the behavior of newly started tasks. 136 */ 137 struct TaskSettings { 138 /** Scheduling priority of the task 139 140 The priority of a task is roughly proportional to the amount of 141 times it gets scheduled in comparison to competing tasks. For 142 example, a task with priority 100 will be scheduled every 10 rounds 143 when competing against a task with priority 1000. 144 145 The default priority is defined by `basePriority` and has a value 146 of 65536. Priorities should be computed relative to `basePriority`. 147 148 A task with a priority of zero will only be executed if no other 149 non-zero task is competing. 150 */ 151 uint priority = Task.basePriority; 152 } 153 154 155 /** 156 Implements a task local storage variable. 157 158 Task local variables, similar to thread local variables, exist separately 159 in each task. Consequently, they do not need any form of synchronization 160 when accessing them. 161 162 Note, however, that each TaskLocal variable will increase the memory footprint 163 of any task that uses task local storage. There is also an overhead to access 164 TaskLocal variables, higher than for thread local variables, but generelly 165 still O(1) (since actual storage acquisition is done lazily the first access 166 can require a memory allocation with unknown computational costs). 167 168 Notice: 169 FiberLocal instances MUST be declared as static/global thread-local 170 variables. Defining them as a temporary/stack variable will cause 171 crashes or data corruption! 172 173 Examples: 174 --- 175 TaskLocal!string s_myString = "world"; 176 177 void taskFunc() 178 { 179 assert(s_myString == "world"); 180 s_myString = "hello"; 181 assert(s_myString == "hello"); 182 } 183 184 shared static this() 185 { 186 // both tasks will get independent storage for s_myString 187 runTask(&taskFunc); 188 runTask(&taskFunc); 189 } 190 --- 191 */ 192 struct TaskLocal(T) 193 { 194 private { 195 size_t m_offset = size_t.max; 196 size_t m_id; 197 T m_initValue; 198 bool m_hasInitValue = false; 199 } 200 201 this(T init_val) { m_initValue = init_val; m_hasInitValue = true; } 202 203 @disable this(this); 204 205 void opAssign(T value) { this.storage = value; } 206 207 @property ref T storage() 208 @safe { 209 import std.conv : emplace; 210 211 auto fiber = TaskFiber.getThis(); 212 213 // lazily register in FLS storage 214 if (m_offset == size_t.max) { 215 static assert(T.alignof <= 8, "Unsupported alignment for type "~T.stringof); 216 assert(TaskFiber.ms_flsFill % 8 == 0, "Misaligned fiber local storage pool."); 217 m_offset = TaskFiber.ms_flsFill; 218 m_id = TaskFiber.ms_flsCounter++; 219 220 221 TaskFiber.ms_flsFill += T.sizeof; 222 while (TaskFiber.ms_flsFill % 8 != 0) 223 TaskFiber.ms_flsFill++; 224 } 225 226 // make sure the current fiber has enough FLS storage 227 if (fiber.m_fls.length < TaskFiber.ms_flsFill) { 228 fiber.m_fls.length = TaskFiber.ms_flsFill + 128; 229 () @trusted { fiber.m_flsInit.length = TaskFiber.ms_flsCounter + 64; } (); 230 } 231 232 // return (possibly default initialized) value 233 auto data = () @trusted { return fiber.m_fls.ptr[m_offset .. m_offset+T.sizeof]; } (); 234 if (!() @trusted { return fiber.m_flsInit[m_id]; } ()) { 235 () @trusted { fiber.m_flsInit[m_id] = true; } (); 236 import std.traits : hasElaborateDestructor, hasAliasing; 237 static if (hasElaborateDestructor!T || hasAliasing!T) { 238 void function(void[], size_t) destructor = (void[] fls, size_t offset){ 239 static if (hasElaborateDestructor!T) { 240 auto obj = cast(T*)&fls[offset]; 241 // call the destructor on the object if a custom one is known declared 242 obj.destroy(); 243 } 244 else static if (hasAliasing!T) { 245 // zero the memory to avoid false pointers 246 foreach (size_t i; offset .. offset + T.sizeof) { 247 ubyte* u = cast(ubyte*)&fls[i]; 248 *u = 0; 249 } 250 } 251 }; 252 FLSInfo fls_info; 253 fls_info.fct = destructor; 254 fls_info.offset = m_offset; 255 256 // make sure flsInfo has enough space 257 if (TaskFiber.ms_flsInfo.length <= m_id) 258 TaskFiber.ms_flsInfo.length = m_id + 64; 259 260 TaskFiber.ms_flsInfo[m_id] = fls_info; 261 } 262 263 if (m_hasInitValue) { 264 static if (__traits(compiles, () @trusted { emplace!T(data, m_initValue); } ())) 265 () @trusted { emplace!T(data, m_initValue); } (); 266 else assert(false, "Cannot emplace initialization value for type "~T.stringof); 267 } else () @trusted { emplace!T(data); } (); 268 } 269 return *() @trusted { return cast(T*)data.ptr; } (); 270 } 271 272 alias storage this; 273 } 274 275 276 /** Exception that is thrown by Task.interrupt. 277 */ 278 class InterruptException : Exception { 279 this() 280 @safe nothrow { 281 super("Task interrupted."); 282 } 283 } 284 285 /** 286 High level state change events for a Task 287 */ 288 enum TaskEvent { 289 preStart, /// Just about to invoke the fiber which starts execution 290 postStart, /// After the fiber has returned for the first time (by yield or exit) 291 start, /// Just about to start execution 292 yield, /// Temporarily paused 293 resume, /// Resumed from a prior yield 294 end, /// Ended normally 295 fail /// Ended with an exception 296 } 297 298 struct TaskCreationInfo { 299 Task handle; 300 const(void)* functionPointer; 301 } 302 303 alias TaskEventCallback = void function(TaskEvent, Task) nothrow; 304 alias TaskCreationCallback = void function(ref TaskCreationInfo) nothrow @safe; 305 306 /** 307 The maximum combined size of all parameters passed to a task delegate 308 309 See_Also: runTask 310 */ 311 enum maxTaskParameterSize = 128; 312 313 314 /** The base class for a task aka Fiber. 315 316 This class represents a single task that is executed concurrently 317 with other tasks. Each task is owned by a single thread. 318 */ 319 final package class TaskFiber : Fiber { 320 static if ((void*).sizeof >= 8) enum defaultTaskStackSize = 16*1024*1024; 321 else enum defaultTaskStackSize = 512*1024; 322 323 private enum Flags { 324 running = 1UL << 0, 325 initialized = 1UL << 1, 326 interrupt = 1UL << 2, 327 328 shiftAmount = 3, 329 flagsMask = (1<<shiftAmount) - 1 330 } 331 332 private { 333 import std.concurrency : ThreadInfo; 334 import std.bitmanip : BitArray; 335 336 // task queue management (TaskScheduler.m_taskQueue) 337 TaskFiber m_prev, m_next; 338 TaskFiberQueue* m_queue; 339 340 Thread m_thread; 341 ThreadInfo m_tidInfo; 342 uint m_staticPriority, m_dynamicPriority; 343 shared ulong m_taskCounterAndFlags = 0; // bits 0-Flags.shiftAmount are flags 344 345 bool m_shutdown = false; 346 347 shared(ManualEvent) m_onExit; 348 349 // task local storage 350 BitArray m_flsInit; 351 void[] m_fls; 352 353 package int m_yieldLockCount; 354 355 static TaskFiber ms_globalDummyFiber; 356 static FLSInfo[] ms_flsInfo; 357 static size_t ms_flsFill = 0; // thread-local 358 static size_t ms_flsCounter = 0; 359 } 360 361 362 package TaskFuncInfo m_taskFunc; 363 package __gshared size_t ms_taskStackSize = defaultTaskStackSize; 364 package __gshared debug TaskEventCallback ms_taskEventCallback; 365 package __gshared debug TaskCreationCallback ms_taskCreationCallback; 366 367 this() 368 @trusted nothrow { 369 super(&run, ms_taskStackSize); 370 m_onExit = createSharedManualEvent(); 371 m_thread = Thread.getThis(); 372 } 373 374 static TaskFiber getThis() 375 @safe nothrow { 376 auto f = () @trusted nothrow { return Fiber.getThis(); } (); 377 if (auto tf = cast(TaskFiber)f) return tf; 378 if (!ms_globalDummyFiber) ms_globalDummyFiber = new TaskFiber; 379 return ms_globalDummyFiber; 380 } 381 382 // expose Fiber.state as @safe on older DMD versions 383 static if (!__traits(compiles, () @safe { return Fiber.init.state; } ())) 384 @property State state() @trusted const nothrow { return super.state; } 385 386 private void run() 387 nothrow { 388 import std.algorithm.mutation : swap; 389 import std.concurrency : Tid, thisTid; 390 import std.encoding : sanitize; 391 import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield, yieldLock; 392 393 version (VibeDebugCatchAll) alias UncaughtException = Throwable; 394 else alias UncaughtException = Exception; 395 try { 396 while (true) { 397 while (!m_taskFunc.func) { 398 try { 399 debug (VibeTaskLog) logTrace("putting fiber to sleep waiting for new task..."); 400 Fiber.yield(); 401 } catch (Exception e) { 402 logWarn("CoreTaskFiber was resumed with exception but without active task!"); 403 logDiagnostic("Full error: %s", e.toString().sanitize()); 404 } 405 if (m_shutdown) return; 406 } 407 408 debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?"); 409 410 TaskFuncInfo task; 411 swap(task, m_taskFunc); 412 m_dynamicPriority = m_staticPriority = task.settings.priority; 413 Task handle = this.task; 414 try { 415 atomicOp!"|="(m_taskCounterAndFlags, Flags.running); // set running 416 scope(exit) atomicOp!"&="(m_taskCounterAndFlags, ~Flags.flagsMask); // clear running/initialized 417 418 thisTid; // force creation of a message box 419 420 debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle); 421 if (!isEventLoopRunning) { 422 debug (VibeTaskLog) logTrace("Event loop not running at task start - yielding."); 423 taskScheduler.yieldUninterruptible(); 424 debug (VibeTaskLog) logTrace("Initial resume of task."); 425 } 426 task.call(); 427 debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle); 428 429 debug if (() @trusted { return (cast(shared)this); } ().getTaskStatus().interrupt) 430 logDebug("Task exited while an interrupt was in flight."); 431 } catch (Exception e) { 432 debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle); 433 import std.encoding; 434 logCritical("Task terminated with uncaught exception: %s", e.msg); 435 logDebug("Full error: %s", e.toString().sanitize()); 436 } 437 438 debug assert(Thread.getThis() is m_thread, "Fiber moved?"); 439 440 this.tidInfo.ident = Tid.init; // clear message box 441 442 debug (VibeTaskLog) logTrace("Notifying joining tasks."); 443 444 // Issue #161: This fiber won't be resumed before the next task 445 // is assigned, because it is already marked as de-initialized. 446 // Since ManualEvent.emit() will need to switch tasks, this 447 // would mean that only the first waiter is notified before 448 // this fiber gets a new task assigned. 449 // Using a yield lock forces all corresponding tasks to be 450 // enqueued into the schedule queue and resumed in sequence 451 // at the end of the scope. 452 auto l = yieldLock(); 453 454 m_onExit.emit(); 455 456 // make sure that the task does not get left behind in the yielder queue if terminated during yield() 457 if (m_queue) m_queue.remove(this); 458 459 // zero the fls initialization ByteArray for memory safety 460 foreach (size_t i, ref bool b; m_flsInit) { 461 if (b) { 462 if (ms_flsInfo !is null && ms_flsInfo.length >= i && ms_flsInfo[i] != FLSInfo.init) 463 ms_flsInfo[i].destroy(m_fls); 464 b = false; 465 } 466 } 467 468 assert(!m_queue, "Fiber done but still scheduled to be resumed!?"); 469 470 debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?"); 471 472 // make the fiber available for the next task 473 recycleFiber(this); 474 } 475 } catch(UncaughtException th) { 476 logCritical("CoreTaskFiber was terminated unexpectedly: %s", th.msg); 477 logDiagnostic("Full error: %s", th.toString().sanitize()); 478 } catch (Throwable th) { 479 import std.stdio : stderr, writeln; 480 import core.stdc.stdlib : abort; 481 try stderr.writeln(th); 482 catch (Exception e) { 483 try stderr.writeln(th.msg); 484 catch (Exception e) {} 485 } 486 abort(); 487 } 488 } 489 490 491 /** Returns the thread that owns this task. 492 */ 493 @property inout(Thread) thread() inout @safe nothrow { return m_thread; } 494 495 /** Returns the handle of the current Task running on this fiber. 496 */ 497 @property Task task() 498 @safe nothrow { 499 auto ts = getTaskStatusFromOwnerThread(); 500 if (!ts.initialized) return Task.init; 501 return Task(this, ts.counter); 502 } 503 504 @property ref inout(ThreadInfo) tidInfo() inout @safe nothrow { return m_tidInfo; } 505 506 /** Shuts down the task handler loop. 507 */ 508 void shutdown() 509 @safe nothrow { 510 debug assert(Thread.getThis() is m_thread); 511 512 assert(!() @trusted { return cast(shared)this; } ().getTaskStatus().initialized); 513 514 m_shutdown = true; 515 while (state != Fiber.State.TERM) 516 () @trusted { 517 try call(Fiber.Rethrow.no); 518 catch (Exception e) assert(false, e.msg); 519 } (); 520 } 521 522 /** Blocks until the task has ended. 523 */ 524 void join(bool interruptiple)(size_t task_counter) 525 shared @trusted { 526 auto cnt = m_onExit.emitCount; 527 while (true) { 528 auto st = getTaskStatus(); 529 if (!st.initialized || st.counter != task_counter) 530 break; 531 static if (interruptiple) 532 cnt = m_onExit.wait(cnt); 533 else 534 cnt = m_onExit.waitUninterruptible(cnt); 535 } 536 } 537 538 /** Throws an InterruptExeption within the task as soon as it calls an interruptible function. 539 */ 540 void interrupt(size_t task_counter) 541 shared @safe nothrow { 542 import vibe.core.core : taskScheduler; 543 544 auto caller = () @trusted { return cast(shared)TaskFiber.getThis(); } (); 545 546 assert(caller !is this, "A task cannot interrupt itself."); 547 548 while (true) { 549 auto tcf = atomicLoad(m_taskCounterAndFlags); 550 auto st = getTaskStatus(tcf); 551 if (!st.initialized || st.interrupt || st.counter != task_counter) 552 return; 553 auto tcf_int = tcf | Flags.interrupt; 554 if (cas(&m_taskCounterAndFlags, tcf, tcf_int)) 555 break; 556 } 557 558 if (caller.m_thread is m_thread) { 559 auto thisus = () @trusted { return cast()this; } (); 560 debug (VibeTaskLog) logTrace("Resuming task with interrupt flag."); 561 auto defer = caller.m_yieldLockCount > 0; 562 taskScheduler.switchTo(thisus.task, defer ? TaskSwitchPriority.prioritized : TaskSwitchPriority.immediate); 563 } else { 564 debug (VibeTaskLog) logTrace("Set interrupt flag on task without resuming."); 565 } 566 } 567 568 /** Sets the fiber to initialized state and increments the task counter. 569 570 Note that the task information needs to be set up first. 571 */ 572 void bumpTaskCounter() 573 @safe nothrow { 574 debug { 575 auto ts = atomicLoad(m_taskCounterAndFlags); 576 assert((ts & Flags.flagsMask) == 0, "bumpTaskCounter() called on fiber with non-zero flags"); 577 assert(m_taskFunc.func !is null, "bumpTaskCounter() called without initializing the task function"); 578 } 579 580 () @trusted { atomicOp!"+="(m_taskCounterAndFlags, (1 << Flags.shiftAmount) + Flags.initialized); } (); 581 } 582 583 private auto getTaskStatus() 584 shared const @safe nothrow { 585 return getTaskStatus(atomicLoad(m_taskCounterAndFlags)); 586 } 587 588 private auto getTaskStatusFromOwnerThread() 589 const @safe nothrow { 590 debug assert(Thread.getThis() is m_thread); 591 return getTaskStatus(atomicLoad(m_taskCounterAndFlags)); 592 } 593 594 private static auto getTaskStatus(ulong counter_and_flags) 595 @safe nothrow { 596 static struct S { 597 size_t counter; 598 bool running; 599 bool initialized; 600 bool interrupt; 601 } 602 S ret; 603 ret.counter = cast(size_t)(counter_and_flags >> Flags.shiftAmount); 604 ret.running = (counter_and_flags & Flags.running) != 0; 605 ret.initialized = (counter_and_flags & Flags.initialized) != 0; 606 ret.interrupt = (counter_and_flags & Flags.interrupt) != 0; 607 return ret; 608 } 609 610 package void handleInterrupt(scope void delegate() @safe nothrow on_interrupt) 611 @safe nothrow { 612 assert(() @trusted { return Task.getThis().fiber; } () is this, 613 "Handling interrupt outside of the corresponding fiber."); 614 if (getTaskStatusFromOwnerThread().interrupt && on_interrupt) { 615 debug (VibeTaskLog) logTrace("Handling interrupt flag."); 616 clearInterruptFlag(); 617 on_interrupt(); 618 } 619 } 620 621 package void handleInterrupt() 622 @safe { 623 assert(() @trusted { return Task.getThis().fiber; } () is this, 624 "Handling interrupt outside of the corresponding fiber."); 625 if (getTaskStatusFromOwnerThread().interrupt) { 626 clearInterruptFlag(); 627 throw new InterruptException; 628 } 629 } 630 631 private void clearInterruptFlag() 632 @safe nothrow { 633 auto tcf = atomicLoad(m_taskCounterAndFlags); 634 auto st = getTaskStatus(tcf); 635 while (true) { 636 assert(st.initialized); 637 if (!st.interrupt) break; 638 auto tcf_int = tcf & ~Flags.interrupt; 639 if (cas(&m_taskCounterAndFlags, tcf, tcf_int)) 640 break; 641 } 642 } 643 } 644 645 646 /** Controls the priority to use for switching execution to a task. 647 */ 648 enum TaskSwitchPriority { 649 /** Rescheduled according to the tasks priority 650 */ 651 normal, 652 653 /** Rescheduled with maximum priority. 654 655 The task will resume as soon as the current task yields. 656 */ 657 prioritized, 658 659 /** Switch to the task immediately. 660 */ 661 immediate 662 } 663 664 package struct TaskFuncInfo { 665 void function(ref TaskFuncInfo) func; 666 void[2*size_t.sizeof] callable; 667 void[maxTaskParameterSize] args; 668 debug ulong functionPointer; 669 TaskSettings settings; 670 671 void set(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args) 672 { 673 assert(!func, "Setting TaskFuncInfo that is already set."); 674 675 import std.algorithm : move; 676 import std.traits : hasElaborateAssign; 677 import std.conv : to; 678 679 static struct TARGS { ARGS expand; } 680 681 static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length, 682 "Storage required for task callable is too large ("~CALLABLE.sizeof~" vs max "~callable.length~"): "~CALLABLE.stringof); 683 static assert(TARGS.sizeof <= maxTaskParameterSize, 684 "The arguments passed to run(Worker)Task must not exceed "~ 685 maxTaskParameterSize.to!string~" bytes in total size: "~TARGS.sizeof.stringof~" bytes"); 686 687 debug functionPointer = callPointer(callable); 688 689 static void callDelegate(ref TaskFuncInfo tfi) { 690 assert(tfi.func is &callDelegate, "Wrong callDelegate called!?"); 691 692 // copy original call data to stack 693 CALLABLE c; 694 TARGS args; 695 move(*(cast(CALLABLE*)tfi.callable.ptr), c); 696 move(*(cast(TARGS*)tfi.args.ptr), args); 697 698 // reset the info 699 tfi.func = null; 700 701 // make the call 702 mixin(callWithMove!ARGS("c", "args.expand")); 703 } 704 705 func = &callDelegate; 706 707 () @trusted { 708 static if (hasElaborateAssign!CALLABLE) initCallable!CALLABLE(); 709 static if (hasElaborateAssign!TARGS) initArgs!TARGS(); 710 typedCallable!CALLABLE = callable; 711 foreach (i, A; ARGS) { 712 static if (needsMove!A) args[i].move(typedArgs!TARGS.expand[i]); 713 else typedArgs!TARGS.expand[i] = args[i]; 714 } 715 } (); 716 } 717 718 void call() 719 { 720 this.func(this); 721 } 722 723 @property ref C typedCallable(C)() 724 { 725 static assert(C.sizeof <= callable.sizeof); 726 return *cast(C*)callable.ptr; 727 } 728 729 @property ref A typedArgs(A)() 730 { 731 static assert(A.sizeof <= args.sizeof); 732 return *cast(A*)args.ptr; 733 } 734 735 void initCallable(C)() 736 nothrow { 737 static const C cinit; 738 this.callable[0 .. C.sizeof] = cast(void[])(&cinit)[0 .. 1]; 739 } 740 741 void initArgs(A)() 742 nothrow { 743 static const A ainit; 744 this.args[0 .. A.sizeof] = cast(void[])(&ainit)[0 .. 1]; 745 } 746 } 747 748 private ulong callPointer(C)(ref C callable) 749 @trusted nothrow @nogc { 750 alias IP = ulong; 751 static if (is(C == function)) return cast(IP)cast(void*)callable; 752 else static if (is(C == delegate)) return cast(IP)callable.funcptr; 753 else static if (is(typeof(&callable.opCall) == function)) return cast(IP)cast(void*)&callable.opCall; 754 else static if (is(typeof(&callable.opCall) == delegate)) return cast(IP)(&callable.opCall).funcptr; 755 else return cast(IP)&callable; 756 } 757 758 package struct TaskScheduler { 759 import eventcore.driver : ExitReason; 760 import eventcore.core : eventDriver; 761 762 private { 763 TaskFiberQueue m_taskQueue; 764 } 765 766 @safe: 767 768 @disable this(this); 769 770 @property size_t scheduledTaskCount() const nothrow { return m_taskQueue.length; } 771 772 /** Lets other pending tasks execute before continuing execution. 773 774 This will give other tasks or events a chance to be processed. If 775 multiple tasks call this function, they will be processed in a 776 fírst-in-first-out manner. 777 */ 778 void yield() 779 { 780 auto t = Task.getThis(); 781 if (t == Task.init) return; // not really a task -> no-op 782 auto tf = () @trusted { return t.taskFiber; } (); 783 debug (VibeTaskLog) logTrace("Yielding (interrupt=%s)", () @trusted { return (cast(shared)tf).getTaskStatus().interrupt; } ()); 784 tf.handleInterrupt(); 785 if (tf.m_queue !is null) return; // already scheduled to be resumed 786 doYieldAndReschedule(t); 787 tf.handleInterrupt(); 788 } 789 790 nothrow: 791 792 /** Performs a single round of scheduling without blocking. 793 794 This will execute scheduled tasks and process events from the 795 event queue, as long as possible without having to wait. 796 797 Returns: 798 A reason is returned: 799 $(UL 800 $(LI `ExitReason.exit`: The event loop was exited due to a manual request) 801 $(LI `ExitReason.outOfWaiters`: There are no more scheduled 802 tasks or events, so the application would do nothing from 803 now on) 804 $(LI `ExitReason.idle`: All scheduled tasks and pending events 805 have been processed normally) 806 $(LI `ExitReason.timeout`: Scheduled tasks have been processed, 807 but there were no pending events present.) 808 ) 809 */ 810 ExitReason process() 811 { 812 assert(TaskFiber.getThis().m_yieldLockCount == 0, "May not process events within an active yieldLock()!"); 813 814 bool any_events = false; 815 while (true) { 816 // process pending tasks 817 bool any_tasks_processed = schedule() != ScheduleStatus.idle; 818 819 debug (VibeTaskLog) logTrace("Processing pending events..."); 820 ExitReason er = eventDriver.core.processEvents(0.seconds); 821 debug (VibeTaskLog) logTrace("Done."); 822 823 final switch (er) { 824 case ExitReason.exited: return ExitReason.exited; 825 case ExitReason.outOfWaiters: 826 if (!scheduledTaskCount) 827 return ExitReason.outOfWaiters; 828 break; 829 case ExitReason.timeout: 830 if (!scheduledTaskCount) 831 return any_events || any_tasks_processed ? ExitReason.idle : ExitReason.timeout; 832 break; 833 case ExitReason.idle: 834 any_events = true; 835 if (!scheduledTaskCount) 836 return ExitReason.idle; 837 break; 838 } 839 } 840 } 841 842 /** Performs a single round of scheduling, blocking if necessary. 843 844 Returns: 845 A reason is returned: 846 $(UL 847 $(LI `ExitReason.exit`: The event loop was exited due to a manual request) 848 $(LI `ExitReason.outOfWaiters`: There are no more scheduled 849 tasks or events, so the application would do nothing from 850 now on) 851 $(LI `ExitReason.idle`: All scheduled tasks and pending events 852 have been processed normally) 853 ) 854 */ 855 ExitReason waitAndProcess() 856 { 857 // first, process tasks without blocking 858 auto er = process(); 859 860 final switch (er) { 861 case ExitReason.exited, ExitReason.outOfWaiters: return er; 862 case ExitReason.idle: return ExitReason.idle; 863 case ExitReason.timeout: break; 864 } 865 866 // if the first run didn't process any events, block and 867 // process one chunk 868 debug (VibeTaskLog) logTrace("Wait for new events to process..."); 869 er = eventDriver.core.processEvents(Duration.max); 870 debug (VibeTaskLog) logTrace("Done."); 871 final switch (er) { 872 case ExitReason.exited: return ExitReason.exited; 873 case ExitReason.outOfWaiters: 874 if (!scheduledTaskCount) 875 return ExitReason.outOfWaiters; 876 break; 877 case ExitReason.timeout: assert(false, "Unexpected return code"); 878 case ExitReason.idle: break; 879 } 880 881 // finally, make sure that all scheduled tasks are run 882 er = process(); 883 if (er == ExitReason.timeout) return ExitReason.idle; 884 else return er; 885 } 886 887 void yieldUninterruptible() 888 { 889 auto t = Task.getThis(); 890 if (t == Task.init) return; // not really a task -> no-op 891 auto tf = () @trusted { return t.taskFiber; } (); 892 if (tf.m_queue !is null) return; // already scheduled to be resumed 893 doYieldAndReschedule(t); 894 } 895 896 /** Holds execution until the task gets explicitly resumed. 897 */ 898 void hibernate() 899 { 900 import vibe.core.core : isEventLoopRunning; 901 auto thist = Task.getThis(); 902 if (thist == Task.init) { 903 assert(!isEventLoopRunning, "Event processing outside of a fiber should only happen before the event loop is running!?"); 904 static import vibe.core.core; 905 vibe.core.core.runEventLoopOnce(); 906 } else { 907 doYield(thist); 908 } 909 } 910 911 /** Immediately switches execution to the specified task without giving up execution privilege. 912 913 This forces immediate execution of the specified task. After the tasks finishes or yields, 914 the calling task will continue execution. 915 */ 916 void switchTo(Task t, TaskSwitchPriority priority) 917 { 918 auto thist = Task.getThis(); 919 920 if (t == thist) return; 921 922 auto thisthr = thist ? thist.thread : () @trusted { return Thread.getThis(); } (); 923 assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread."); 924 925 auto tf = () @trusted { return t.taskFiber; } (); 926 if (tf.m_queue) { 927 // don't reset the position of already scheduled tasks 928 if (priority == TaskSwitchPriority.normal) return; 929 930 debug (VibeTaskLog) logTrace("Task to switch to is already scheduled. Moving to front of queue."); 931 assert(tf.m_queue is &m_taskQueue, "Task is already enqueued, but not in the main task queue."); 932 m_taskQueue.remove(tf); 933 assert(!tf.m_queue, "Task removed from queue, but still has one set!?"); 934 } 935 936 if (thist == Task.init && priority == TaskSwitchPriority.immediate) { 937 assert(TaskFiber.getThis().m_yieldLockCount == 0, "Cannot yield within an active yieldLock()!"); 938 debug (VibeTaskLog) logTrace("switch to task from global context"); 939 resumeTask(t); 940 debug (VibeTaskLog) logTrace("task yielded control back to global context"); 941 } else { 942 auto thistf = () @trusted { return thist.taskFiber; } (); 943 assert(!thistf || !thistf.m_queue, "Calling task is running, but scheduled to be resumed!?"); 944 945 debug (VibeTaskLog) logDebugV("Switching tasks (%s already in queue)", m_taskQueue.length); 946 final switch (priority) { 947 case TaskSwitchPriority.normal: 948 reschedule(tf); 949 break; 950 case TaskSwitchPriority.prioritized: 951 tf.m_dynamicPriority = uint.max; 952 reschedule(tf); 953 break; 954 case TaskSwitchPriority.immediate: 955 tf.m_dynamicPriority = uint.max; 956 m_taskQueue.insertFront(thistf); 957 m_taskQueue.insertFront(tf); 958 doYield(thist); 959 break; 960 } 961 } 962 } 963 964 /** Runs any pending tasks. 965 966 A pending tasks is a task that is scheduled to be resumed by either `yield` or 967 `switchTo`. 968 969 Returns: 970 Returns `true` $(I iff) there are more tasks left to process. 971 */ 972 ScheduleStatus schedule() 973 nothrow { 974 if (m_taskQueue.empty) 975 return ScheduleStatus.idle; 976 977 978 assert(Task.getThis() == Task.init, "TaskScheduler.schedule() may not be called from a task!"); 979 980 if (m_taskQueue.empty) return ScheduleStatus.idle; 981 982 foreach (i; 0 .. m_taskQueue.length) { 983 auto t = m_taskQueue.front; 984 m_taskQueue.popFront(); 985 986 // reset priority 987 t.m_dynamicPriority = t.m_staticPriority; 988 989 debug (VibeTaskLog) logTrace("resuming task"); 990 auto task = t.task; 991 if (task != Task.init) 992 resumeTask(t.task); 993 debug (VibeTaskLog) logTrace("task out"); 994 995 if (m_taskQueue.empty) break; 996 } 997 998 debug (VibeTaskLog) logDebugV("schedule finished - %s tasks left in queue", m_taskQueue.length); 999 1000 return m_taskQueue.empty ? ScheduleStatus.allProcessed : ScheduleStatus.busy; 1001 } 1002 1003 /// Resumes execution of a yielded task. 1004 private void resumeTask(Task t) 1005 nothrow { 1006 import std.encoding : sanitize; 1007 1008 assert(t != Task.init, "Resuming null task"); 1009 1010 debug (VibeTaskLog) logTrace("task fiber resume"); 1011 auto uncaught_exception = () @trusted nothrow { return t.fiber.call!(Fiber.Rethrow.no)(); } (); 1012 debug (VibeTaskLog) logTrace("task fiber yielded"); 1013 1014 if (uncaught_exception) { 1015 auto th = cast(Throwable)uncaught_exception; 1016 assert(th, "Fiber returned exception object that is not a Throwable!?"); 1017 1018 assert(() @trusted nothrow { return t.fiber.state; } () == Fiber.State.TERM); 1019 logError("Task terminated with unhandled exception: %s", th.msg); 1020 logDebug("Full error: %s", () @trusted { return th.toString().sanitize; } ()); 1021 1022 // always pass Errors on 1023 if (auto err = cast(Error)th) throw err; 1024 } 1025 } 1026 1027 private void reschedule(TaskFiber tf) 1028 { 1029 import std.algorithm.comparison : min; 1030 1031 // insert according to priority, limited to a priority 1032 // factor of 1:10 in case of heavy concurrency 1033 m_taskQueue.insertBackPred(tf, 10, (t) { 1034 if (t.m_dynamicPriority >= tf.m_dynamicPriority) 1035 return true; 1036 1037 // increase dynamic priority each time a task gets overtaken to 1038 // ensure a fair schedule 1039 t.m_dynamicPriority += min(t.m_staticPriority, uint.max - t.m_dynamicPriority); 1040 return false; 1041 }); 1042 } 1043 1044 private void doYieldAndReschedule(Task task) 1045 { 1046 auto tf = () @trusted { return task.taskFiber; } (); 1047 1048 reschedule(tf); 1049 1050 doYield(task); 1051 } 1052 1053 private void doYield(Task task) 1054 { 1055 assert(() @trusted { return task.taskFiber; } ().m_yieldLockCount == 0, "May not yield while in an active yieldLock()!"); 1056 debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.yield, task); } (); 1057 () @trusted { Fiber.yield(); } (); 1058 debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.resume, task); } (); 1059 assert(!task.m_fiber.m_queue, "Task is still scheduled after resumption."); 1060 } 1061 } 1062 1063 package enum ScheduleStatus { 1064 idle, 1065 allProcessed, 1066 busy 1067 } 1068 1069 private struct TaskFiberQueue { 1070 @safe nothrow: 1071 1072 TaskFiber first, last; 1073 size_t length; 1074 1075 @disable this(this); 1076 1077 @property bool empty() const { return first is null; } 1078 1079 @property TaskFiber front() { return first; } 1080 1081 void insertFront(TaskFiber task) 1082 { 1083 assert(task.m_queue is null, "Task is already scheduled to be resumed!"); 1084 assert(task.m_prev is null, "Task has m_prev set without being in a queue!?"); 1085 assert(task.m_next is null, "Task has m_next set without being in a queue!?"); 1086 task.m_queue = &this; 1087 if (empty) { 1088 first = task; 1089 last = task; 1090 } else { 1091 first.m_prev = task; 1092 task.m_next = first; 1093 first = task; 1094 } 1095 length++; 1096 } 1097 1098 void insertBack(TaskFiber task) 1099 { 1100 assert(task.m_queue is null, "Task is already scheduled to be resumed!"); 1101 assert(task.m_prev is null, "Task has m_prev set without being in a queue!?"); 1102 assert(task.m_next is null, "Task has m_next set without being in a queue!?"); 1103 task.m_queue = &this; 1104 if (empty) { 1105 first = task; 1106 last = task; 1107 } else { 1108 last.m_next = task; 1109 task.m_prev = last; 1110 last = task; 1111 } 1112 length++; 1113 } 1114 1115 // inserts a task after the first task for which the predicate yields `true`, 1116 // starting from the back. a maximum of max_skip tasks will be skipped 1117 // before the task is inserted regardless of the predicate. 1118 void insertBackPred(TaskFiber task, size_t max_skip, 1119 scope bool delegate(TaskFiber) @safe nothrow pred) 1120 { 1121 assert(task.m_queue is null, "Task is already scheduled to be resumed!"); 1122 assert(task.m_prev is null, "Task has m_prev set without being in a queue!?"); 1123 assert(task.m_next is null, "Task has m_next set without being in a queue!?"); 1124 1125 for (auto t = last; t; t = t.m_prev) { 1126 if (!max_skip-- || pred(t)) { 1127 task.m_queue = &this; 1128 task.m_next = t.m_next; 1129 if (task.m_next) task.m_next.m_prev = task; 1130 t.m_next = task; 1131 task.m_prev = t; 1132 if (!task.m_next) last = task; 1133 length++; 1134 return; 1135 } 1136 } 1137 1138 insertFront(task); 1139 } 1140 1141 void popFront() 1142 { 1143 if (first is last) last = null; 1144 assert(first && first.m_queue == &this, "Popping from empty or mismatching queue"); 1145 auto next = first.m_next; 1146 if (next) next.m_prev = null; 1147 first.m_next = null; 1148 first.m_queue = null; 1149 first = next; 1150 length--; 1151 } 1152 1153 void remove(TaskFiber task) 1154 { 1155 assert(task.m_queue is &this, "Task is not contained in task queue."); 1156 if (task.m_prev) task.m_prev.m_next = task.m_next; 1157 else first = task.m_next; 1158 if (task.m_next) task.m_next.m_prev = task.m_prev; 1159 else last = task.m_prev; 1160 task.m_queue = null; 1161 task.m_prev = null; 1162 task.m_next = null; 1163 length--; 1164 } 1165 } 1166 1167 unittest { 1168 auto f1 = new TaskFiber; 1169 auto f2 = new TaskFiber; 1170 1171 TaskFiberQueue q; 1172 assert(q.empty && q.length == 0); 1173 q.insertFront(f1); 1174 assert(!q.empty && q.length == 1); 1175 q.insertFront(f2); 1176 assert(!q.empty && q.length == 2); 1177 q.popFront(); 1178 assert(!q.empty && q.length == 1); 1179 q.popFront(); 1180 assert(q.empty && q.length == 0); 1181 q.insertFront(f1); 1182 q.remove(f1); 1183 assert(q.empty && q.length == 0); 1184 } 1185 1186 unittest { 1187 auto f1 = new TaskFiber; 1188 auto f2 = new TaskFiber; 1189 auto f3 = new TaskFiber; 1190 auto f4 = new TaskFiber; 1191 auto f5 = new TaskFiber; 1192 auto f6 = new TaskFiber; 1193 TaskFiberQueue q; 1194 1195 void checkQueue() 1196 { 1197 TaskFiber p; 1198 for (auto t = q.front; t; t = t.m_next) { 1199 assert(t.m_prev is p); 1200 assert(t.m_next || t is q.last); 1201 p = t; 1202 } 1203 1204 TaskFiber n; 1205 for (auto t = q.last; t; t = t.m_prev) { 1206 assert(t.m_next is n); 1207 assert(t.m_prev || t is q.first); 1208 n = t; 1209 } 1210 } 1211 1212 q.insertBackPred(f1, 0, delegate bool(tf) { assert(false); }); 1213 assert(q.first is f1 && q.last is f1); 1214 checkQueue(); 1215 1216 q.insertBackPred(f2, 0, delegate bool(tf) { assert(false); }); 1217 assert(q.first is f1 && q.last is f2); 1218 checkQueue(); 1219 1220 q.insertBackPred(f3, 1, (tf) => false); 1221 assert(q.first is f1 && q.last is f2); 1222 assert(f1.m_next is f3); 1223 assert(f3.m_prev is f1); 1224 checkQueue(); 1225 1226 q.insertBackPred(f4, 10, (tf) => false); 1227 assert(q.first is f4 && q.last is f2); 1228 checkQueue(); 1229 1230 q.insertBackPred(f5, 10, (tf) => true); 1231 assert(q.first is f4 && q.last is f5); 1232 checkQueue(); 1233 1234 q.insertBackPred(f6, 10, (tf) => tf is f4); 1235 assert(q.first is f4 && q.last is f5); 1236 assert(f4.m_next is f6); 1237 checkQueue(); 1238 } 1239 1240 private struct FLSInfo { 1241 void function(void[], size_t) fct; 1242 size_t offset; 1243 void destroy(void[] fls) { 1244 fct(fls, offset); 1245 } 1246 } 1247 1248 // mixin string helper to call a function with arguments that potentially have 1249 // to be moved 1250 package string callWithMove(ARGS...)(string func, string args) 1251 { 1252 import std..string; 1253 string ret = func ~ "("; 1254 foreach (i, T; ARGS) { 1255 if (i > 0) ret ~= ", "; 1256 ret ~= format("%s[%s]", args, i); 1257 static if (needsMove!T) ret ~= ".move"; 1258 } 1259 return ret ~ ");"; 1260 } 1261 1262 private template needsMove(T) 1263 { 1264 template isCopyable(T) 1265 { 1266 enum isCopyable = __traits(compiles, (T a) { return a; }); 1267 } 1268 1269 template isMoveable(T) 1270 { 1271 enum isMoveable = __traits(compiles, (T a) { return a.move; }); 1272 } 1273 1274 enum needsMove = !isCopyable!T; 1275 1276 static assert(isCopyable!T || isMoveable!T, 1277 "Non-copyable type "~T.stringof~" must be movable with a .move property."); 1278 } 1279 1280 unittest { 1281 enum E { a, move } 1282 static struct S { 1283 @disable this(this); 1284 @property S move() { return S.init; } 1285 } 1286 static struct T { @property T move() { return T.init; } } 1287 static struct U { } 1288 static struct V { 1289 @disable this(); 1290 @disable this(this); 1291 @property V move() { return V.init; } 1292 } 1293 static struct W { @disable this(); } 1294 1295 static assert(needsMove!S); 1296 static assert(!needsMove!int); 1297 static assert(!needsMove!string); 1298 static assert(!needsMove!E); 1299 static assert(!needsMove!T); 1300 static assert(!needsMove!U); 1301 static assert(needsMove!V); 1302 static assert(!needsMove!W); 1303 }