1 /** 2 This module contains the core functionality of the vibe.d framework. 3 4 Copyright: © 2012-2020 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.core.core; 9 10 public import vibe.core.task; 11 12 import eventcore.core; 13 import vibe.core.args; 14 import vibe.core.concurrency; 15 import vibe.core.internal.release; 16 import vibe.core.log; 17 import vibe.core.sync : ManualEvent, createSharedManualEvent; 18 import vibe.core.taskpool : TaskPool; 19 import vibe.internal.async; 20 import vibe.internal.array : FixedRingBuffer; 21 //import vibe.utils.array; 22 import std.algorithm; 23 import std.conv; 24 import std.encoding; 25 import core.exception; 26 import std.exception; 27 import std.functional; 28 import std.range : empty, front, popFront; 29 import std..string; 30 import std.traits : isFunctionPointer; 31 import std.typecons : Flag, Yes, Typedef, Tuple, tuple; 32 import core.atomic; 33 import core.sync.condition; 34 import core.sync.mutex; 35 import core.stdc.stdlib; 36 import core.thread; 37 38 version(Posix) 39 { 40 import core.sys.posix.signal; 41 import core.sys.posix.unistd; 42 import core.sys.posix.pwd; 43 44 static if (__traits(compiles, {import core.sys.posix.grp; getgrgid(0);})) { 45 import core.sys.posix.grp; 46 } else { 47 extern (C) { 48 struct group { 49 char* gr_name; 50 char* gr_passwd; 51 gid_t gr_gid; 52 char** gr_mem; 53 } 54 group* getgrgid(gid_t); 55 group* getgrnam(in char*); 56 } 57 } 58 } 59 60 version (Windows) 61 { 62 import core.stdc.signal; 63 } 64 65 66 /**************************************************************************************************/ 67 /* Public functions */ 68 /**************************************************************************************************/ 69 70 /** 71 Performs final initialization and runs the event loop. 72 73 This function performs three tasks: 74 $(OL 75 $(LI Makes sure that no unrecognized command line options are passed to 76 the application and potentially displays command line help. See also 77 `vibe.core.args.finalizeCommandLineOptions`.) 78 $(LI Performs privilege lowering if required.) 79 $(LI Runs the event loop and blocks until it finishes.) 80 ) 81 82 Params: 83 args_out = Optional parameter to receive unrecognized command line 84 arguments. If left to `null`, an error will be reported if 85 any unrecognized argument is passed. 86 87 See_also: ` vibe.core.args.finalizeCommandLineOptions`, `lowerPrivileges`, 88 `runEventLoop` 89 */ 90 int runApplication(string[]* args_out = null) 91 @safe { 92 try if (!() @trusted { return finalizeCommandLineOptions(args_out); } () ) return 0; 93 catch (Exception e) { 94 logDiagnostic("Error processing command line: %s", e.msg); 95 return 1; 96 } 97 98 lowerPrivileges(); 99 100 logDiagnostic("Running event loop..."); 101 int status; 102 version (VibeDebugCatchAll) { 103 try { 104 status = runEventLoop(); 105 } catch( Throwable th ){ 106 logError("Unhandled exception in event loop: %s", th.msg); 107 logDiagnostic("Full exception: %s", th.toString().sanitize()); 108 return 1; 109 } 110 } else { 111 status = runEventLoop(); 112 } 113 114 logDiagnostic("Event loop exited with status %d.", status); 115 return status; 116 } 117 118 /// A simple echo server, listening on a privileged TCP port. 119 unittest { 120 import vibe.core.core; 121 import vibe.core.net; 122 123 int main() 124 { 125 // first, perform any application specific setup (privileged ports still 126 // available if run as root) 127 listenTCP(7, (conn) { 128 try conn.write(conn); 129 catch (Exception e) { /* log error */ } 130 }); 131 132 // then use runApplication to perform the remaining initialization and 133 // to run the event loop 134 return runApplication(); 135 } 136 } 137 138 /** The same as above, but performing the initialization sequence manually. 139 140 This allows to skip any additional initialization (opening the listening 141 port) if an invalid command line argument or the `--help` switch is 142 passed to the application. 143 */ 144 unittest { 145 import vibe.core.core; 146 import vibe.core.net; 147 148 int main() 149 { 150 // process the command line first, to be able to skip the application 151 // setup if not required 152 if (!finalizeCommandLineOptions()) return 0; 153 154 // then set up the application 155 listenTCP(7, (conn) { 156 try conn.write(conn); 157 catch (Exception e) { /* log error */ } 158 }); 159 160 // finally, perform privilege lowering (safe to skip for non-server 161 // applications) 162 lowerPrivileges(); 163 164 // and start the event loop 165 return runEventLoop(); 166 } 167 } 168 169 /** 170 Starts the vibe.d event loop for the calling thread. 171 172 Note that this function is usually called automatically by the vibe.d 173 framework. However, if you provide your own `main()` function, you may need 174 to call either this or `runApplication` manually. 175 176 The event loop will by default continue running during the whole life time 177 of the application, but calling `runEventLoop` multiple times in sequence 178 is allowed. Tasks will be started and handled only while the event loop is 179 running. 180 181 Returns: 182 The returned value is the suggested code to return to the operating 183 system from the `main` function. 184 185 See_Also: `runApplication` 186 */ 187 int runEventLoop() 188 @safe nothrow { 189 setupSignalHandlers(); 190 191 logDebug("Starting event loop."); 192 s_eventLoopRunning = true; 193 scope (exit) { 194 eventDriver.core.clearExitFlag(); 195 s_eventLoopRunning = false; 196 s_exitEventLoop = false; 197 if (s_isMainThread) atomicStore(st_term, false); 198 () @trusted nothrow { 199 scope (failure) assert(false); // notifyAll is not marked nothrow 200 st_threadShutdownCondition.notifyAll(); 201 } (); 202 } 203 204 // runs any yield()ed tasks first 205 assert(!s_exitEventLoop, "Exit flag set before event loop has started."); 206 s_exitEventLoop = false; 207 performIdleProcessing(); 208 if (getExitFlag()) return 0; 209 210 Task exit_task; 211 212 // handle exit flag in the main thread to exit when 213 // exitEventLoop(true) is called from a thread) 214 () @trusted nothrow { 215 if (s_isMainThread) 216 exit_task = runTask(toDelegate(&watchExitFlag)); 217 } (); 218 219 while (true) { 220 auto er = s_scheduler.waitAndProcess(); 221 if (er != ExitReason.idle || s_exitEventLoop) { 222 logDebug("Event loop exit reason (exit flag=%s): %s", s_exitEventLoop, er); 223 break; 224 } 225 performIdleProcessing(); 226 } 227 228 // make sure the exit flag watch task finishes together with this loop 229 // TODO: would be nice to do this without exceptions 230 if (exit_task && exit_task.running) 231 exit_task.interrupt(); 232 233 logDebug("Event loop done (scheduled tasks=%s, waiters=%s, thread exit=%s).", 234 s_scheduler.scheduledTaskCount, eventDriver.core.waiterCount, s_exitEventLoop); 235 return 0; 236 } 237 238 /** 239 Stops the currently running event loop. 240 241 Calling this function will cause the event loop to stop event processing and 242 the corresponding call to runEventLoop() will return to its caller. 243 244 Params: 245 shutdown_all_threads = If true, exits event loops of all threads - 246 false by default. Note that the event loops of all threads are 247 automatically stopped when the main thread exits, so usually 248 there is no need to set shutdown_all_threads to true. 249 */ 250 void exitEventLoop(bool shutdown_all_threads = false) 251 @safe nothrow { 252 logDebug("exitEventLoop called (%s)", shutdown_all_threads); 253 254 assert(s_eventLoopRunning || shutdown_all_threads, "Exiting event loop when none is running."); 255 if (shutdown_all_threads) { 256 () @trusted nothrow { 257 shutdownWorkerPool(); 258 atomicStore(st_term, true); 259 st_threadsSignal.emit(); 260 } (); 261 } 262 263 // shutdown the calling thread 264 s_exitEventLoop = true; 265 if (s_eventLoopRunning) eventDriver.core.exit(); 266 } 267 268 /** 269 Process all pending events without blocking. 270 271 Checks if events are ready to trigger immediately, and run their callbacks if so. 272 273 Returns: Returns false $(I iff) exitEventLoop was called in the process. 274 */ 275 bool processEvents() 276 @safe nothrow { 277 return !s_scheduler.process().among(ExitReason.exited, ExitReason.outOfWaiters); 278 } 279 280 /** 281 Wait once for events and process them. 282 */ 283 ExitReason runEventLoopOnce() 284 @safe nothrow { 285 auto ret = s_scheduler.waitAndProcess(); 286 if (ret == ExitReason.idle) 287 performIdleProcessing(); 288 return ret; 289 } 290 291 /** 292 Sets a callback that is called whenever no events are left in the event queue. 293 294 The callback delegate is called whenever all events in the event queue have been 295 processed. Returning true from the callback will cause another idle event to 296 be triggered immediately after processing any events that have arrived in the 297 meantime. Returning false will instead wait until another event has arrived first. 298 */ 299 void setIdleHandler(void delegate() @safe nothrow del) 300 @safe nothrow { 301 s_idleHandler = () @safe nothrow { del(); return false; }; 302 } 303 /// ditto 304 void setIdleHandler(bool delegate() @safe nothrow del) 305 @safe nothrow { 306 s_idleHandler = del; 307 } 308 309 /** 310 Runs a new asynchronous task. 311 312 task will be called synchronously from within the vibeRunTask call. It will 313 continue to run until vibeYield() or any of the I/O or wait functions is 314 called. 315 316 Note that the maximum size of all args must not exceed `maxTaskParameterSize`. 317 */ 318 Task runTask(ARGS...)(void delegate(ARGS) @safe task, auto ref ARGS args) 319 { 320 return runTask_internal!((ref tfi) { tfi.set(task, args); }); 321 } 322 /// ditto 323 Task runTask(ARGS...)(void delegate(ARGS) @system task, auto ref ARGS args) 324 @system { 325 return runTask_internal!((ref tfi) { tfi.set(task, args); }); 326 } 327 /// ditto 328 Task runTask(CALLABLE, ARGS...)(CALLABLE task, auto ref ARGS args) 329 if (!is(CALLABLE : void delegate(ARGS)) && is(typeof(CALLABLE.init(ARGS.init)))) 330 { 331 return runTask_internal!((ref tfi) { tfi.set(task, args); }); 332 } 333 /// ditto 334 Task runTask(ARGS...)(TaskSettings settings, void delegate(ARGS) @safe task, auto ref ARGS args) 335 { 336 return runTask_internal!((ref tfi) { 337 tfi.settings = settings; 338 tfi.set(task, args); 339 }); 340 } 341 /// ditto 342 Task runTask(ARGS...)(TaskSettings settings, void delegate(ARGS) @system task, auto ref ARGS args) 343 @system { 344 return runTask_internal!((ref tfi) { 345 tfi.settings = settings; 346 tfi.set(task, args); 347 }); 348 } 349 /// ditto 350 Task runTask(CALLABLE, ARGS...)(TaskSettings settings, CALLABLE task, auto ref ARGS args) 351 if (!is(CALLABLE : void delegate(ARGS)) && is(typeof(CALLABLE.init(ARGS.init)))) 352 { 353 return runTask_internal!((ref tfi) { 354 tfi.settings = settings; 355 tfi.set(task, args); 356 }); 357 } 358 359 360 unittest { // test proportional priority scheduling 361 auto tm = setTimer(1000.msecs, { assert(false, "Test timeout"); }); 362 scope (exit) tm.stop(); 363 364 size_t a, b; 365 auto t1 = runTask(TaskSettings(1), { while (a + b < 100) { a++; yield(); } }); 366 auto t2 = runTask(TaskSettings(10), { while (a + b < 100) { b++; yield(); } }); 367 runTask({ 368 t1.join(); 369 t2.join(); 370 exitEventLoop(); 371 }); 372 runEventLoop(); 373 assert(a + b == 100); 374 assert(b.among(90, 91, 92)); // expect 1:10 ratio +-1 375 } 376 377 378 /** 379 Runs an asyncronous task that is guaranteed to finish before the caller's 380 scope is left. 381 */ 382 auto runTaskScoped(FT, ARGS)(scope FT callable, ARGS args) 383 { 384 static struct S { 385 Task handle; 386 387 @disable this(this); 388 389 ~this() 390 { 391 handle.joinUninterruptible(); 392 } 393 } 394 395 return S(runTask(callable, args)); 396 } 397 398 package Task runTask_internal(alias TFI_SETUP)() 399 { 400 import std.typecons : Tuple, tuple; 401 402 TaskFiber f; 403 while (!f && !s_availableFibers.empty) { 404 f = s_availableFibers.back; 405 s_availableFibers.popBack(); 406 if (() @trusted nothrow { return f.state; } () != Fiber.State.HOLD) f = null; 407 } 408 409 if (f is null) { 410 // if there is no fiber available, create one. 411 if (s_availableFibers.capacity == 0) s_availableFibers.capacity = 1024; 412 logDebugV("Creating new fiber..."); 413 f = new TaskFiber; 414 } 415 416 TFI_SETUP(f.m_taskFunc); 417 418 f.bumpTaskCounter(); 419 auto handle = f.task(); 420 421 debug if (TaskFiber.ms_taskCreationCallback) { 422 TaskCreationInfo info; 423 info.handle = handle; 424 info.functionPointer = () @trusted { return cast(void*)f.m_taskFunc.functionPointer; } (); 425 () @trusted { TaskFiber.ms_taskCreationCallback(info); } (); 426 } 427 428 debug if (TaskFiber.ms_taskEventCallback) { 429 () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.preStart, handle); } (); 430 } 431 432 switchToTask(handle); 433 434 debug if (TaskFiber.ms_taskEventCallback) { 435 () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.postStart, handle); } (); 436 } 437 438 return handle; 439 } 440 441 unittest { // ensure task.running is true directly after runTask 442 Task t; 443 bool hit = false; 444 { 445 auto l = yieldLock(); 446 t = runTask({ hit = true; }); 447 assert(!hit); 448 assert(t.running); 449 } 450 t.join(); 451 assert(!t.running); 452 assert(hit); 453 } 454 455 456 /** 457 Runs a new asynchronous task in a worker thread. 458 459 Only function pointers with weakly isolated arguments are allowed to be 460 able to guarantee thread-safety. 461 */ 462 void runWorkerTask(FT, ARGS...)(FT func, auto ref ARGS args) 463 if (isFunctionPointer!FT) 464 { 465 setupWorkerThreads(); 466 st_workerPool.runTask(func, args); 467 } 468 469 /// ditto 470 void runWorkerTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) 471 if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) 472 { 473 setupWorkerThreads(); 474 st_workerPool.runTask!method(object, args); 475 } 476 /// ditto 477 void runWorkerTask(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args) 478 if (isFunctionPointer!FT) 479 { 480 setupWorkerThreads(); 481 st_workerPool.runTask(settings, func, args); 482 } 483 484 /// ditto 485 void runWorkerTask(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args) 486 if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) 487 { 488 setupWorkerThreads(); 489 st_workerPool.runTask!method(settings, object, args); 490 } 491 492 /** 493 Runs a new asynchronous task in a worker thread, returning the task handle. 494 495 This function will yield and wait for the new task to be created and started 496 in the worker thread, then resume and return it. 497 498 Only function pointers with weakly isolated arguments are allowed to be 499 able to guarantee thread-safety. 500 */ 501 Task runWorkerTaskH(FT, ARGS...)(FT func, auto ref ARGS args) 502 if (isFunctionPointer!FT) 503 { 504 setupWorkerThreads(); 505 return st_workerPool.runTaskH(func, args); 506 } 507 /// ditto 508 Task runWorkerTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) 509 if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) 510 { 511 setupWorkerThreads(); 512 return st_workerPool.runTaskH!method(object, args); 513 } 514 /// ditto 515 Task runWorkerTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args) 516 if (isFunctionPointer!FT) 517 { 518 setupWorkerThreads(); 519 return st_workerPool.runTaskH(settings, func, args); 520 } 521 /// ditto 522 Task runWorkerTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args) 523 if (is(typeof(__traits(getMember, object, __traits(identifier, method))))) 524 { 525 setupWorkerThreads(); 526 return st_workerPool.runTaskH!method(settings, object, args); 527 } 528 529 /// Running a worker task using a function 530 unittest { 531 static void workerFunc(int param) 532 { 533 logInfo("Param: %s", param); 534 } 535 536 static void test() 537 { 538 runWorkerTask(&workerFunc, 42); 539 runWorkerTask(&workerFunc, cast(ubyte)42); // implicit conversion #719 540 runWorkerTaskDist(&workerFunc, 42); 541 runWorkerTaskDist(&workerFunc, cast(ubyte)42); // implicit conversion #719 542 } 543 } 544 545 /// Running a worker task using a class method 546 unittest { 547 static class Test { 548 void workerMethod(int param) 549 shared { 550 logInfo("Param: %s", param); 551 } 552 } 553 554 static void test() 555 { 556 auto cls = new shared Test; 557 runWorkerTask!(Test.workerMethod)(cls, 42); 558 runWorkerTask!(Test.workerMethod)(cls, cast(ubyte)42); // #719 559 runWorkerTaskDist!(Test.workerMethod)(cls, 42); 560 runWorkerTaskDist!(Test.workerMethod)(cls, cast(ubyte)42); // #719 561 } 562 } 563 564 /// Running a worker task using a function and communicating with it 565 unittest { 566 static void workerFunc(Task caller) 567 { 568 int counter = 10; 569 while (receiveOnly!string() == "ping" && --counter) { 570 logInfo("pong"); 571 caller.send("pong"); 572 } 573 caller.send("goodbye"); 574 575 } 576 577 static void test() 578 { 579 Task callee = runWorkerTaskH(&workerFunc, Task.getThis); 580 do { 581 logInfo("ping"); 582 callee.send("ping"); 583 } while (receiveOnly!string() == "pong"); 584 } 585 586 static void work719(int) {} 587 static void test719() { runWorkerTaskH(&work719, cast(ubyte)42); } 588 } 589 590 /// Running a worker task using a class method and communicating with it 591 unittest { 592 static class Test { 593 void workerMethod(Task caller) shared { 594 int counter = 10; 595 while (receiveOnly!string() == "ping" && --counter) { 596 logInfo("pong"); 597 caller.send("pong"); 598 } 599 caller.send("goodbye"); 600 } 601 } 602 603 static void test() 604 { 605 auto cls = new shared Test; 606 Task callee = runWorkerTaskH!(Test.workerMethod)(cls, Task.getThis()); 607 do { 608 logInfo("ping"); 609 callee.send("ping"); 610 } while (receiveOnly!string() == "pong"); 611 } 612 613 static class Class719 { 614 void work(int) shared {} 615 } 616 static void test719() { 617 auto cls = new shared Class719; 618 runWorkerTaskH!(Class719.work)(cls, cast(ubyte)42); 619 } 620 } 621 622 unittest { // run and join local task from outside of a task 623 int i = 0; 624 auto t = runTask({ sleep(1.msecs); i = 1; }); 625 t.join(); 626 assert(i == 1); 627 } 628 629 unittest { // run and join worker task from outside of a task 630 __gshared int i = 0; 631 auto t = runWorkerTaskH({ sleep(5.msecs); i = 1; }); 632 t.join(); 633 assert(i == 1); 634 } 635 636 637 /** 638 Runs a new asynchronous task in all worker threads concurrently. 639 640 This function is mainly useful for long-living tasks that distribute their 641 work across all CPU cores. Only function pointers with weakly isolated 642 arguments are allowed to be able to guarantee thread-safety. 643 644 The number of tasks started is guaranteed to be equal to 645 `workerThreadCount`. 646 */ 647 void runWorkerTaskDist(FT, ARGS...)(FT func, auto ref ARGS args) 648 if (is(typeof(*func) == function)) 649 { 650 setupWorkerThreads(); 651 return st_workerPool.runTaskDist(func, args); 652 } 653 /// ditto 654 void runWorkerTaskDist(alias method, T, ARGS...)(shared(T) object, ARGS args) 655 { 656 setupWorkerThreads(); 657 return st_workerPool.runTaskDist!method(object, args); 658 } 659 /// ditto 660 void runWorkerTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args) 661 if (is(typeof(*func) == function)) 662 { 663 setupWorkerThreads(); 664 return st_workerPool.runTaskDist(settings, func, args); 665 } 666 /// ditto 667 void runWorkerTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, ARGS args) 668 { 669 setupWorkerThreads(); 670 return st_workerPool.runTaskDist!method(settings, object, args); 671 } 672 673 674 /** Runs a new asynchronous task in all worker threads and returns the handles. 675 676 `on_handle` is a callble that takes a `Task` as its only argument and is 677 called for every task instance that gets created. 678 679 See_also: `runWorkerTaskDist` 680 */ 681 void runWorkerTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args) 682 if (is(typeof(*func) == function)) 683 { 684 setupWorkerThreads(); 685 st_workerPool.runTaskDistH(on_handle, func, args); 686 } 687 /// ditto 688 void runWorkerTaskDistH(HCB, FT, ARGS...)(TaskSettings settings, scope HCB on_handle, FT func, auto ref ARGS args) 689 if (is(typeof(*func) == function)) 690 { 691 setupWorkerThreads(); 692 st_workerPool.runTaskDistH(settings, on_handle, func, args); 693 } 694 695 696 /** 697 Sets up num worker threads. 698 699 This function gives explicit control over the number of worker threads. 700 Note, to have an effect the function must be called prior to related worker 701 tasks functions which set up the default number of worker threads 702 implicitly. 703 704 Params: 705 num = The number of worker threads to initialize. Defaults to 706 `logicalProcessorCount`. 707 See_also: `runWorkerTask`, `runWorkerTaskH`, `runWorkerTaskDist` 708 */ 709 public void setupWorkerThreads(uint num = logicalProcessorCount()) 710 @safe { 711 static bool s_workerThreadsStarted = false; 712 if (s_workerThreadsStarted) return; 713 s_workerThreadsStarted = true; 714 715 () @trusted { 716 synchronized (st_threadsMutex) { 717 if (!st_workerPool) 718 st_workerPool = new shared TaskPool(num); 719 } 720 } (); 721 } 722 723 724 /** 725 Determines the number of logical processors in the system. 726 727 This number includes virtual cores on hyper-threading enabled CPUs. 728 */ 729 public @property uint logicalProcessorCount() 730 { 731 import std.parallelism : totalCPUs; 732 return totalCPUs; 733 } 734 735 736 /** 737 Suspends the execution of the calling task to let other tasks and events be 738 handled. 739 740 Calling this function in short intervals is recommended if long CPU 741 computations are carried out by a task. It can also be used in conjunction 742 with Signals to implement cross-fiber events with no polling. 743 744 Throws: 745 May throw an `InterruptException` if `Task.interrupt()` gets called on 746 the calling task. 747 */ 748 void yield() 749 @safe { 750 auto t = Task.getThis(); 751 if (t != Task.init) { 752 auto tf = () @trusted { return t.taskFiber; } (); 753 tf.handleInterrupt(); 754 s_scheduler.yield(); 755 tf.handleInterrupt(); 756 } else { 757 // Let yielded tasks execute 758 assert(TaskFiber.getThis().m_yieldLockCount == 0, "May not yield within an active yieldLock()!"); 759 () @safe nothrow { performIdleProcessingOnce(true); } (); 760 } 761 } 762 763 unittest { 764 size_t ti; 765 auto t = runTask({ 766 for (ti = 0; ti < 10; ti++) 767 yield(); 768 }); 769 770 foreach (i; 0 .. 5) yield(); 771 assert(ti > 0 && ti < 10, "Task did not interleave with yield loop outside of task"); 772 773 t.join(); 774 assert(ti == 10); 775 } 776 777 778 /** 779 Suspends the execution of the calling task until `switchToTask` is called 780 manually. 781 782 This low-level scheduling function is usually only used internally. Failure 783 to call `switchToTask` will result in task starvation and resource leakage. 784 785 Params: 786 on_interrupt = If specified, is required to 787 788 See_Also: `switchToTask` 789 */ 790 void hibernate(scope void delegate() @safe nothrow on_interrupt = null) 791 @safe nothrow { 792 auto t = Task.getThis(); 793 if (t == Task.init) { 794 assert(TaskFiber.getThis().m_yieldLockCount == 0, "May not yield within an active yieldLock!"); 795 runEventLoopOnce(); 796 } else { 797 auto tf = () @trusted { return t.taskFiber; } (); 798 tf.handleInterrupt(on_interrupt); 799 s_scheduler.hibernate(); 800 tf.handleInterrupt(on_interrupt); 801 } 802 } 803 804 805 /** 806 Switches execution to the given task. 807 808 This function can be used in conjunction with `hibernate` to wake up a 809 task. The task must live in the same thread as the caller. 810 811 If no priority is specified, `TaskSwitchPriority.prioritized` or 812 `TaskSwitchPriority.immediate` will be used, depending on whether a 813 yield lock is currently active. 814 815 Note that it is illegal to use `TaskSwitchPriority.immediate` if a yield 816 lock is active. 817 818 This function must only be called on tasks that belong to the calling 819 thread and have previously been hibernated! 820 821 See_Also: `hibernate`, `yieldLock` 822 */ 823 void switchToTask(Task t) 824 @safe nothrow { 825 auto defer = TaskFiber.getThis().m_yieldLockCount > 0; 826 s_scheduler.switchTo(t, defer ? TaskSwitchPriority.prioritized : TaskSwitchPriority.immediate); 827 } 828 /// ditto 829 void switchToTask(Task t, TaskSwitchPriority priority) 830 @safe nothrow { 831 s_scheduler.switchTo(t, priority); 832 } 833 834 835 /** 836 Suspends the execution of the calling task for the specified amount of time. 837 838 Note that other tasks of the same thread will continue to run during the 839 wait time, in contrast to $(D core.thread.Thread.sleep), which shouldn't be 840 used in vibe.d applications. 841 842 Throws: May throw an `InterruptException` if the task gets interrupted using 843 `Task.interrupt()`. 844 */ 845 void sleep(Duration timeout) 846 @safe { 847 assert(timeout >= 0.seconds, "Argument to sleep must not be negative."); 848 if (timeout <= 0.seconds) return; 849 auto tm = setTimer(timeout, null); 850 tm.wait(); 851 } 852 /// 853 unittest { 854 import vibe.core.core : sleep; 855 import vibe.core.log : logInfo; 856 import core.time : msecs; 857 858 void test() 859 { 860 logInfo("Sleeping for half a second..."); 861 sleep(500.msecs); 862 logInfo("Done sleeping."); 863 } 864 } 865 866 867 /** 868 Returns a new armed timer. 869 870 Note that timers can only work if an event loop is running, explicitly or 871 implicitly by running a blocking operation, such as `sleep` or `File.read`. 872 873 Params: 874 timeout = Determines the minimum amount of time that elapses before the timer fires. 875 callback = If non-`null`, this delegate will be called when the timer fires 876 periodic = Speficies if the timer fires repeatedly or only once 877 878 Returns: 879 Returns a Timer object that can be used to identify and modify the timer. 880 881 See_also: `createTimer` 882 */ 883 Timer setTimer(Duration timeout, Timer.Callback callback, bool periodic = false) 884 @safe nothrow { 885 auto tm = createTimer(callback); 886 tm.rearm(timeout, periodic); 887 return tm; 888 } 889 /// 890 unittest { 891 void printTime() 892 @safe nothrow { 893 import std.datetime; 894 logInfo("The time is: %s", Clock.currTime()); 895 } 896 897 void test() 898 { 899 import vibe.core.core; 900 // start a periodic timer that prints the time every second 901 setTimer(1.seconds, toDelegate(&printTime), true); 902 } 903 } 904 905 /// Compatibility overload - use a `@safe nothrow` callback instead. 906 Timer setTimer(Duration timeout, void delegate() callback, bool periodic = false) 907 @system nothrow { 908 return setTimer(timeout, () @trusted nothrow { 909 try callback(); 910 catch (Exception e) { 911 logWarn("Timer callback failed: %s", e.msg); 912 scope (failure) assert(false); 913 logDebug("Full error: %s", e.toString().sanitize); 914 } 915 }, periodic); 916 } 917 918 919 /** Creates a new timer without arming it. 920 921 Each time `callback` gets invoked, it will be run inside of a newly started 922 task. 923 924 Params: 925 callback = If non-`null`, this delegate will be called when the timer 926 fires 927 928 See_also: `createLeanTimer`, `setTimer` 929 */ 930 Timer createTimer(void delegate() nothrow @safe callback = null) 931 @safe nothrow { 932 static struct C { 933 void delegate() nothrow @safe callback; 934 void opCall() nothrow @safe { runTask(callback); } 935 } 936 937 if (callback) { 938 C c = {callback}; 939 return createLeanTimer(c); 940 } 941 942 return createLeanTimer!(Timer.Callback)(null); 943 } 944 945 946 /** Creates a new timer with a lean callback mechanism. 947 948 In contrast to the standard `createTimer`, `callback` will not be called 949 in a new task, but is instead called directly in the context of the event 950 loop. 951 952 For this reason, the supplied callback is not allowed to perform any 953 operation that needs to block/yield execution. In this case, `runTask` 954 needs to be used explicitly to perform the operation asynchronously. 955 956 Additionally, `callback` can carry arbitrary state without requiring a heap 957 allocation. 958 959 See_also: `createTimer` 960 */ 961 Timer createLeanTimer(CALLABLE)(CALLABLE callback) 962 if (is(typeof(() @safe nothrow { callback(); } ()))) 963 { 964 return Timer.create(eventDriver.timers.create(), callback); 965 } 966 967 968 /** 969 Creates an event to wait on an existing file descriptor. 970 971 The file descriptor usually needs to be a non-blocking socket for this to 972 work. 973 974 Params: 975 file_descriptor = The Posix file descriptor to watch 976 event_mask = Specifies which events will be listened for 977 978 Returns: 979 Returns a newly created FileDescriptorEvent associated with the given 980 file descriptor. 981 */ 982 FileDescriptorEvent createFileDescriptorEvent(int file_descriptor, FileDescriptorEvent.Trigger event_mask) 983 @safe nothrow { 984 return FileDescriptorEvent(file_descriptor, event_mask); 985 } 986 987 988 /** 989 Sets the stack size to use for tasks. 990 991 The default stack size is set to 512 KiB on 32-bit systems and to 16 MiB 992 on 64-bit systems, which is sufficient for most tasks. Tuning this value 993 can be used to reduce memory usage for large numbers of concurrent tasks 994 or to avoid stack overflows for applications with heavy stack use. 995 996 Note that this function must be called at initialization time, before any 997 task is started to have an effect. 998 999 Also note that the stack will initially not consume actual physical memory - 1000 it just reserves virtual address space. Only once the stack gets actually 1001 filled up with data will physical memory then be reserved page by page. This 1002 means that the stack can safely be set to large sizes on 64-bit systems 1003 without having to worry about memory usage. 1004 */ 1005 void setTaskStackSize(size_t sz) 1006 nothrow { 1007 TaskFiber.ms_taskStackSize = sz; 1008 } 1009 1010 1011 /** 1012 The number of worker threads used for processing worker tasks. 1013 1014 Note that this function will cause the worker threads to be started, 1015 if they haven't already. 1016 1017 See_also: `runWorkerTask`, `runWorkerTaskH`, `runWorkerTaskDist`, 1018 `setupWorkerThreads` 1019 */ 1020 @property size_t workerThreadCount() 1021 out(count) { assert(count > 0, "No worker threads started after setupWorkerThreads!?"); } 1022 do { 1023 setupWorkerThreads(); 1024 synchronized (st_threadsMutex) 1025 return st_workerPool.threadCount; 1026 } 1027 1028 1029 /** 1030 Disables the signal handlers usually set up by vibe.d. 1031 1032 During the first call to `runEventLoop`, vibe.d usually sets up a set of 1033 event handlers for SIGINT, SIGTERM and SIGPIPE. Since in some situations 1034 this can be undesirable, this function can be called before the first 1035 invocation of the event loop to avoid this. 1036 1037 Calling this function after `runEventLoop` will have no effect. 1038 */ 1039 void disableDefaultSignalHandlers() 1040 { 1041 synchronized (st_threadsMutex) 1042 s_disableSignalHandlers = true; 1043 } 1044 1045 /** 1046 Sets the effective user and group ID to the ones configured for privilege lowering. 1047 1048 This function is useful for services run as root to give up on the privileges that 1049 they only need for initialization (such as listening on ports <= 1024 or opening 1050 system log files). 1051 */ 1052 void lowerPrivileges(string uname, string gname) 1053 @safe { 1054 if (!isRoot()) return; 1055 if (uname != "" || gname != "") { 1056 static bool tryParse(T)(string s, out T n) 1057 { 1058 import std.conv, std.ascii; 1059 if (!isDigit(s[0])) return false; 1060 n = parse!T(s); 1061 return s.length==0; 1062 } 1063 int uid = -1, gid = -1; 1064 if (uname != "" && !tryParse(uname, uid)) uid = getUID(uname); 1065 if (gname != "" && !tryParse(gname, gid)) gid = getGID(gname); 1066 setUID(uid, gid); 1067 } else logWarn("Vibe was run as root, and no user/group has been specified for privilege lowering. Running with full permissions."); 1068 } 1069 1070 // ditto 1071 void lowerPrivileges() 1072 @safe { 1073 lowerPrivileges(s_privilegeLoweringUserName, s_privilegeLoweringGroupName); 1074 } 1075 1076 1077 /** 1078 Sets a callback that is invoked whenever a task changes its status. 1079 1080 This function is useful mostly for implementing debuggers that 1081 analyze the life time of tasks, including task switches. Note that 1082 the callback will only be called for debug builds. 1083 */ 1084 void setTaskEventCallback(TaskEventCallback func) 1085 { 1086 debug TaskFiber.ms_taskEventCallback = func; 1087 } 1088 1089 /** 1090 Sets a callback that is invoked whenever new task is created. 1091 1092 The callback is guaranteed to be invoked before the one set by 1093 `setTaskEventCallback` for the same task handle. 1094 1095 This function is useful mostly for implementing debuggers that 1096 analyze the life time of tasks, including task switches. Note that 1097 the callback will only be called for debug builds. 1098 */ 1099 void setTaskCreationCallback(TaskCreationCallback func) 1100 { 1101 debug TaskFiber.ms_taskCreationCallback = func; 1102 } 1103 1104 1105 /** 1106 A version string representing the current vibe.d core version 1107 */ 1108 enum vibeVersionString = "1.9.3"; 1109 1110 1111 /** 1112 Generic file descriptor event. 1113 1114 This kind of event can be used to wait for events on a non-blocking 1115 file descriptor. Note that this can usually only be used on socket 1116 based file descriptors. 1117 */ 1118 struct FileDescriptorEvent { 1119 /** Event mask selecting the kind of events to listen for. 1120 */ 1121 enum Trigger { 1122 none = 0, /// Match no event (invalid value) 1123 read = 1<<0, /// React on read-ready events 1124 write = 1<<1, /// React on write-ready events 1125 any = read|write /// Match any kind of event 1126 } 1127 1128 private { 1129 static struct Context { 1130 Trigger trigger; 1131 shared(NativeEventDriver) driver; 1132 } 1133 1134 StreamSocketFD m_socket; 1135 Context* m_context; 1136 } 1137 1138 @safe: 1139 1140 private this(int fd, Trigger event_mask) 1141 nothrow { 1142 m_socket = eventDriver.sockets.adoptStream(fd); 1143 m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } (); 1144 m_context.trigger = event_mask; 1145 m_context.driver = () @trusted { return cast(shared)eventDriver; } (); 1146 } 1147 1148 this(this) 1149 nothrow { 1150 if (m_socket != StreamSocketFD.invalid) 1151 eventDriver.sockets.addRef(m_socket); 1152 } 1153 1154 ~this() 1155 nothrow { 1156 if (m_socket != StreamSocketFD.invalid) 1157 releaseHandle!"sockets"(m_socket, m_context.driver); 1158 } 1159 1160 1161 /** Waits for the selected event to occur. 1162 1163 Params: 1164 which = Optional event mask to react only on certain events 1165 timeout = Maximum time to wait for an event 1166 1167 Returns: 1168 The overload taking the timeout parameter returns true if 1169 an event was received on time and false otherwise. 1170 */ 1171 void wait(Trigger which = Trigger.any) 1172 { 1173 wait(Duration.max, which); 1174 } 1175 /// ditto 1176 bool wait(Duration timeout, Trigger which = Trigger.any) 1177 { 1178 if ((which & m_context.trigger) == Trigger.none) return true; 1179 1180 assert((which & m_context.trigger) == Trigger.read, "Waiting for write event not yet supported."); 1181 1182 bool got_data; 1183 1184 alias readwaiter = Waitable!(IOCallback, 1185 cb => eventDriver.sockets.waitForData(m_socket, cb), 1186 cb => eventDriver.sockets.cancelRead(m_socket), 1187 (fd, st, nb) { got_data = st == IOStatus.ok; } 1188 ); 1189 1190 asyncAwaitAny!(true, readwaiter)(timeout); 1191 1192 return got_data; 1193 } 1194 } 1195 1196 1197 /** 1198 Represents a timer. 1199 */ 1200 struct Timer { 1201 private { 1202 NativeEventDriver m_driver; 1203 TimerID m_id; 1204 debug uint m_magicNumber = 0x4d34f916; 1205 } 1206 1207 alias Callback = void delegate() @safe nothrow; 1208 1209 @safe: 1210 1211 private static Timer create(CALLABLE)(TimerID id, CALLABLE callback) 1212 nothrow { 1213 assert(id != TimerID.init, "Invalid timer ID."); 1214 1215 Timer ret; 1216 ret.m_driver = eventDriver; 1217 ret.m_id = id; 1218 1219 static if (is(typeof(!callback))) 1220 if (!callback) 1221 return ret; 1222 1223 ret.m_driver.timers.userData!CALLABLE(id) = callback; 1224 ret.m_driver.timers.wait(id, &TimerCallbackHandler!CALLABLE.instance.handle); 1225 1226 return ret; 1227 } 1228 1229 this(this) 1230 nothrow { 1231 debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted."); 1232 if (m_driver) m_driver.timers.addRef(m_id); 1233 } 1234 1235 ~this() 1236 nothrow { 1237 debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted."); 1238 if (m_driver) releaseHandle!"timers"(m_id, () @trusted { return cast(shared)m_driver; } ()); 1239 } 1240 1241 /// True if the timer is yet to fire. 1242 @property bool pending() nothrow { return m_driver.timers.isPending(m_id); } 1243 1244 /// The internal ID of the timer. 1245 @property size_t id() const nothrow { return m_id; } 1246 1247 bool opCast() const nothrow { return m_driver !is null; } 1248 1249 /// Determines if this reference is the only one 1250 @property bool unique() const nothrow { return m_driver ? m_driver.timers.isUnique(m_id) : false; } 1251 1252 /** Resets the timer to the specified timeout 1253 */ 1254 void rearm(Duration dur, bool periodic = false) nothrow 1255 in { assert(dur >= 0.seconds, "Negative timer duration specified."); } 1256 do { m_driver.timers.set(m_id, dur, periodic ? dur : 0.seconds); } 1257 1258 /** Resets the timer and avoids any firing. 1259 */ 1260 void stop() nothrow { if (m_driver) m_driver.timers.stop(m_id); } 1261 1262 /** Waits until the timer fires. 1263 1264 This method may only be used if no timer callback has been specified. 1265 1266 Returns: 1267 `true` is returned $(I iff) the timer was fired. 1268 */ 1269 bool wait() 1270 { 1271 auto cb = m_driver.timers.userData!Callback(m_id); 1272 assert(cb is null, "Cannot wait on a timer that was created with a callback."); 1273 1274 auto res = asyncAwait!(TimerCallback2, 1275 cb => m_driver.timers.wait(m_id, cb), 1276 cb => m_driver.timers.cancelWait(m_id) 1277 ); 1278 return res[1]; 1279 } 1280 } 1281 1282 private struct TimerCallbackHandler(CALLABLE) { 1283 static __gshared TimerCallbackHandler ms_instance; 1284 static @property ref TimerCallbackHandler instance() @trusted nothrow { return ms_instance; } 1285 1286 void handle(TimerID timer, bool fired) 1287 @safe nothrow { 1288 if (fired) { 1289 auto cb = eventDriver.timers.userData!CALLABLE(timer); 1290 auto l = yieldLock(); 1291 cb(); 1292 } 1293 1294 if (!eventDriver.timers.isUnique(timer) || eventDriver.timers.isPending(timer)) 1295 eventDriver.timers.wait(timer, &handle); 1296 } 1297 } 1298 1299 1300 /** Returns an object that ensures that no task switches happen during its life time. 1301 1302 Any attempt to run the event loop or switching to another task will cause 1303 an assertion to be thrown within the scope that defines the lifetime of the 1304 returned object. 1305 1306 Multiple yield locks can appear in nested scopes. 1307 */ 1308 auto yieldLock() 1309 @safe nothrow { 1310 static struct YieldLock { 1311 @safe nothrow: 1312 private bool m_initialized; 1313 1314 private this(bool) { m_initialized = true; inc(); } 1315 @disable this(this); 1316 ~this() { if (m_initialized) dec(); } 1317 1318 private void inc() 1319 { 1320 TaskFiber.getThis().m_yieldLockCount++; 1321 } 1322 1323 private void dec() 1324 { 1325 assert(TaskFiber.getThis().m_yieldLockCount > 0); 1326 TaskFiber.getThis().m_yieldLockCount--; 1327 } 1328 } 1329 1330 return YieldLock(true); 1331 } 1332 1333 unittest { 1334 auto tf = TaskFiber.getThis(); 1335 assert(tf.m_yieldLockCount == 0); 1336 { 1337 auto lock = yieldLock(); 1338 assert(tf.m_yieldLockCount == 1); 1339 { 1340 auto lock2 = yieldLock(); 1341 assert(tf.m_yieldLockCount == 2); 1342 } 1343 assert(tf.m_yieldLockCount == 1); 1344 } 1345 assert(tf.m_yieldLockCount == 0); 1346 1347 { 1348 typeof(yieldLock()) l; 1349 assert(tf.m_yieldLockCount == 0); 1350 } 1351 assert(tf.m_yieldLockCount == 0); 1352 } 1353 1354 1355 /**************************************************************************************************/ 1356 /* private types */ 1357 /**************************************************************************************************/ 1358 1359 1360 private void setupGcTimer() 1361 { 1362 s_gcTimer = createTimer(() @trusted { 1363 import core.memory; 1364 logTrace("gc idle collect"); 1365 GC.collect(); 1366 GC.minimize(); 1367 s_ignoreIdleForGC = true; 1368 }); 1369 s_gcCollectTimeout = dur!"seconds"(2); 1370 } 1371 1372 package(vibe) void performIdleProcessing(bool force_process_events = false) 1373 @safe nothrow { 1374 bool again = !getExitFlag(); 1375 while (again) { 1376 again = performIdleProcessingOnce(force_process_events); 1377 force_process_events = true; 1378 } 1379 1380 if (s_scheduler.scheduledTaskCount) logDebug("Exiting from idle processing although there are still yielded tasks"); 1381 1382 if (s_exitEventLoop) return; 1383 1384 if (!s_ignoreIdleForGC && s_gcTimer) { 1385 s_gcTimer.rearm(s_gcCollectTimeout); 1386 } else s_ignoreIdleForGC = false; 1387 } 1388 1389 private bool performIdleProcessingOnce(bool process_events) 1390 @safe nothrow { 1391 if (process_events) { 1392 auto er = eventDriver.core.processEvents(0.seconds); 1393 if (er.among!(ExitReason.exited, ExitReason.outOfWaiters) && s_scheduler.scheduledTaskCount == 0) { 1394 if (s_eventLoopRunning) { 1395 logDebug("Setting exit flag due to driver signalling exit: %s", er); 1396 s_exitEventLoop = true; 1397 } 1398 return false; 1399 } 1400 } 1401 1402 bool again; 1403 if (s_idleHandler) 1404 again = s_idleHandler(); 1405 1406 return (s_scheduler.schedule() == ScheduleStatus.busy || again) && !getExitFlag(); 1407 } 1408 1409 1410 private struct ThreadContext { 1411 Thread thread; 1412 } 1413 1414 /**************************************************************************************************/ 1415 /* private functions */ 1416 /**************************************************************************************************/ 1417 1418 private { 1419 Duration s_gcCollectTimeout; 1420 Timer s_gcTimer; 1421 bool s_ignoreIdleForGC = false; 1422 1423 __gshared core.sync.mutex.Mutex st_threadsMutex; 1424 shared TaskPool st_workerPool; 1425 shared ManualEvent st_threadsSignal; 1426 __gshared ThreadContext[] st_threads; 1427 __gshared Condition st_threadShutdownCondition; 1428 shared bool st_term = false; 1429 1430 bool s_isMainThread = false; // set in shared static this 1431 bool s_exitEventLoop = false; 1432 package bool s_eventLoopRunning = false; 1433 bool delegate() @safe nothrow s_idleHandler; 1434 1435 TaskScheduler s_scheduler; 1436 FixedRingBuffer!TaskFiber s_availableFibers; 1437 size_t s_maxRecycledFibers = 100; 1438 1439 string s_privilegeLoweringUserName; 1440 string s_privilegeLoweringGroupName; 1441 __gshared bool s_disableSignalHandlers = false; 1442 } 1443 1444 private bool getExitFlag() 1445 @trusted nothrow { 1446 return s_exitEventLoop || atomicLoad(st_term); 1447 } 1448 1449 package @property bool isEventLoopRunning() @safe nothrow @nogc { return s_eventLoopRunning; } 1450 package @property ref TaskScheduler taskScheduler() @safe nothrow @nogc { return s_scheduler; } 1451 1452 package void recycleFiber(TaskFiber fiber) 1453 @safe nothrow { 1454 if (s_availableFibers.length >= s_maxRecycledFibers) { 1455 auto fl = s_availableFibers.front; 1456 s_availableFibers.popFront(); 1457 fl.shutdown(); 1458 () @trusted { 1459 try destroy(fl); 1460 catch (Exception e) logWarn("Failed to destroy fiber: %s", e.msg); 1461 } (); 1462 } 1463 1464 if (s_availableFibers.full) 1465 s_availableFibers.capacity = 2 * s_availableFibers.capacity; 1466 1467 s_availableFibers.put(fiber); 1468 } 1469 1470 private void setupSignalHandlers() 1471 @trusted nothrow { 1472 scope (failure) assert(false); // _d_monitorexit is not nothrow 1473 __gshared bool s_setup = false; 1474 1475 // only initialize in main thread 1476 synchronized (st_threadsMutex) { 1477 if (s_setup) return; 1478 s_setup = true; 1479 1480 if (s_disableSignalHandlers) return; 1481 1482 logTrace("setup signal handler"); 1483 version(Posix){ 1484 // support proper shutdown using signals 1485 sigset_t sigset; 1486 sigemptyset(&sigset); 1487 sigaction_t siginfo; 1488 siginfo.sa_handler = &onSignal; 1489 siginfo.sa_mask = sigset; 1490 siginfo.sa_flags = SA_RESTART; 1491 sigaction(SIGINT, &siginfo, null); 1492 sigaction(SIGTERM, &siginfo, null); 1493 1494 siginfo.sa_handler = &onBrokenPipe; 1495 sigaction(SIGPIPE, &siginfo, null); 1496 } 1497 1498 version(Windows){ 1499 // WORKAROUND: we don't care about viral @nogc attribute here! 1500 import std.traits; 1501 signal(SIGTERM, cast(ParameterTypeTuple!signal[1])&onSignal); 1502 signal(SIGINT, cast(ParameterTypeTuple!signal[1])&onSignal); 1503 } 1504 } 1505 } 1506 1507 // per process setup 1508 shared static this() 1509 { 1510 s_isMainThread = true; 1511 1512 // COMPILER BUG: Must be some kind of module constructor order issue: 1513 // without this, the stdout/stderr handles are not initialized before 1514 // the log module is set up. 1515 import std.stdio; File f; f.close(); 1516 1517 initializeLogModule(); 1518 1519 logTrace("create driver core"); 1520 1521 st_threadsMutex = new Mutex; 1522 st_threadShutdownCondition = new Condition(st_threadsMutex); 1523 1524 auto thisthr = Thread.getThis(); 1525 thisthr.name = "main"; 1526 assert(st_threads.length == 0, "Main thread not the first thread!?"); 1527 st_threads ~= ThreadContext(thisthr); 1528 1529 st_threadsSignal = createSharedManualEvent(); 1530 1531 version(VibeIdleCollect) { 1532 logTrace("setup gc"); 1533 setupGcTimer(); 1534 } 1535 1536 version (VibeNoDefaultArgs) {} 1537 else { 1538 readOption("uid|user", &s_privilegeLoweringUserName, "Sets the user name or id used for privilege lowering."); 1539 readOption("gid|group", &s_privilegeLoweringGroupName, "Sets the group name or id used for privilege lowering."); 1540 } 1541 1542 import std.concurrency; 1543 scheduler = new VibedScheduler; 1544 } 1545 1546 shared static ~this() 1547 { 1548 shutdownDriver(); 1549 1550 size_t tasks_left = s_scheduler.scheduledTaskCount; 1551 1552 if (tasks_left > 0) 1553 logWarn("There were still %d tasks running at exit.", tasks_left); 1554 } 1555 1556 // per thread setup 1557 static this() 1558 { 1559 /// workaround for: 1560 // object.Exception@src/rt/minfo.d(162): Aborting: Cycle detected between modules with ctors/dtors: 1561 // vibe.core.core -> vibe.core.drivers.native -> vibe.core.drivers.libasync -> vibe.core.core 1562 if (Thread.getThis().isDaemon && Thread.getThis().name == "CmdProcessor") return; 1563 1564 auto thisthr = Thread.getThis(); 1565 synchronized (st_threadsMutex) 1566 if (!st_threads.any!(c => c.thread is thisthr)) 1567 st_threads ~= ThreadContext(thisthr); 1568 } 1569 1570 static ~this() 1571 { 1572 auto thisthr = Thread.getThis(); 1573 1574 bool is_main_thread = s_isMainThread; 1575 1576 synchronized (st_threadsMutex) { 1577 auto idx = st_threads.countUntil!(c => c.thread is thisthr); 1578 logDebug("Thread exit %s (index %s) (main=%s)", thisthr.name, idx, is_main_thread); 1579 } 1580 1581 if (is_main_thread) { 1582 logDiagnostic("Main thread exiting"); 1583 shutdownWorkerPool(); 1584 } 1585 1586 synchronized (st_threadsMutex) { 1587 auto idx = st_threads.countUntil!(c => c.thread is thisthr); 1588 assert(idx >= 0, "No more threads registered"); 1589 if (idx >= 0) { 1590 st_threads[idx] = st_threads[$-1]; 1591 st_threads.length--; 1592 } 1593 } 1594 1595 // delay deletion of the main event driver to "~shared static this()" 1596 if (!is_main_thread) shutdownDriver(); 1597 1598 st_threadShutdownCondition.notifyAll(); 1599 } 1600 1601 private void shutdownWorkerPool() 1602 nothrow { 1603 shared(TaskPool) tpool; 1604 1605 try synchronized (st_threadsMutex) swap(tpool, st_workerPool); 1606 catch (Exception e) assert(false, e.msg); 1607 1608 if (tpool) { 1609 logDiagnostic("Still waiting for worker threads to exit."); 1610 tpool.terminate(); 1611 } 1612 } 1613 1614 private void shutdownDriver() 1615 { 1616 if (ManualEvent.ms_threadEvent != EventID.init) { 1617 eventDriver.events.releaseRef(ManualEvent.ms_threadEvent); 1618 ManualEvent.ms_threadEvent = EventID.init; 1619 } 1620 1621 eventDriver.dispose(); 1622 } 1623 1624 1625 private void watchExitFlag() 1626 { 1627 auto emit_count = st_threadsSignal.emitCount; 1628 while (true) { 1629 synchronized (st_threadsMutex) { 1630 if (getExitFlag()) break; 1631 } 1632 1633 try emit_count = st_threadsSignal.wait(emit_count); 1634 catch (InterruptException e) return; 1635 } 1636 1637 logDebug("main thread exit"); 1638 eventDriver.core.exit(); 1639 } 1640 1641 private extern(C) void extrap() 1642 @safe nothrow { 1643 logTrace("exception trap"); 1644 } 1645 1646 private extern(C) void onSignal(int signal) 1647 nothrow { 1648 logInfo("Received signal %d. Shutting down.", signal); 1649 atomicStore(st_term, true); 1650 try st_threadsSignal.emit(); 1651 catch (Throwable th) { 1652 logDebug("Failed to notify for event loop exit: %s", th.msg); 1653 } 1654 } 1655 1656 private extern(C) void onBrokenPipe(int signal) 1657 nothrow { 1658 logTrace("Broken pipe."); 1659 } 1660 1661 version(Posix) 1662 { 1663 private bool isRoot() @trusted { return geteuid() == 0; } 1664 1665 private void setUID(int uid, int gid) 1666 @trusted { 1667 logInfo("Lowering privileges to uid=%d, gid=%d...", uid, gid); 1668 if (gid >= 0) { 1669 enforce(getgrgid(gid) !is null, "Invalid group id!"); 1670 enforce(setegid(gid) == 0, "Error setting group id!"); 1671 } 1672 //if( initgroups(const char *user, gid_t group); 1673 if (uid >= 0) { 1674 enforce(getpwuid(uid) !is null, "Invalid user id!"); 1675 enforce(seteuid(uid) == 0, "Error setting user id!"); 1676 } 1677 } 1678 1679 private int getUID(string name) 1680 @trusted { 1681 auto pw = getpwnam(name.toStringz()); 1682 enforce(pw !is null, "Unknown user name: "~name); 1683 return pw.pw_uid; 1684 } 1685 1686 private int getGID(string name) 1687 @trusted { 1688 auto gr = getgrnam(name.toStringz()); 1689 enforce(gr !is null, "Unknown group name: "~name); 1690 return gr.gr_gid; 1691 } 1692 } else version(Windows){ 1693 private bool isRoot() @safe { return false; } 1694 1695 private void setUID(int uid, int gid) 1696 @safe { 1697 enforce(false, "UID/GID not supported on Windows."); 1698 } 1699 1700 private int getUID(string name) 1701 @safe { 1702 enforce(false, "Privilege lowering not supported on Windows."); 1703 assert(false); 1704 } 1705 1706 private int getGID(string name) 1707 @safe { 1708 enforce(false, "Privilege lowering not supported on Windows."); 1709 assert(false); 1710 } 1711 }