1 /**
2 	This module contains the core functionality of the vibe.d framework.
3 
4 	Copyright: © 2012-2020 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.core.core;
9 
10 public import vibe.core.task;
11 
12 import eventcore.core;
13 import vibe.core.args;
14 import vibe.core.concurrency;
15 import vibe.core.internal.release;
16 import vibe.core.log;
17 import vibe.core.sync : ManualEvent, createSharedManualEvent;
18 import vibe.core.taskpool : TaskPool;
19 import vibe.internal.async;
20 import vibe.internal.array : FixedRingBuffer;
21 //import vibe.utils.array;
22 import std.algorithm;
23 import std.conv;
24 import std.encoding;
25 import core.exception;
26 import std.exception;
27 import std.functional;
28 import std.range : empty, front, popFront;
29 import std.string;
30 import std.traits : isFunctionPointer;
31 import std.typecons : Flag, Yes, Typedef, Tuple, tuple;
32 import core.atomic;
33 import core.sync.condition;
34 import core.sync.mutex;
35 import core.stdc.stdlib;
36 import core.thread;
37 
// POSIX-only bindings. NOTE(review): presumably needed by the signal handling
// and privilege lowering code, which is not fully visible in this part of the
// file - confirm against the users of these symbols.
version(Posix)
{
	import core.sys.posix.signal;
	import core.sys.posix.unistd;
	import core.sys.posix.pwd;

	// Prefer the group database bindings shipped with druntime, if this
	// druntime version provides them...
	static if (__traits(compiles, {import core.sys.posix.grp; getgrgid(0);})) {
		import core.sys.posix.grp;
	} else {
		// ...and otherwise declare the C `group` record and its lookup
		// functions manually.
		extern (C) {
			struct group {
				char*   gr_name;
				char*   gr_passwd;
				gid_t   gr_gid;
				char**  gr_mem;
			}
			group* getgrgid(gid_t);
			group* getgrnam(in char*);
		}
	}
}
59 
60 version (Windows)
61 {
62 	import core.stdc.signal;
63 }
64 
65 
66 /**************************************************************************************************/
67 /* Public functions                                                                               */
68 /**************************************************************************************************/
69 
/**
	Finishes the application initialization and runs the event loop.

	The following steps are carried out, in this order:
	$(OL
		$(LI The command line is finalized, making sure that no unrecognized
			options were passed to the application and potentially
			displaying the command line help. See also
			`vibe.core.args.finalizeCommandLineOptions`.)
		$(LI Privileges are lowered, if required.)
		$(LI The event loop is run, blocking until it finishes.)
	)

	Params:
		args_out = Optional parameter to receive unrecognized command line
			arguments. If left to `null`, an error will be reported if
			any unrecognized argument is passed.

	Returns:
		`0` if `finalizeCommandLineOptions` signals that the application
		should not continue, `1` if processing the command line failed with
		an exception (or, in `VibeDebugCatchAll` builds, if the event loop
		threw), and the status returned by `runEventLoop` otherwise.

	See_also: `vibe.core.args.finalizeCommandLineOptions`, `lowerPrivileges`,
		`runEventLoop`
*/
int runApplication(string[]* args_out = null)
@safe {
	try {
		auto continue_startup = () @trusted { return finalizeCommandLineOptions(args_out); } ();
		if (!continue_startup) return 0;
	} catch (Exception e) {
		logDiagnostic("Error processing command line: %s", e.msg);
		return 1;
	}

	lowerPrivileges();

	logDiagnostic("Running event loop...");
	int status;
	version (VibeDebugCatchAll) {
		// debugging aid: also log errors that escape the event loop
		try status = runEventLoop();
		catch (Throwable th) {
			th.logException("Unhandled exception in event loop");
			return 1;
		}
	} else {
		status = runEventLoop();
	}

	logDiagnostic("Event loop exited with status %d.", status);
	return status;
}
116 
/// A simple echo server, listening on a privileged TCP port.
unittest {
	import vibe.core.core;
	import vibe.core.net;

	// NOTE: `main` is only declared for illustration - this unittest never
	// calls it, so no port is actually opened when the tests run.
	int main()
	{
		// first, perform any application specific setup (privileged ports still
		// available if run as root)
		listenTCP(7, (conn) {
			// write everything that is read from the connection back to it
			try conn.write(conn);
			catch (Exception e) { /* log error */ }
		});

		// then use runApplication to perform the remaining initialization and
		// to run the event loop
		return runApplication();
	}
}
136 
/** The same as above, but performing the initialization sequence manually.

	This allows to skip any additional initialization (opening the listening
	port) if an invalid command line argument or the `--help` switch is
	passed to the application.
*/
unittest {
	import vibe.core.core;
	import vibe.core.net;

	// NOTE: `main` is only declared for illustration - this unittest never
	// calls it.
	int main()
	{
		// process the command line first, to be able to skip the application
		// setup if not required
		if (!finalizeCommandLineOptions()) return 0;

		// then set up the application
		listenTCP(7, (conn) {
			// write everything that is read from the connection back to it
			try conn.write(conn);
			catch (Exception e) { /* log error */ }
		});

		// finally, perform privilege lowering (safe to skip for non-server
		// applications)
		lowerPrivileges();

		// and start the event loop
		return runEventLoop();
	}
}
167 
168 /**
169 	Starts the vibe.d event loop for the calling thread.
170 
171 	Note that this function is usually called automatically by the vibe.d
172 	framework. However, if you provide your own `main()` function, you may need
173 	to call either this or `runApplication` manually.
174 
175 	The event loop will by default continue running during the whole life time
176 	of the application, but calling `runEventLoop` multiple times in sequence
177 	is allowed. Tasks will be started and handled only while the event loop is
178 	running.
179 
180 	Returns:
181 		The returned value is the suggested code to return to the operating
182 		system from the `main` function.
183 
184 	See_Also: `runApplication`
185 */
int runEventLoop()
@safe nothrow {
	setupSignalHandlers();

	logDebug("Starting event loop.");
	s_eventLoopRunning = true;
	// Reset the loop state on every path that leaves this function, so that
	// `runEventLoop` can be called again afterwards.
	scope (exit) {
		eventDriver.core.clearExitFlag();
		s_eventLoopRunning = false;
		s_exitEventLoop = false;
		// `st_term` gets set by `exitEventLoop(true)`; only the main thread
		// clears it again
		if (s_isMainThread) atomicStore(st_term, false);
		() @trusted nothrow {
			scope (failure) assert(false); // notifyAll is not marked nothrow
			st_threadShutdownCondition.notifyAll();
		} ();
	}

	// runs any yield()ed tasks first
	assert(!s_exitEventLoop, "Exit flag set before event loop has started.");
	s_exitEventLoop = false;
	performIdleProcessing();
	// an exit may already have been requested during the initial idle
	// processing - in that case the loop is not entered at all
	if (getExitFlag()) return 0;

	Task exit_task;

	// handle exit flag in the main thread to exit when
	// exitEventLoop(true) is called from a thread
	() @trusted nothrow {
		if (s_isMainThread)
			exit_task = runTask(toDelegate(&watchExitFlag));
	} ();

	// process events until the scheduler reports anything other than "idle"
	// or until an exit of this thread's loop was requested
	while (true) {
		auto er = s_scheduler.waitAndProcess();
		if (er != ExitReason.idle || s_exitEventLoop) {
			logDebug("Event loop exit reason (exit flag=%s): %s", s_exitEventLoop, er);
			break;
		}
		performIdleProcessing();
	}

	// make sure the exit flag watch task finishes together with this loop
	// TODO: would be nice to do this without exceptions
	if (exit_task && exit_task.running)
		exit_task.interrupt();

	logDebug("Event loop done (scheduled tasks=%s, waiters=%s, thread exit=%s).",
		s_scheduler.scheduledTaskCount, eventDriver.core.waiterCount, s_exitEventLoop);
	// NOTE: the suggested exit code is currently always 0
	return 0;
}
236 
/**
	Requests the currently running event loop to stop.

	After this call, the event loop of the calling thread stops processing
	events and the corresponding `runEventLoop()` call returns to its caller.

	Params:
		shutdown_all_threads = If true, exits event loops of all threads -
			false by default. Note that the event loops of all threads are
			automatically stopped when the main thread exits, so usually
			there is no need to set shutdown_all_threads to true.
*/
void exitEventLoop(bool shutdown_all_threads = false)
@safe nothrow {
	logDebug("exitEventLoop called (%s)", shutdown_all_threads);

	assert(shutdown_all_threads || s_eventLoopRunning, "Exiting event loop when none is running.");

	if (shutdown_all_threads) {
		// stop the worker pool and raise the termination flag that is
		// shared between threads, then signal the other threads
		() @trusted nothrow {
			shutdownWorkerPool();
			atomicStore(st_term, true);
			st_threadsSignal.emit();
		} ();
	}

	// the calling thread is always flagged for exit...
	s_exitEventLoop = true;

	// ...but the driver only needs to be told if a loop is actually running
	if (!s_eventLoopRunning) return;
	eventDriver.core.exit();
}
266 
/**
	Processes all pending events without blocking.

	Events that are ready to trigger are handled immediately by running their
	callbacks.

	Returns:
		`false` if processing ended with `ExitReason.exited` (e.g. because
		`exitEventLoop` was called in the process) or with
		`ExitReason.outOfWaiters`, `true` otherwise.
*/
bool processEvents()
@safe nothrow {
	auto reason = s_scheduler.process();
	return reason != ExitReason.exited && reason != ExitReason.outOfWaiters;
}
278 
/**
	Waits a single time for events and processes them.

	Idle processing is performed afterwards if the scheduler reports
	`ExitReason.idle`.

	Returns: The exit reason reported by the scheduler.
*/
ExitReason runEventLoopOnce()
@safe nothrow {
	auto reason = s_scheduler.waitAndProcess();
	if (reason != ExitReason.idle) return reason;

	performIdleProcessing();
	return reason;
}
289 
/**
	Installs the callback that is invoked whenever the event queue has run
	empty.

	The callback delegate is called whenever all events in the event queue have
	been processed. If it returns `true`, another idle event is triggered
	immediately after processing any events that have arrived in the meantime.
	If it returns `false`, the next idle event is only triggered after another
	event has arrived first. The `void` overload always behaves as if `false`
	was returned.
*/
void setIdleHandler(void delegate() @safe nothrow del)
@safe nothrow {
	// adapt the void callback to the bool based handler signature
	bool delegate() @safe nothrow never_repeat = () @safe nothrow {
		del();
		return false;
	};
	s_idleHandler = never_repeat;
}
/// ditto
void setIdleHandler(bool delegate() @safe nothrow del)
@safe nothrow {
	// the delegate already has the handler signature - store it as is
	s_idleHandler = del;
}
307 
/**
	Runs a new asynchronous task.

	task will be called synchronously from within the `runTask` call. It will
	continue to run until `yield()` or any of the I/O or wait functions is
	called.

	Overloads taking a `TaskSettings` value store it in the task's function
	info before the task is started. Overloads that accept a `task` which is
	not `nothrow` are deprecated.

	Note that the maximum size of all args must not exceed `maxTaskParameterSize`.
*/
Task runTask(ARGS...)(void delegate(ARGS) @safe nothrow task, auto ref ARGS args)
{
	return runTask_internal!((ref tfi) { tfi.set(task, args); });
}
/// ditto
Task runTask(ARGS...)(void delegate(ARGS) @system nothrow task, auto ref ARGS args)
@system {
	return runTask_internal!((ref tfi) { tfi.set(task, args); });
}
/// ditto
Task runTask(CALLABLE, ARGS...)(CALLABLE task, auto ref ARGS args)
	if (!is(CALLABLE : void delegate(ARGS)) && isNothrowCallable!(CALLABLE, ARGS))
{
	return runTask_internal!((ref tfi) { tfi.set(task, args); });
}
/// ditto
deprecated("The `task` argument should be nothrow")
Task runTask(ARGS...)(void delegate(ARGS) @safe task, auto ref ARGS args)
{
	return runTask_internal!((ref tfi) { tfi.set(task, args); });
}
/// ditto
deprecated("The `task` argument should be nothrow")
Task runTask(ARGS...)(void delegate(ARGS) @system task, auto ref ARGS args)
@system {
	return runTask_internal!((ref tfi) { tfi.set(task, args); });
}
/// ditto
deprecated("The `task` argument should be nothrow")
Task runTask(CALLABLE, ARGS...)(CALLABLE task, auto ref ARGS args)
	if (!is(CALLABLE : void delegate(ARGS)) && isCallable!(CALLABLE, ARGS) && !isNothrowCallable!(CALLABLE, ARGS))
{
	return runTask_internal!((ref tfi) { tfi.set(task, args); });
}
/// ditto
Task runTask(ARGS...)(TaskSettings settings, void delegate(ARGS) @safe nothrow task, auto ref ARGS args)
{
	return runTask_internal!((ref tfi) {
		tfi.settings = settings;
		tfi.set(task, args);
	});
}
/// ditto
Task runTask(ARGS...)(TaskSettings settings, void delegate(ARGS) @system nothrow task, auto ref ARGS args)
@system {
	return runTask_internal!((ref tfi) {
		tfi.settings = settings;
		tfi.set(task, args);
	});
}
/// ditto
Task runTask(CALLABLE, ARGS...)(TaskSettings settings, CALLABLE task, auto ref ARGS args)
	if (!is(CALLABLE : void delegate(ARGS)) && isNothrowCallable!(CALLABLE, ARGS))
{
	return runTask_internal!((ref tfi) {
		tfi.settings = settings;
		tfi.set(task, args);
	});
}
/// ditto
deprecated("The `task` argument should be nothrow")
Task runTask(ARGS...)(TaskSettings settings, void delegate(ARGS) @safe task, auto ref ARGS args)
{
	return runTask_internal!((ref tfi) {
		tfi.settings = settings;
		tfi.set(task, args);
	});
}
/// ditto
deprecated("The `task` argument should be nothrow")
Task runTask(ARGS...)(TaskSettings settings, void delegate(ARGS) @system task, auto ref ARGS args)
@system {
	return runTask_internal!((ref tfi) {
		tfi.settings = settings;
		tfi.set(task, args);
	});
}
/// ditto
deprecated("The `task` argument should be nothrow")
Task runTask(CALLABLE, ARGS...)(TaskSettings settings, CALLABLE task, auto ref ARGS args)
	if (!is(CALLABLE : void delegate(ARGS)) && isCallable!(CALLABLE, ARGS) && !isNothrowCallable!(CALLABLE, ARGS))
{
	return runTask_internal!((ref tfi) {
		tfi.settings = settings;
		tfi.set(task, args);
	});
}
404 
405 
unittest { // test proportional priority scheduling
	// guard against a hanging test
	auto tm = setTimer(1000.msecs, { assert(false, "Test timeout"); });
	scope (exit) tm.stop();

	// two tasks with settings 1 and 10 compete for a shared budget of 100
	// increments, yielding after each one
	size_t a, b;
	auto t1 = runTask(TaskSettings(1), { while (a + b < 100) { a++; try yield(); catch (Exception e) assert(false); } });
	auto t2 = runTask(TaskSettings(10), { while (a + b < 100) { b++; try yield(); catch (Exception e) assert(false); } });
	// stop the event loop once both counting tasks have finished
	runTask({
		t1.joinUninterruptible();
		t2.joinUninterruptible();
		exitEventLoop();
	});
	runEventLoop();
	assert(a + b == 100);
	assert(b.among(90, 91, 92)); // expect 1:10 ratio +-1
}
422 
423 
/**
	Runs an asynchronous task that is guaranteed to finish before the caller's
	scope is left.

	The task is started using `runTask`. The returned value joins the task
	(uninterruptibly) in its destructor, so the task has finished once the
	returned value goes out of scope.

	Params:
		callable = The callable to run as a task
		args = Zero or more arguments that are passed on to `callable`

	Returns:
		A non-copyable value that stores the task handle in its `handle`
		field and that waits for the task to finish when it is destroyed.
*/
auto runTaskScoped(FT, ARGS...)(scope FT callable, ARGS args)
{
	static struct S {
		Task handle;

		// copying is disabled, so that the task gets joined exactly once
		@disable this(this);

		~this()
		{
			handle.joinUninterruptible();
		}
	}

	// `ARGS` is a variadic sequence, so that callables taking no or multiple
	// arguments are supported in addition to single-argument callables
	return S(runTask(callable, args));
}
443 
// Common implementation of all `runTask` overloads: takes a fiber (reusing an
// available one if possible), lets `TFI_SETUP` fill in its task function
// info and then switches to the new task.
package Task runTask_internal(alias TFI_SETUP)()
{
	import std.typecons : Tuple, tuple;

	// take fibers from the list of available ones until one is found that is
	// in the HOLD state - fibers in any other state are dropped
	TaskFiber f;
	while (!f && !s_availableFibers.empty) {
		f = s_availableFibers.back;
		s_availableFibers.popBack();
		if (() @trusted nothrow { return f.state; } () != Fiber.State.HOLD) f = null;
	}

	if (f is null) {
		// if there is no fiber available, create one.
		if (s_availableFibers.capacity == 0) s_availableFibers.capacity = 1024;
		logDebugV("Creating new fiber...");
		f = new TaskFiber;
	}

	// let the caller store the task function, arguments and settings
	TFI_SETUP(f.m_taskFunc);

	f.bumpTaskCounter();
	auto handle = f.task();

	// the task creation/event callbacks are only invoked in debug builds
	debug if (TaskFiber.ms_taskCreationCallback) {
		TaskCreationInfo info;
		info.handle = handle;
		info.functionPointer = () @trusted { return cast(void*)f.m_taskFunc.functionPointer; } ();
		() @trusted { TaskFiber.ms_taskCreationCallback(info); } ();
	}

	debug if (TaskFiber.ms_taskEventCallback) {
		() @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.preStart, handle); } ();
	}

	debug (VibeTaskLog) logTrace("Switching to newly created task");
	switchToTask(handle);

	debug if (TaskFiber.ms_taskEventCallback) {
		() @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.postStart, handle); } ();
	}

	return handle;
}
487 
unittest { // ensure task.running is true directly after runTask
	Task t;
	bool hit = false;
	{
		// while the yield lock is held, the task body has not run yet, but
		// the task must already be reported as running
		auto l = yieldLock();
		t = runTask({ hit = true; });
		assert(!hit);
		assert(t.running);
	}
	// after joining, the task body must have been executed
	t.join();
	assert(!t.running);
	assert(hit);
}
501 
502 
/**
	Runs a new asynchronous task in a worker thread.

	Only function pointers with weakly isolated arguments are allowed to be
	able to guarantee thread-safety.

	All overloads make sure that the default worker pool is set up
	(`setupWorkerThreads`) and then forward to its `runTask` method. Overloads
	that accept a function or method which is not `nothrow` are deprecated.
*/
void runWorkerTask(FT, ARGS...)(FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	st_workerPool.runTask(func, args);
}
/// ditto
void runWorkerTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
	if (isNothrowMethod!(shared(T), method, ARGS))
{
	setupWorkerThreads();
	st_workerPool.runTask!method(object, args);
}
/// ditto
void runWorkerTask(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	st_workerPool.runTask(settings, func, args);
}
/// ditto
void runWorkerTask(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
	if (isNothrowMethod!(shared(T), method, ARGS))
{
	setupWorkerThreads();
	st_workerPool.runTask!method(settings, object, args);
}
/// ditto
deprecated("The `func` argument should be nothrow")
void runWorkerTask(FT, ARGS...)(FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	st_workerPool.runTask(func, args);
}
/// ditto
deprecated("The `method` argument should be nothrow")
void runWorkerTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
	if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
{
	setupWorkerThreads();
	st_workerPool.runTask!method(object, args);
}
/// ditto
deprecated("The `func` argument should be nothrow")
void runWorkerTask(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	st_workerPool.runTask(settings, func, args);
}
/// ditto
deprecated("The `method` argument should be nothrow")
void runWorkerTask(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
	if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
{
	setupWorkerThreads();
	st_workerPool.runTask!method(settings, object, args);
}
568 
569 
/**
	Runs a new asynchronous task in a worker thread, returning the task handle.

	This function will yield and wait for the new task to be created and started
	in the worker thread, then resume and return it.

	Only function pointers with weakly isolated arguments are allowed to be
	able to guarantee thread-safety.

	All overloads make sure that the default worker pool is set up
	(`setupWorkerThreads`) and then forward to its `runTaskH` method. Overloads
	that accept a function or method which is not `nothrow` are deprecated.
*/
Task runWorkerTaskH(FT, ARGS...)(FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskH(func, args);
}
/// ditto
Task runWorkerTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
	if (isNothrowMethod!(shared(T), method, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskH!method(object, args);
}
/// ditto
Task runWorkerTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskH(settings, func, args);
}
/// ditto
Task runWorkerTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
	if (isNothrowMethod!(shared(T), method, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskH!method(settings, object, args);
}
/// ditto
deprecated("The `func` argument should be nothrow")
Task runWorkerTaskH(FT, ARGS...)(FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskH(func, args);
}
/// ditto
deprecated("The `method` argument should be nothrow")
Task runWorkerTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
	if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskH!method(object, args);
}
/// ditto
deprecated("The `func` argument should be nothrow")
Task runWorkerTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskH(settings, func, args);
}
/// ditto
deprecated("The `method` argument should be nothrow")
Task runWorkerTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
	if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskH!method(settings, object, args);
}
638 
/// Running a worker task using a function
unittest {
	static void workerFunc(int param)
	{
		logInfo("Param: %s", param);
	}

	// NOTE: `test` is never called by this unittest, so this only checks
	// that the calls compile.
	static void test()
	{
		runWorkerTask(&workerFunc, 42);
		runWorkerTask(&workerFunc, cast(ubyte)42); // implicit conversion #719
		runWorkerTaskDist(&workerFunc, 42);
		runWorkerTaskDist(&workerFunc, cast(ubyte)42); // implicit conversion #719
	}
}
654 
/// Running a worker task using a class method
unittest {
	static class Test {
		void workerMethod(int param)
		shared nothrow {
			logInfo("Param: %s", param);
		}
	}

	// NOTE: `test` is never called by this unittest, so this only checks
	// that the calls compile.
	static void test()
	{
		auto cls = new shared Test;
		runWorkerTask!(Test.workerMethod)(cls, 42);
		runWorkerTask!(Test.workerMethod)(cls, cast(ubyte)42); // #719
		runWorkerTaskDist!(Test.workerMethod)(cls, 42);
		runWorkerTaskDist!(Test.workerMethod)(cls, cast(ubyte)42); // #719
	}
}
673 
/// Running a worker task using a function and communicating with it
unittest {
	// Replies with "pong" to every received "ping"; sends "goodbye" once
	// the counter has run out or something other than "ping" was received.
	static void workerFunc(Task caller)
	nothrow {
		int counter = 10;
		try {
			while (receiveOnly!string() == "ping" && --counter) {
				logInfo("pong");
				caller.send("pong");
			}
			caller.send("goodbye");
		} catch (Exception e) assert(false, e.msg);
	}

	// NOTE: `test` is never called by this unittest (compile check only).
	static void test()
	{
		Task callee = runWorkerTaskH(&workerFunc, Task.getThis);
		// keep sending "ping" for as long as the worker answers with "pong"
		do {
			logInfo("ping");
			callee.send("ping");
		} while (receiveOnly!string() == "pong");
	}

	// implicit argument conversion (#719) must also work for runWorkerTaskH
	static void work719(int) nothrow {}
	static void test719() { runWorkerTaskH(&work719, cast(ubyte)42); }
}
700 
/// Running a worker task using a class method and communicating with it
unittest {
	static class Test {
		// Replies with "pong" to every received "ping"; sends "goodbye" once
		// the counter has run out or something other than "ping" was received.
		void workerMethod(Task caller)
		shared nothrow {
			int counter = 10;
			try {
				while (receiveOnly!string() == "ping" && --counter) {
					logInfo("pong");
					caller.send("pong");
				}
				caller.send("goodbye");
			} catch (Exception e) assert(false, e.msg);
		}
	}

	// NOTE: `test` is never called by this unittest (compile check only).
	static void test()
	{
		auto cls = new shared Test;
		Task callee = runWorkerTaskH!(Test.workerMethod)(cls, Task.getThis());
		// keep sending "ping" for as long as the worker answers with "pong"
		do {
			logInfo("ping");
			callee.send("ping");
		} while (receiveOnly!string() == "pong");
	}

	// implicit argument conversion (#719) must also work for methods
	static class Class719 {
		void work(int) shared nothrow {}
	}
	static void test719() {
		auto cls = new shared Class719;
		runWorkerTaskH!(Class719.work)(cls, cast(ubyte)42);
	}
}
735 
unittest { // run and join local task from outside of a task
	int i = 0;
	auto t = runTask({
		try sleep(1.msecs);
		catch (Exception e) assert(false, e.msg);
		i = 1;
	});
	// join() must not return before the task has run to completion
	t.join();
	assert(i == 1);
}
746 
unittest { // run and join worker task from outside of a task
	// __gshared, so that the write from the worker thread is visible here
	__gshared int i = 0;
	auto t = runWorkerTaskH({
		try sleep(5.msecs);
		catch (Exception e) assert(false, e.msg);
		i = 1;
	});
	// join() must not return before the worker task has run to completion
	t.join();
	assert(i == 1);
}
757 
758 
/**
	Runs a new asynchronous task in all worker threads concurrently.

	This function is mainly useful for long-living tasks that distribute their
	work across all CPU cores. Only function pointers with weakly isolated
	arguments are allowed to be able to guarantee thread-safety.

	The number of tasks started is guaranteed to be equal to
	`workerThreadCount`.

	All overloads make sure that the default worker pool is set up
	(`setupWorkerThreads`) and then forward to its `runTaskDist` method.
	Overloads that accept a function or method which is not `nothrow` are
	deprecated.
*/
void runWorkerTaskDist(FT, ARGS...)(FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskDist(func, args);
}
/// ditto
void runWorkerTaskDist(alias method, T, ARGS...)(shared(T) object, ARGS args)
	if (isNothrowMethod!(shared(T), method, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskDist!method(object, args);
}
/// ditto
void runWorkerTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskDist(settings, func, args);
}
/// ditto
void runWorkerTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, ARGS args)
	if (isNothrowMethod!(shared(T), method, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskDist!method(settings, object, args);
}
/// ditto
deprecated("The `func` argument should be nothrow")
void runWorkerTaskDist(FT, ARGS...)(FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskDist(func, args);
}
/// ditto
deprecated("The `method` argument should be nothrow")
void runWorkerTaskDist(alias method, T, ARGS...)(shared(T) object, ARGS args)
	if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskDist!method(object, args);
}
/// ditto
deprecated("The `func` argument should be nothrow")
void runWorkerTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskDist(settings, func, args);
}
/// ditto
deprecated("The `method` argument should be nothrow")
void runWorkerTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, ARGS args)
	if (isMethod!(shared(T), method, ARGS) && !isNothrowMethod!(shared(T), method, ARGS))
{
	setupWorkerThreads();
	return st_workerPool.runTaskDist!method(settings, object, args);
}
828 
829 
/** Runs a new asynchronous task in all worker threads and returns the handles.

	`on_handle` is a callable that takes a `Task` as its only argument and is
	called for every task instance that gets created.

	All overloads make sure that the default worker pool is set up
	(`setupWorkerThreads`) and then forward to its `runTaskDistH` method.
	Overloads that accept a function which is not `nothrow` are deprecated.

	See_also: `runWorkerTaskDist`
*/
void runWorkerTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	st_workerPool.runTaskDistH(on_handle, func, args);
}
/// ditto
void runWorkerTaskDistH(HCB, FT, ARGS...)(TaskSettings settings, scope HCB on_handle, FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	st_workerPool.runTaskDistH(settings, on_handle, func, args);
}
/// ditto
deprecated("The `func` argument should be nothrow")
void runWorkerTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	st_workerPool.runTaskDistH(on_handle, func, args);
}
/// ditto
deprecated("The `func` argument should be nothrow")
void runWorkerTaskDistH(HCB, FT, ARGS...)(TaskSettings settings, scope HCB on_handle, FT func, auto ref ARGS args)
	if (isFunctionPointer!FT && isCallable!(FT, ARGS) && !isNothrowCallable!(FT, ARGS))
{
	setupWorkerThreads();
	st_workerPool.runTaskDistH(settings, on_handle, func, args);
}
866 
867 
// Evaluates to true if a value of type `CALLABLE` can be called with arguments of types `ARGS`.
enum isCallable(CALLABLE, ARGS...) = is(typeof({ mixin(testCall!ARGS("CALLABLE.init")); }));
// Same as `isCallable`, but the call must additionally compile within a `nothrow` context.
enum isNothrowCallable(CALLABLE, ARGS...) = is(typeof(() nothrow { mixin(testCall!ARGS("CALLABLE.init")); }));
// Evaluates to true if the member of `T` that is named like `method` can be called with arguments of types `ARGS`.
enum isMethod(T, alias method, ARGS...) = is(typeof({ mixin(testCall!ARGS("__traits(getMember, T.init, __traits(identifier, method))")); }));
// Same as `isMethod`, but the call must additionally compile within a `nothrow` context.
enum isNothrowMethod(T, alias method, ARGS...) = is(typeof(() nothrow { mixin(testCall!ARGS("__traits(getMember, T.init, __traits(identifier, method))")); }));

// Builds the source code of a call statement `callable(arg0, arg1, ...);`
// that gets mixed in by the traits above. Each argument expression refers
// to `ARGS[i]` of the scope in which the string is mixed in.
private string testCall(ARGS...)(string callable) {
	string arg_list;
	foreach (i, Ti; ARGS) {
		enum index = i.stringof;
		if (i > 0) arg_list ~= ", ";
		static if (is(typeof((Ti a) => a))) {
			// obtain an lvalue of the argument type through a `ref`
			// returning function literal
			arg_list ~= "(function ref ARGS["~index~"]() { static ARGS["~index~"] ret; return ret; }) ()";
		} else {
			// otherwise fall back to the type's `.init` value
			arg_list ~= "ARGS["~index~"].init";
		}
	}
	return callable ~ "(" ~ arg_list ~ ");";
}
883 
unittest {
	// plain function pointers, with by-value and by-ref parameters
	static assert(isCallable!(void function() @system));
	static assert(isCallable!(void function(int) @system, int));
	static assert(isCallable!(void function(ref int) @system, int));
	// the nothrow variant must reject function types that may throw
	static assert(isNothrowCallable!(void function() nothrow @system));
	static assert(!isNothrowCallable!(void function() @system));

	// argument types with disabled postblit must be supported, too
	struct S { @disable this(this); }
	static assert(isCallable!(void function(S) @system, S));
	static assert(isNothrowCallable!(void function(S) @system nothrow, S));
}
895 
896 
/**
	Sets up the worker threads of the default worker pool.

	This function gives explicit control over the number of worker threads.
	Note that, to have an effect, it must be called before any of the worker
	task functions, because those implicitly set up the default number of
	worker threads. Once the pool exists, further calls do nothing.

	Params:
		num = The number of worker threads to initialize. Defaults to
			`logicalProcessorCount`.
	See_also: `runWorkerTask`, `runWorkerTaskH`, `runWorkerTaskDist`
*/
public void setupWorkerThreads(uint num = 0)
@safe nothrow {
	// fast path for repeated calls
	static bool s_alreadyCalled = false;
	if (s_alreadyCalled) return;
	s_alreadyCalled = true;

	// a count of zero selects the number of logical processors
	uint thread_count = num;
	if (thread_count == 0) {
		try {
			thread_count = () @trusted { return logicalProcessorCount(); } ();
		} catch (Exception e) {
			logException(e, "Failed to get logical processor count, assuming 4.");
			thread_count = 4;
		}
	}

	// the pool is created only once, guarded by the threads mutex
	() @trusted nothrow {
		st_threadsMutex.lock_nothrow();
		scope (exit) st_threadsMutex.unlock_nothrow();

		if (!st_workerPool)
			st_workerPool = new shared TaskPool(thread_count);
	} ();
}
932 
933 
/** Gives access to the default worker task pool.

	This is the pool that `runWorkerTask`, `runWorkerTaskH` and
	`runWorkerTaskDist` operate on. It is set up on demand using
	`setupWorkerThreads`.
*/
@property shared(TaskPool) workerTaskPool()
{
	// make sure that the pool exists before handing it out
	setupWorkerThreads();

	auto pool = st_workerPool;
	return pool;
}
944 
945 
/**
	Returns the number of logical processors of the system.

	Virtual cores of hyper-threading enabled CPUs are included in this
	number. The value is taken from `std.parallelism.totalCPUs`.
*/
public @property uint logicalProcessorCount()
{
	static import std.parallelism;
	return std.parallelism.totalCPUs;
}
956 
957 
958 /**
959 	Suspends the execution of the calling task to let other tasks and events be
960 	handled.
961 
962 	Calling this function in short intervals is recommended if long CPU
963 	computations are carried out by a task. It can also be used in conjunction
964 	with Signals to implement cross-fiber events with no polling.
965 
966 	Throws:
967 		May throw an `InterruptException` if `Task.interrupt()` gets called on
968 		the calling task.
969 */
void yield()
@safe {
	auto self = Task.getThis();

	if (self == Task.init) {
		// Called from outside of any task: run one round of idle processing
		// (with event processing enabled) so that yielded tasks get to execute.
		assert(TaskFiber.getThis().m_yieldLockCount == 0, "May not yield within an active yieldLock()!");
		() @safe nothrow { performIdleProcessingOnce(true); } ();
		return;
	}

	// Called from within a task: check for interrupts before and after
	// handing control to the scheduler.
	auto fiber = () @trusted { return self.taskFiber; } ();
	fiber.handleInterrupt();
	s_scheduler.yield();
	fiber.handleInterrupt();
}
984 
unittest {
	// Checks that a task calling `yield` interleaves with a caller outside of
	// any task that calls `yield` as well.
	size_t ti;
	auto t = runTask({
		for (ti = 0; ti < 10; ti++)
			try yield();
			catch (Exception e) assert(false, e.msg);
	});

	// Yield fewer times than the task does; the task is expected to have
	// started, but not finished, its loop afterwards.
	foreach (i; 0 .. 5) yield();
	assert(ti > 0 && ti < 10, "Task did not interleave with yield loop outside of task");

	// Once joined, the task must have completed all ten iterations.
	t.join();
	assert(ti == 10);
}
999 
1000 
1001 /**
1002 	Suspends the execution of the calling task until `switchToTask` is called
1003 	manually.
1004 
1005 	This low-level scheduling function is usually only used internally. Failure
1006 	to call `switchToTask` will result in task starvation and resource leakage.
1007 
1008 	Params:
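		on_interrupt = Optional delegate that is forwarded to the interrupt
			handling of the calling task both before and after hibernating
			(NOTE: the exact contract is defined by
			`TaskFiber.handleInterrupt` - to be confirmed)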
1010 
1011 	See_Also: `switchToTask`
1012 */
void hibernate(scope void delegate() @safe nothrow on_interrupt = null)
@safe nothrow {
	auto self = Task.getThis();

	if (self != Task.init) {
		// Inside of a task: let the scheduler suspend it, passing the
		// interrupt callback to the fiber before and after the suspension.
		auto fiber = () @trusted { return self.taskFiber; } ();
		fiber.handleInterrupt(on_interrupt);
		s_scheduler.hibernate();
		fiber.handleInterrupt(on_interrupt);
		return;
	}

	// Outside of any task: run a single iteration of the event loop instead.
	assert(TaskFiber.getThis().m_yieldLockCount == 0, "May not yield within an active yieldLock!");
	runEventLoopOnce();
}
1026 
1027 
1028 /**
1029 	Switches execution to the given task.
1030 
1031 	This function can be used in conjunction with `hibernate` to wake up a
1032 	task. The task must live in the same thread as the caller.
1033 
1034 	If no priority is specified, `TaskSwitchPriority.prioritized` or
1035 	`TaskSwitchPriority.immediate` will be used, depending on whether a
1036 	yield lock is currently active.
1037 
1038 	Note that it is illegal to use `TaskSwitchPriority.immediate` if a yield
1039 	lock is active.
1040 
1041 	This function must only be called on tasks that belong to the calling
1042 	thread and have previously been hibernated!
1043 
1044 	See_Also: `hibernate`, `yieldLock`
1045 */
void switchToTask(Task t)
@safe nothrow {
	// With an active yield lock the switch is requested as "prioritized";
	// otherwise it is requested as "immediate".
	immutable lockActive = TaskFiber.getThis().m_yieldLockCount > 0;
	immutable priority = lockActive
		? TaskSwitchPriority.prioritized
		: TaskSwitchPriority.immediate;
	s_scheduler.switchTo(t, priority);
}
/// ditto
void switchToTask(Task t, TaskSwitchPriority priority)
@safe nothrow {
	// The caller-chosen priority is passed to the scheduler unchanged.
	s_scheduler.switchTo(t, priority);
}
1056 
1057 
1058 /**
1059 	Suspends the execution of the calling task for the specified amount of time.
1060 
1061 	Note that other tasks of the same thread will continue to run during the
1062 	wait time, in contrast to $(D core.thread.Thread.sleep), which shouldn't be
1063 	used in vibe.d applications.
1064 
1065 	Repeated_sleep:
1066 	  As this method creates a new `Timer` every time, it is not recommended to
	  use it in a tight loop. For functions that call `sleep` frequently,
1068 	  it is preferable to instantiate a single `Timer` and reuse it,
1069 	  as shown in the following example:
1070 	  ---
1071 	  void myPollingFunction () {
		  Timer timer = createTimer(null); // Create a re-usable timer
1073 		  while (true) {
1074 			  // Your awesome code goes here
1075 			  timer.rearm(timeout, false);
1076 			  timer.wait();
1077 		  }
1078 	  }
1079 	  ---
1080 
1081 	Throws: May throw an `InterruptException` if the task gets interrupted using
1082 		`Task.interrupt()`.
1083 */
void sleep(Duration timeout)
@safe {
	assert(timeout >= 0.seconds, "Argument to sleep must not be negative.");

	// Only a strictly positive timeout needs a timer; anything else returns
	// right away.
	if (timeout > 0.seconds) {
		auto timer = setTimer(timeout, null);
		timer.wait();
	}
}
1091 ///
unittest {
	import vibe.core.core : sleep;
	import vibe.core.log : logInfo;
	import core.time : msecs;

	// Note: `test` is only declared here and never called, so running this
	// unittest does not actually sleep.
	void test()
	{
		logInfo("Sleeping for half a second...");
		sleep(500.msecs);
		logInfo("Done sleeping.");
	}
}
1104 
1105 
/** Suspends the execution of the calling task in an uninterruptible manner.
1107 
1108 	This function behaves the same as `sleep`, except that invoking
1109 	`Task.interrupt` on the calling task will not result in an
1110 	`InterruptException` being thrown from `sleepUninterruptible`. Instead,
1111 	if any, a later interruptible wait state will throw the exception.
1112 */
void sleepUninterruptible(Duration timeout)
@safe nothrow {
	assert(timeout >= 0.seconds, "Argument to sleep must not be negative.");

	// Only a strictly positive timeout needs a timer; anything else returns
	// right away.
	if (timeout > 0.seconds) {
		auto timer = setTimer(timeout, null);
		timer.waitUninterruptible();
	}
}
1120 
1121 
1122 /**
1123 	Creates a new timer, that will fire `callback` after `timeout`
1124 
	Timers can be separated into two categories: one-off or periodic.
	One-off timers fire only once, after a specific amount of time,
	while periodic timers fire at a regular interval.
1128 
1129 	One-off_timers:
1130 	One-off timers can be used for performing a task after a specific delay,
1131 	or to schedule a time without interrupting the currently running code.
1132 	For example, the following is a way to emulate a 'schedule' primitive,
1133 	a way to schedule a task without starting it immediately (unlike `runTask`):
1134 	---
1135 	void handleRequest (scope HTTPServerRequest req, scope HTTPServerResponse res) {
1136 		Payload payload = parse(req);
1137 		if (payload.isValid())
1138 		  // Don't immediately yield, finish processing the data and the query
1139 		  setTimer(0.msecs, () => sendToPeers(payload));
1140 		process(payload);
1141 		res.writeVoidBody();
1142 	}
1143 	---
1144 
1145 	In this example, the server delays the network communication that
	will be performed by `sendToPeers` until after the request is fully
1147 	processed, ensuring the client doesn't wait more than the actual processing
1148 	time for the response.
1149 
1150 	Periodic_timers:
1151 	Periodic timers will trigger for the first time after `timeout`,
1152 	then at best every `timeout` period after this. Periodic timers may be
1153 	explicitly stopped by calling the `Timer.stop()` method on the return value
1154 	of this function.
1155 
	As timers are non-preemptive (see the "Preemption" section), user code does
1157 	not need to compensate for time drift, as the time spent in the function
1158 	will not affect the frequency, unless the function takes longer to complete
1159 	than the timer.
1160 
1161 	Preemption:
1162 	Like other events in Vibe.d, timers are non-preemptive, meaning that
1163 	the currently executing function will not be interrupted to let a timer run.
1164 	This is usually not a problem in server applications, as any blocking code
	will be easily noticed (the server will stop handling requests), but might
	come as a surprise in code that doesn't handle requests.
1167 	If this is a problem, the solution is usually to either explicitly give
1168 	control to the event loop (by calling `yield`) or ensuring operations are
1169 	asynchronous (e.g. call functions from `vibe.core.file` instead of `std.file`).
1170 
1171 	Reentrancy:
1172 	The event loop guarantees that the same timer will never be called more than
1173 	once at a time. Hence, functions run on a timer do not need to be re-entrant,
1174 	even if they execute for longer than the timer frequency.
1175 
1176 	Params:
1177 		timeout = Determines the minimum amount of time that elapses before the timer fires.
1178 		callback = A delegate to be called when the timer fires. Can be `null`,
1179 				   in which case the timer will not do anything.
		periodic = Specifies if the timer fires repeatedly or only once
1181 
1182 	Returns:
1183 		Returns a `Timer` object that can be used to identify and modify the timer.
1184 
1185 	See_also: `createTimer`
1186 */
Timer setTimer(Duration timeout, Timer.Callback callback, bool periodic = false)
@safe nothrow {
	// Create the (not yet armed) timer first, then arm it with the requested
	// timeout and mode.
	Timer timer = createTimer(callback);
	timer.rearm(timeout, periodic);
	return timer;
}
1193 ///
unittest {
	void printTime()
	@safe nothrow {
		import std.datetime;
		logInfo("The time is: %s", Clock.currTime());
	}

	// Note: `test` is only declared here and never called, so running this
	// unittest does not actually start a timer.
	void test()
	{
		import vibe.core.core;
		// start a periodic timer that prints the time every second
		setTimer(1.seconds, toDelegate(&printTime), true);
	}
}
1208 
1209 /// Compatibility overload - use a `@safe nothrow` callback instead.
Timer setTimer(Duration timeout, void delegate() callback, bool periodic = false)
@system nothrow {
	// Wrap the unattributed callback into a `@safe nothrow` compatible
	// delegate; exceptions thrown by the callback are logged as warnings
	// instead of being propagated.
	Timer.Callback wrapped = () @trusted nothrow {
		try {
			callback();
		} catch (Exception e) {
			logException!(LogLevel.warn)(e, "Timer callback failed");
		}
	};

	return setTimer(timeout, wrapped, periodic);
}
1219 
unittest { // make sure that periodic timer calls never overlap
	int ccount = 0; // number of callback invocations currently in progress
	int fcount = 0; // number of callback invocations that finished sleeping
	Timer tm;

	// The callback sleeps ten times longer than the timer period, so the
	// timer fires again while a previous invocation is still in progress.
	tm = setTimer(10.msecs, {
		ccount++;
		scope (exit) ccount--;
		assert(ccount == 1); // no concurrency allowed
		assert(fcount < 5);
		sleep(100.msecs);
		if (++fcount >= 5)
			tm.stop();
	}, true);

	// Wait until the timer has been stopped by the callback.
	while (tm.pending) sleep(50.msecs);

	// Leave some additional time in which a stray invocation would trip the
	// `fcount < 5` assertion above.
	sleep(50.msecs);

	assert(fcount == 5);
}
1241 
1242 
1243 /** Creates a new timer without arming it.
1244 
1245 	Each time `callback` gets invoked, it will be run inside of a newly started
1246 	task.
1247 
1248 	Params:
1249 		callback = If non-`null`, this delegate will be called when the timer
1250 			fires
1251 
1252 	See_also: `createLeanTimer`, `setTimer`
1253 */
Timer createTimer(void delegate() nothrow @safe callback = null)
@safe nothrow {
	// Lean-timer callable that runs the user callback inside of a task and
	// makes sure that invocations of the same timer never overlap.
	static struct C {
		void delegate() nothrow @safe m_callback;
		bool m_running = false;     // a task is currently executing m_callback
		bool m_pendingFire = false; // the timer fired while m_running was set

		void opCall(Timer tm)
		nothrow @safe {
			// A previous invocation is still in progress: just remember that
			// the timer fired and let the running task pick it up.
			if (m_running) {
				m_pendingFire = true;
				return;
			}

			m_running = true;

			runTask((Timer tm) nothrow {
				assert(m_running);
				scope (exit) m_running = false;

				// Invoke the callback once, then again for as long as the
				// timer fired in the meantime.
				do {
					m_pendingFire = false;
					m_callback();

					// make sure that no callbacks are fired after the timer
					// has been actively stopped
					if (m_pendingFire && !tm.pending)
						m_pendingFire = false;
				} while (m_pendingFire);
			}, tm);
		}
	}

	if (callback) {
		C c = {callback};
		return createLeanTimer(c);
	}

	// Without a callback, a plain timer with a null callback is created.
	return createLeanTimer!(Timer.Callback)(null);
}
1294 
1295 
1296 /** Creates a new timer with a lean callback mechanism.
1297 
1298 	In contrast to the standard `createTimer`, `callback` will not be called
1299 	in a new task, but is instead called directly in the context of the event
1300 	loop.
1301 
1302 	For this reason, the supplied callback is not allowed to perform any
1303 	operation that needs to block/yield execution. In this case, `runTask`
1304 	needs to be used explicitly to perform the operation asynchronously.
1305 
1306 	Additionally, `callback` can carry arbitrary state without requiring a heap
1307 	allocation.
1308 
1309 	See_also: `createTimer`
1310 */
// The template constraint accepts any `callback` that can be invoked from
// `@safe nothrow` code either without arguments or with a single `Timer`
// argument.
Timer createLeanTimer(CALLABLE)(CALLABLE callback)
	if (is(typeof(() @safe nothrow { callback(); } ()))
		|| is(typeof(() @safe nothrow { callback(Timer.init); } ())))
{
	// Allocate a new timer in the current event driver and attach `callback`.
	return Timer.create(eventDriver.timers.create(), callback);
}
1317 
1318 
1319 /**
1320 	Creates an event to wait on an existing file descriptor.
1321 
1322 	The file descriptor usually needs to be a non-blocking socket for this to
1323 	work.
1324 
1325 	Params:
1326 		file_descriptor = The Posix file descriptor to watch
1327 		event_mask = Specifies which events will be listened for
1328 
1329 	Returns:
1330 		Returns a newly created FileDescriptorEvent associated with the given
1331 		file descriptor.
1332 */
FileDescriptorEvent createFileDescriptorEvent(int file_descriptor, FileDescriptorEvent.Trigger event_mask)
@safe nothrow {
	// Thin wrapper around the private `FileDescriptorEvent` constructor.
	return FileDescriptorEvent(file_descriptor, event_mask);
}
1337 
1338 
1339 /**
1340 	Sets the stack size to use for tasks.
1341 
1342 	The default stack size is set to 512 KiB on 32-bit systems and to 16 MiB
1343 	on 64-bit systems, which is sufficient for most tasks. Tuning this value
1344 	can be used to reduce memory usage for large numbers of concurrent tasks
1345 	or to avoid stack overflows for applications with heavy stack use.
1346 
1347 	Note that this function must be called at initialization time, before any
1348 	task is started to have an effect.
1349 
1350 	Also note that the stack will initially not consume actual physical memory -
1351 	it just reserves virtual address space. Only once the stack gets actually
1352 	filled up with data will physical memory then be reserved page by page. This
1353 	means that the stack can safely be set to large sizes on 64-bit systems
1354 	without having to worry about memory usage.
1355 */
void setTaskStackSize(size_t sz)
nothrow {
	// Only stores the value; no validation of `sz` is performed here.
	TaskFiber.ms_taskStackSize = sz;
}
1360 
1361 
1362 /**
1363 	The number of worker threads used for processing worker tasks.
1364 
1365 	Note that this function will cause the worker threads to be started,
	if they haven't already.
1367 
1368 	See_also: `runWorkerTask`, `runWorkerTaskH`, `runWorkerTaskDist`,
1369 	`setupWorkerThreads`
1370 */
@property size_t workerThreadCount() nothrow
	out(count) { assert(count > 0, "No worker threads started after setupWorkerThreads!?"); }
do {
	// Lazily creates the default worker pool if that has not happened yet.
	setupWorkerThreads();

	// Query the pool while holding the same mutex that guards its creation.
	st_threadsMutex.lock_nothrow();
	scope (exit) st_threadsMutex.unlock_nothrow();

	auto pool = st_workerPool;
	return pool.threadCount;
}
1380 
1381 
1382 /**
1383 	Disables the signal handlers usually set up by vibe.d.
1384 
1385 	During the first call to `runEventLoop`, vibe.d usually sets up a set of
1386 	event handlers for SIGINT, SIGTERM and SIGPIPE. Since in some situations
1387 	this can be undesirable, this function can be called before the first
1388 	invocation of the event loop to avoid this.
1389 
1390 	Calling this function after `runEventLoop` will have no effect.
1391 */
void disableDefaultSignalHandlers()
{
	// The flag is process-global (`__gshared`) and gets modified under the
	// threads mutex, which is also held by `setupSignalHandlers` when
	// reading it.
	synchronized (st_threadsMutex) {
		s_disableSignalHandlers = true;
	}
}
1397 
1398 /**
1399 	Sets the effective user and group ID to the ones configured for privilege lowering.
1400 
1401 	This function is useful for services run as root to give up on the privileges that
1402 	they only need for initialization (such as listening on ports <= 1024 or opening
1403 	system log files).
1404 */
void lowerPrivileges(string uname, string gname)
@safe {
	// Nothing to give up unless the process actually runs as root.
	if (!isRoot()) return;

	// Without any user or group there is nothing to switch to - warn and
	// keep running with full permissions.
	if (uname == "" && gname == "") {
		logWarn("Vibe was run as root, and no user/group has been specified for privilege lowering. Running with full permissions.");
		return;
	}

	// Tries to interpret `s` as a numeric ID. Succeeds only if `s` starts
	// with a digit and the number parser consumed the whole string.
	static bool tryParse(T)(string s, out T n)
	{
		import std.conv, std.ascii;
		if (!isDigit(s[0])) return false;
		n = parse!T(s);
		return s.length==0;
	}

	int uid = -1;
	int gid = -1;

	// Anything that is not a plain number gets resolved by name instead.
	if (uname != "" && !tryParse(uname, uid)) uid = getUID(uname);
	if (gname != "" && !tryParse(gname, gid)) gid = getGID(gname);

	setUID(uid, gid);
}
1422 
/// ditto
void lowerPrivileges()
@safe {
	// Uses the user/group names stored in the module-level privilege
	// lowering variables.
	lowerPrivileges(s_privilegeLoweringUserName, s_privilegeLoweringGroupName);
}
1428 
1429 
1430 /**
1431 	Sets a callback that is invoked whenever a task changes its status.
1432 
1433 	This function is useful mostly for implementing debuggers that
1434 	analyze the life time of tasks, including task switches. Note that
1435 	the callback will only be called for debug builds.
1436 */
void setTaskEventCallback(TaskEventCallback func)
{
	// The assignment is a `debug` statement: in non-debug builds this
	// function does nothing and `func` is ignored.
	debug TaskFiber.ms_taskEventCallback = func;
}
1441 
1442 /**
	Sets a callback that is invoked whenever a new task is created.
1444 
1445 	The callback is guaranteed to be invoked before the one set by
1446 	`setTaskEventCallback` for the same task handle.
1447 
1448 	This function is useful mostly for implementing debuggers that
1449 	analyze the life time of tasks, including task switches. Note that
1450 	the callback will only be called for debug builds.
1451 */
void setTaskCreationCallback(TaskCreationCallback func)
{
	// The assignment is a `debug` statement: in non-debug builds this
	// function does nothing and `func` is ignored.
	debug TaskFiber.ms_taskCreationCallback = func;
}
1456 
1457 
1458 /**
1459 	A version string representing the current vibe.d core version
1460 */
// Manifest constant, i.e. usable in compile-time contexts as well.
enum vibeVersionString = "1.22.3";
1462 
1463 
1464 /**
1465 	Generic file descriptor event.
1466 
1467 	This kind of event can be used to wait for events on a non-blocking
1468 	file descriptor. Note that this can usually only be used on socket
1469 	based file descriptors.
1470 */
struct FileDescriptorEvent {
	/** Event mask selecting the kind of events to listen for.
	*/
	enum Trigger {
		none = 0,         /// Match no event (invalid value)
		read = 1<<0,      /// React on read-ready events
		write = 1<<1,     /// React on write-ready events
		any = read|write  /// Match any kind of event
	}

	private {
		// State kept in the event driver's user data slot of the socket.
		static struct Context {
			Trigger trigger;                 // event mask given at construction
			shared(NativeEventDriver) driver; // driver that the socket was adopted by
		}

		StreamSocketFD m_socket;
		Context* m_context;
	}

	@safe:

	// Adopts `fd` as a stream socket of the current event driver and records
	// the event mask as well as the driver in the socket's user data.
	private this(int fd, Trigger event_mask)
	nothrow {
		m_socket = eventDriver.sockets.adoptStream(fd);
		m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
		m_context.trigger = event_mask;
		m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
	}

	// Copying adds a reference to the underlying socket handle.
	this(this)
	nothrow {
		if (m_socket != StreamSocketFD.invalid)
			eventDriver.sockets.addRef(m_socket);
	}

	// Destruction releases this instance's reference, passing the driver
	// recorded at construction time to `releaseHandle`.
	~this()
	nothrow {
		if (m_socket != StreamSocketFD.invalid)
			releaseHandle!"sockets"(m_socket, m_context.driver);
	}


	/** Waits for the selected event to occur.

		Params:
			which = Optional event mask to react only on certain events
			timeout = Maximum time to wait for an event

		Returns:
			The overload taking the timeout parameter returns true if
			an event was received on time and false otherwise.
	*/
	void wait(Trigger which = Trigger.any)
	{
		// Waits without a time limit; the result is discarded.
		wait(Duration.max, which);
	}
	/// ditto
	bool wait(Duration timeout, Trigger which = Trigger.any)
	{
		// Nothing to wait for if `which` and the configured mask do not overlap.
		if ((which & m_context.trigger) == Trigger.none) return true;

		// The effective mask must be exactly `Trigger.read`.
		assert((which & m_context.trigger) == Trigger.read, "Waiting for write event not yet supported.");

		bool got_data;

		// Waitable that starts/cancels a read-readiness wait on the socket and
		// records whether the wait finished with `IOStatus.ok`.
		alias readwaiter = Waitable!(IOCallback,
			cb => eventDriver.sockets.waitForData(m_socket, cb),
			cb => eventDriver.sockets.cancelRead(m_socket),
			(fd, st, nb) { got_data = st == IOStatus.ok; }
		);

		asyncAwaitAny!(true, readwaiter)(timeout);

		return got_data;
	}
}
1548 
1549 
1550 /**
1551 	Represents a timer.
1552 */
struct Timer {
	private {
		// Event driver owning the timer; `null` for an uninitialized `Timer`.
		NativeEventDriver m_driver;
		TimerID m_id;
		// Sanity marker checked on copy and destruction in debug builds.
		debug uint m_magicNumber = 0x4d34f916;
	}

	alias Callback = void delegate() @safe nothrow;

	@safe:

	// Wraps the driver timer `id` into a `Timer`. If a callback is given, it
	// is stored in the timer's user data and a wait is registered that
	// dispatches to it through `TimerCallbackHandler`.
	private static Timer create(CALLABLE)(TimerID id, CALLABLE callback)
	nothrow {
		assert(id != TimerID.init, "Invalid timer ID.");

		Timer ret;
		ret.m_driver = eventDriver;
		ret.m_id = id;

		// For callables that can be tested with `!` (e.g. delegates), a
		// "null" callback results in a timer without any callback attached.
		static if (is(typeof(!callback)))
			if (!callback)
				return ret;

		ret.m_driver.timers.userData!CALLABLE(id) = callback;
		ret.m_driver.timers.wait(id, &TimerCallbackHandler!CALLABLE.instance.handle);

		return ret;
	}

	// Copying adds a reference to the underlying driver timer.
	this(this)
	nothrow {
		debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted.");
		if (m_driver) m_driver.timers.addRef(m_id);
	}

	// Destruction releases this instance's reference to the driver timer.
	~this()
	nothrow {
		debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted.");
		if (m_driver) releaseHandle!"timers"(m_id, () @trusted { return cast(shared)m_driver; } ());
	}

	/** True if the timer is yet to fire.

		An uninitialized timer (one without an associated event driver) is
		never pending, consistent with `stop` and `unique`, which tolerate
		uninitialized timers as well.
	*/
	@property bool pending() nothrow { return m_driver ? m_driver.timers.isPending(m_id) : false; }

	/// The internal ID of the timer.
	@property size_t id() const nothrow { return m_id; }

	/// Evaluates to `true` if the timer is associated with an event driver.
	bool opCast() const nothrow { return m_driver !is null; }

	/// Determines if this reference is the only one
	@property bool unique() const nothrow { return m_driver ? m_driver.timers.isUnique(m_id) : false; }

	/** Resets the timer to the specified timeout

		Params:
			dur = Time until the timer fires; must not be negative
			periodic = If `true`, `dur` is also used as the repeat interval,
				otherwise the timer is set without a repeat interval
	*/
	void rearm(Duration dur, bool periodic = false) nothrow
		in { assert(dur >= 0.seconds, "Negative timer duration specified."); }
		do { m_driver.timers.set(m_id, dur, periodic ? dur : 0.seconds); }

	/** Resets the timer and avoids any firing.
	*/
	void stop() nothrow { if (m_driver) m_driver.timers.stop(m_id); }

	/** Waits until the timer fires.

		This method may only be used if no timer callback has been specified.

		Returns:
			`true` is returned $(I iff) the timer was fired.
	*/
	bool wait()
	{
		auto cb = m_driver.timers.userData!Callback(m_id);
		assert(cb is null, "Cannot wait on a timer that was created with a callback.");

		// The second lambda cancels the registered wait again.
		auto res = asyncAwait!(TimerCallback2,
			cb => m_driver.timers.wait(m_id, cb),
			cb => m_driver.timers.cancelWait(m_id)
		);
		return res[1];
	}

	/** Waits until the timer fires.

		Same as `wait`, except that `Task.interrupt` has no effect on the wait.
	*/
	bool waitUninterruptible()
	nothrow {
		auto cb = m_driver.timers.userData!Callback(m_id);
		assert(cb is null, "Cannot wait on a timer that was created with a callback.");

		auto res = asyncAwaitUninterruptible!(TimerCallback2,
			cb => m_driver.timers.wait(m_id, cb)
		);
		return res[1];
	}
}
1649 
// Dispatches timer events of the event driver to a callable of type
// `CALLABLE` that is stored in the timer's user data.
private struct TimerCallbackHandler(CALLABLE) {
	// Single process-wide (`__gshared`) instance per `CALLABLE` type; its
	// `handle` method is what gets registered with the driver.
	static __gshared TimerCallbackHandler ms_instance;
	static @property ref TimerCallbackHandler instance() @trusted nothrow { return ms_instance; }

	void handle(TimerID timer, bool fired)
	@safe nothrow {
		if (fired) {
			// The callable is invoked under a yield lock.
			auto l = yieldLock();
			auto cb = () @trusted { return &eventDriver.timers.userData!CALLABLE(timer); } ();
			static if (is(typeof(CALLABLE.init(Timer.init)))) {
				// The callable accepts a `Timer` argument: build a handle for
				// it and add a reference that balances the release performed
				// by the handle's destructor.
				Timer tm;
				tm.m_driver = eventDriver;
				tm.m_id = timer;
				eventDriver.timers.addRef(timer);
				(*cb)(tm);
			} else (*cb)();
		}

		// Register for the next event as long as the timer is still
		// referenced from elsewhere or is pending.
		if (!eventDriver.timers.isUnique(timer) || eventDriver.timers.isPending(timer))
			eventDriver.timers.wait(timer, &handle);
	}
}
1672 
1673 
1674 /** Returns an object that ensures that no task switches happen during its life time.
1675 
1676 	Any attempt to run the event loop or switching to another task will cause
1677 	an assertion to be thrown within the scope that defines the lifetime of the
1678 	returned object.
1679 
1680 	Multiple yield locks can appear in nested scopes.
1681 */
auto yieldLock()
@safe nothrow {
	// Scope guard type: increments the yield lock counter of the current
	// fiber on construction and decrements it again on destruction.
	static struct YieldLock {
	@safe nothrow:
		// Distinguishes constructed locks from default-initialized instances,
		// which must not touch the counter.
		private bool m_initialized;

		private this(bool) { m_initialized = true; inc(); }
		// Copying is disabled, so each increment is matched by one decrement.
		@disable this(this);
		~this() { if (m_initialized) dec(); }

		private void inc()
		{
			TaskFiber.getThis().m_yieldLockCount++;
		}

		private void dec()
		{
			assert(TaskFiber.getThis().m_yieldLockCount > 0);
			TaskFiber.getThis().m_yieldLockCount--;
		}
	}

	return YieldLock(true);
}
1706 
unittest {
	// Nested yield locks must increment/decrement the counter symmetrically.
	auto tf = TaskFiber.getThis();
	assert(tf.m_yieldLockCount == 0);
	{
		auto lock = yieldLock();
		assert(tf.m_yieldLockCount == 1);
		{
			auto lock2 = yieldLock();
			assert(tf.m_yieldLockCount == 2);
		}
		assert(tf.m_yieldLockCount == 1);
	}
	assert(tf.m_yieldLockCount == 0);

	// A default-initialized lock must leave the counter untouched, both on
	// construction and on destruction.
	{
		typeof(yieldLock()) l;
		assert(tf.m_yieldLockCount == 0);
	}
	assert(tf.m_yieldLockCount == 0);
}
1727 
debug (VibeRunningTasks) {
	/** Dumps a list of all active tasks of the calling thread.

		One log line is written per entry of `TaskFiber.s_runningTasks`,
		followed by a summary line with the task count and the number of
		fibers available for reuse.
	*/
	void printRunningTasks()
	@safe nothrow {
		// Fall back to "unknown" if the thread name cannot be determined.
		string threadname = "unknown";
		try threadname = Thread.getThis.name;
		catch (Exception e) {}
		size_t cnt = 0;
		foreach (kv; TaskFiber.s_runningTasks.byKeyValue) {
			auto t = kv.key.task;
			logInfo("%s (%s): %s", t.getDebugID, threadname, kv.value);
			cnt++;
		}
		logInfo("===================================================");
		logInfo("%s: %s tasks currently active, %s available",
			threadname, cnt, s_availableFibers.length);
	}
}
1747 
1748 
1749 /**************************************************************************************************/
1750 /* private types                                                                                  */
1751 /**************************************************************************************************/
1752 
1753 
// Prepares the timer that is used to run a garbage collection cycle while
// idle. The timer is only created here; it gets armed by
// `performIdleProcessing`.
private void setupGcTimer()
{
	s_gcCollectTimeout = 2.seconds;

	s_gcTimer = createTimer(() @trusted {
		import core.memory;
		logTrace("gc idle collect");
		GC.collect();
		GC.minimize();
		// Keeps the next idle phase from re-arming the timer right away.
		s_ignoreIdleForGC = true;
	});
}
1765 
// Repeats idle processing rounds until there is nothing left to do (or an
// exit has been requested) and then re-arms the idle GC timer, if one exists.
package(vibe) void performIdleProcessing(bool force_process_events = false)
@safe nothrow {
	debug (VibeTaskLog) logTrace("Performing idle processing...");

	// Only the first round honors the caller's choice; all following rounds
	// always process events.
	if (!getExitFlag()) {
		while (performIdleProcessingOnce(force_process_events))
			force_process_events = true;
	}

	if (s_scheduler.scheduledTaskCount) logDebug("Exiting from idle processing although there are still yielded tasks");

	if (s_exitEventLoop) return;

	// Skip re-arming once after a GC run (or if there is no GC timer) and
	// clear the skip flag in that case.
	if (s_ignoreIdleForGC || !s_gcTimer) s_ignoreIdleForGC = false;
	else s_gcTimer.rearm(s_gcCollectTimeout);
}
1784 
// Performs a single round of idle processing: optionally polls the event
// driver without blocking, calls the idle handler and runs the scheduler.
// Returns `true` if another round should follow.
private bool performIdleProcessingOnce(bool process_events)
@safe nothrow {
	if (process_events) {
		auto reason = eventDriver.core.processEvents(0.seconds);
		if (reason.among!(ExitReason.exited, ExitReason.outOfWaiters) && s_scheduler.scheduledTaskCount == 0) {
			// The driver is done and no tasks are scheduled: request the exit
			// of a running event loop and stop the idle processing.
			if (s_eventLoopRunning) {
				logDebug("Setting exit flag due to driver signalling exit: %s", reason);
				s_exitEventLoop = true;
			}
			return false;
		}
	}

	bool handlerWantsMore;
	if (s_idleHandler)
		handlerWantsMore = s_idleHandler();

	// The scheduler is always run, independent of the idle handler's result.
	const tasksBusy = s_scheduler.schedule() == ScheduleStatus.busy;
	return (tasksBusy || handlerWantsMore) && !getExitFlag();
}
1804 
1805 
// Entry of the `st_threads` registry; one entry is stored per registered thread.
private struct ThreadContext {
	Thread thread;
}
1809 
1810 /**************************************************************************************************/
1811 /* private functions                                                                              */
1812 /**************************************************************************************************/
1813 
// Module state. Variables declared without `shared`/`__gshared` are
// thread-local in D; `__gshared` and `shared` variables exist once per process.
private {
	// Idle garbage collection (see `setupGcTimer`/`performIdleProcessing`)
	Duration s_gcCollectTimeout;
	Timer s_gcTimer;
	bool s_ignoreIdleForGC = false;

	// Process-wide thread and worker pool bookkeeping
	__gshared core.sync.mutex.Mutex st_threadsMutex;
	shared TaskPool st_workerPool;
	shared ManualEvent st_threadsSignal;
	__gshared ThreadContext[] st_threads;
	__gshared Condition st_threadShutdownCondition;
	shared bool st_term = false;

	// Per-thread event loop state
	bool s_isMainThread = false; // set in shared static this
	bool s_exitEventLoop = false;
	package bool s_eventLoopRunning = false;
	bool delegate() @safe nothrow s_idleHandler;

	// Per-thread scheduler and cache of fibers available for reuse
	TaskScheduler s_scheduler;
	FixedRingBuffer!TaskFiber s_availableFibers;
	size_t s_maxRecycledFibers = 100;

	// Privilege lowering configuration and signal handler opt-out
	string s_privilegeLoweringUserName;
	string s_privilegeLoweringGroupName;
	__gshared bool s_disableSignalHandlers = false;
}
1839 
// Returns `true` if either the calling thread's exit flag or the shared
// termination flag (read atomically) is set.
private bool getExitFlag()
@trusted nothrow {
	return s_exitEventLoop || atomicLoad(st_term);
}
1844 
// Package-level accessor for the calling thread's `s_eventLoopRunning` flag.
package @property bool isEventLoopRunning() @safe nothrow @nogc { return s_eventLoopRunning; }
// Package-level accessor returning the calling thread's scheduler by reference.
package @property ref TaskScheduler taskScheduler() @safe nothrow @nogc { return s_scheduler; }
1847 
// Puts `fiber` into the cache of fibers available for reuse. If the cache
// already holds `s_maxRecycledFibers` entries, the entry at its front is
// removed, shut down and destroyed first.
package void recycleFiber(TaskFiber fiber)
@safe nothrow {
	if (s_availableFibers.length >= s_maxRecycledFibers) {
		auto evicted = s_availableFibers.front;
		s_availableFibers.popFront();
		evicted.shutdown();
		// A failing destruction is only logged.
		() @trusted {
			try destroy(evicted);
			catch (Exception e) logWarn("Failed to destroy fiber: %s", e.msg);
		} ();
	}

	// Double the buffer capacity whenever it is exhausted.
	if (s_availableFibers.full) {
		const grown = 2 * s_availableFibers.capacity;
		s_availableFibers.capacity = grown;
	}

	s_availableFibers.put(fiber);
}
1865 
// Installs the default signal handlers, unless this has been disabled
// through `s_disableSignalHandlers`. Only the first call has any effect.
private void setupSignalHandlers()
@trusted nothrow {
	scope (failure) assert(false); // _d_monitorexit is not nothrow
	__gshared bool s_setup = false;

	// only initialize once per process (guarded by the `__gshared` flag
	// `s_setup`, which is accessed under the threads mutex)
	synchronized (st_threadsMutex) {
		if (s_setup) return;
		s_setup = true;

		if (s_disableSignalHandlers) return;

		logTrace("setup signal handler");
		version(Posix){
			// support proper shutdown using signals
			sigset_t sigset;
			sigemptyset(&sigset);
			sigaction_t siginfo;
			siginfo.sa_handler = &onSignal;
			siginfo.sa_mask = sigset;
			siginfo.sa_flags = SA_RESTART;
			sigaction(SIGINT, &siginfo, null);
			sigaction(SIGTERM, &siginfo, null);

			// SIGPIPE gets its own handler, with the same mask and flags
			siginfo.sa_handler = &onBrokenPipe;
			sigaction(SIGPIPE, &siginfo, null);
		}

		version(Windows){
			// WORKAROUND: we don't care about viral @nogc attribute here!
			import std.traits;
			signal(SIGTERM, cast(ParameterTypeTuple!signal[1])&onSignal);
			signal(SIGINT, cast(ParameterTypeTuple!signal[1])&onSignal);
		}
	}
}
1902 
1903 // per process setup
shared static this()
{
	// A shared module constructor runs in a single thread, which is recorded
	// as the main thread here.
	s_isMainThread = true;

	// COMPILER BUG: Must be some kind of module constructor order issue:
	//    without this, the stdout/stderr handles are not initialized before
	//    the log module is set up.
	import std.stdio; File f; f.close();

	initializeLogModule();

	logTrace("create driver core");

	// Synchronization primitives for the thread registry
	st_threadsMutex = new Mutex;
	st_threadShutdownCondition = new Condition(st_threadsMutex);

	// Register the calling thread as the first entry, named "main"
	auto thisthr = Thread.getThis();
	thisthr.name = "main";
	assert(st_threads.length == 0, "Main thread not the first thread!?");
	st_threads ~= ThreadContext(thisthr);

	st_threadsSignal = createSharedManualEvent();

	// Idle garbage collection is opt-in through the VibeIdleCollect version
	version(VibeIdleCollect) {
		logTrace("setup gc");
		setupGcTimer();
	}

	// Unless disabled, read the user/group options for privilege lowering
	version (VibeNoDefaultArgs) {}
	else {
		readOption("uid|user", &s_privilegeLoweringUserName, "Sets the user name or id used for privilege lowering.");
		readOption("gid|group", &s_privilegeLoweringGroupName, "Sets the group name or id used for privilege lowering.");
	}

	// Install the vibe.d based scheduler as the scheduler of std.concurrency
	import std.concurrency;
	scheduler = new VibedScheduler;
}
1941 
// per process teardown
//
// Shuts down the event driver of the thread running the shared module
// destructors and emits a warning if the scheduler still reports scheduled
// tasks at that point.
shared static ~this()
{
	shutdownDriver();

	size_t remaining = s_scheduler.scheduledTaskCount;

	// nothing left to report
	if (remaining == 0) return;

	logWarn("There were still %d tasks running at exit.", remaining);

	debug (VibeRunningTasks)
		printRunningTasks();
}
1955 
// per thread setup
//
// Adds the current thread to `st_threads` (guarded by `st_threadsMutex`)
// unless it is already listed there.
static this()
{
	auto self = Thread.getThis();

	/// workaround for:
	// object.Exception@src/rt/minfo.d(162): Aborting: Cycle detected between modules with ctors/dtors:
	// vibe.core.core -> vibe.core.drivers.native -> vibe.core.drivers.libasync -> vibe.core.core
	if (self.isDaemon && self.name == "CmdProcessor")
		return;

	synchronized (st_threadsMutex) {
		// the main thread has already been registered by the shared constructor
		auto already_registered = st_threads.canFind!(ctx => ctx.thread is self);
		if (!already_registered)
			st_threads ~= ThreadContext(self);
	}
}
1969 
// per thread teardown
//
// Logs the thread exit, shuts down the worker pool when running on the main
// thread, removes the current thread from `st_threads`, disposes the event
// driver of non-main threads and finally wakes up everybody waiting on
// `st_threadShutdownCondition`.
//
// NOTE(review): the per thread constructor returns early for daemon threads
// named "CmdProcessor" without registering them, while this destructor has
// no matching early-out - confirm whether such threads can reach the assert
// below.
static ~this()
{
	auto self = Thread.getThis();

	bool on_main_thread = s_isMainThread;

	// the index is looked up here for the log message only
	synchronized (st_threadsMutex) {
		auto slot = st_threads.countUntil!(ctx => ctx.thread is self);
		logDebug("Thread exit %s (index %s) (main=%s)", self.name, slot, on_main_thread);
	}

	if (on_main_thread) {
		logDiagnostic("Main thread exiting");
		shutdownWorkerPool();
	}

	// look the index up again, as the list may have changed in the meantime
	synchronized (st_threadsMutex) {
		auto slot = st_threads.countUntil!(ctx => ctx.thread is self);
		assert(slot >= 0, "No more threads registered");
		if (slot >= 0) {
			// unordered removal: overwrite with the last entry, then shrink
			st_threads[slot] = st_threads[$ - 1];
			st_threads.length = st_threads.length - 1;
		}
	}

	// delay deletion of the main event driver to "~shared static this()"
	if (!on_main_thread)
		shutdownDriver();

	st_threadShutdownCondition.notifyAll();
}
2000 
/**
	Detaches the worker task pool from `st_workerPool` and terminates it.

	The global reference is taken over and reset to `null` while holding
	`st_threadsMutex`; `terminate()` is then called outside of the lock.
	Does nothing if no pool was set.
*/
private void shutdownWorkerPool()
nothrow {
	shared(TaskPool) pool;

	try {
		synchronized (st_threadsMutex) {
			// take ownership of the pool and clear the global reference
			pool = st_workerPool;
			st_workerPool = null;
		}
	} catch (Exception e) assert(false, e.msg);

	if (pool is null) return;

	logDiagnostic("Still waiting for worker threads to exit.");
	pool.terminate();
}
2013 
/**
	Releases the event driver resources held by the calling thread.

	First drops the reference to `ManualEvent.ms_threadEvent` (if it is set)
	and resets it to `EventID.init`, then disposes the event driver. If
	`tryGetEventDriver` is available, the driver is only disposed when that
	function returns one.
*/
private void shutdownDriver()
{
	auto thread_event = ManualEvent.ms_threadEvent;
	if (thread_event != EventID.init) {
		eventDriver.events.releaseRef(thread_event);
		ManualEvent.ms_threadEvent = EventID.init;
	}

	static if (is(typeof(tryGetEventDriver()))) {
		// avoid creating an event driver on threads that don't actually have one
		auto drv = tryGetEventDriver();
		if (drv)
			drv.dispose();
	} else {
		eventDriver.dispose();
	}
}
2027 
2028 
/**
	Blocks until the exit flag is set and then exits the event loop of the
	calling thread.

	The flag is read while holding `st_threadsMutex`. As long as it is not
	set, the function waits for the next emit of `st_threadsSignal`. If the
	wait gets interrupted (`InterruptException`), the function returns
	without exiting the event loop.
*/
private void watchExitFlag()
nothrow {
	// reads the exit flag under the threads mutex
	bool exitRequested()
	nothrow {
		st_threadsMutex.lock_nothrow();
		scope (exit) st_threadsMutex.unlock_nothrow();
		return getExitFlag();
	}

	auto seen_emits = st_threadsSignal.emitCount;

	while (!exitRequested()) {
		try {
			seen_emits = st_threadsSignal.wait(seen_emits);
		}
		catch (InterruptException e) return;
		catch (Exception e) assert(false, e.msg);
	}

	logDebug("main thread exit");
	eventDriver.core.exit();
}
2047 
/// C linkage function whose only action is to emit the trace log message
/// "exception trap".
/// NOTE(review): presumably meant as a hook to set debugger breakpoints on -
/// confirm against the callers.
private extern(C) void extrap() @safe nothrow
{
	logTrace("exception trap");
}
2052 
/**
	Handler registered by `setupSignalHandlers` for the termination signals.

	Logs the signal number, atomically sets `st_term`, emits
	`st_threadsSignal` (a failure to do so is only logged) and exits the
	event loop of the current thread, provided the thread has an event driver.
*/
private extern(C) void onSignal(int signal)
nothrow {
	logInfo("Received signal %d. Shutting down.", signal);
	atomicStore(st_term, true);

	try {
		st_threadsSignal.emit();
	} catch (Throwable th) {
		logDebug("Failed to notify for event loop exit: %s", th.msg);
	}

	// Stop the event loop of the current thread directly instead of relying on
	// the st_term mechanic, as the event loop might be in a waiting state, so
	// that no tasks can get scheduled before an event arrives. Explicitly
	// exiting the event loop on the other hand always terminates the wait state.
	auto drv = tryGetEventDriver();
	if (drv)
		drv.core.exit();
}
2069 
/// Handler registered by `setupSignalHandlers` for SIGPIPE; it only emits the
/// trace log message "Broken pipe." and ignores the signal number.
private extern(C) void onBrokenPipe(int signal) nothrow
{
	logTrace("Broken pipe.");
}
2074 
version(Posix)
{
	/// Returns `true` if the effective user ID of the process is 0.
	private bool isRoot()
	@trusted {
		return 0 == geteuid();
	}

	/**
		Switches the effective group and user ID of the process.

		A negative `gid` or `uid` leaves the respective ID untouched. The
		group is switched before the user. Only the effective IDs are changed
		(`setegid`/`seteuid`); supplementary groups are not adjusted, as no
		`initgroups` call is made.

		Throws: `Exception` if an ID is not known to the system or if
			switching to it fails.
	*/
	private void setUID(int uid, int gid)
	@trusted {
		logInfo("Lowering privileges to uid=%d, gid=%d...", uid, gid);

		if (gid >= 0) {
			auto group_entry = getgrgid(gid);
			enforce(group_entry !is null, "Invalid group id!");
			auto status = setegid(gid);
			enforce(status == 0, "Error setting group id!");
		}

		if (uid >= 0) {
			auto user_entry = getpwuid(uid);
			enforce(user_entry !is null, "Invalid user id!");
			auto status = seteuid(uid);
			enforce(status == 0, "Error setting user id!");
		}
	}

	/**
		Looks up the numeric user ID for the user called `name`.

		Throws: `Exception` if `getpwnam` has no entry for `name`.
	*/
	private int getUID(string name)
	@trusted {
		auto entry = getpwnam(toStringz(name));
		if (entry is null)
			throw new Exception("Unknown user name: " ~ name);
		return entry.pw_uid;
	}

	/**
		Looks up the numeric group ID for the group called `name`.

		Throws: `Exception` if `getgrnam` has no entry for `name`.
	*/
	private int getGID(string name)
	@trusted {
		auto entry = getgrnam(toStringz(name));
		if (entry is null)
			throw new Exception("Unknown group name: " ~ name);
		return entry.gr_gid;
	}
} else version(Windows){
	/// Always returns `false` on Windows.
	private bool isRoot()
	@safe {
		return false;
	}

	/// Not available on Windows; always throws an `Exception`.
	private void setUID(int uid, int gid)
	@safe {
		throw new Exception("UID/GID not supported on Windows.");
	}

	/// Not available on Windows; always throws an `Exception`.
	private int getUID(string name)
	@safe {
		throw new Exception("Privilege lowering not supported on Windows.");
	}

	/// Not available on Windows; always throws an `Exception`.
	private int getGID(string name)
	@safe {
		throw new Exception("Privilege lowering not supported on Windows.");
	}
}