1 /**
2 	This module contains the core functionality of the vibe.d framework.
3 
4 	Copyright: © 2012-2020 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.core.core;
9 
10 public import vibe.core.task;
11 
12 import eventcore.core;
13 import vibe.core.args;
14 import vibe.core.concurrency;
15 import vibe.core.internal.release;
16 import vibe.core.log;
17 import vibe.core.sync : ManualEvent, createSharedManualEvent;
18 import vibe.core.taskpool : TaskPool;
19 import vibe.internal.async;
20 import vibe.internal.array : FixedRingBuffer;
21 //import vibe.utils.array;
22 import std.algorithm;
23 import std.conv;
24 import std.encoding;
25 import core.exception;
26 import std.exception;
27 import std.functional;
28 import std.range : empty, front, popFront;
29 import std..string;
30 import std.traits : isFunctionPointer;
31 import std.typecons : Flag, Yes, Typedef, Tuple, tuple;
32 import core.atomic;
33 import core.sync.condition;
34 import core.sync.mutex;
35 import core.stdc.stdlib;
36 import core.thread;
37 
38 version(Posix)
39 {
40 	import core.sys.posix.signal;
41 	import core.sys.posix.unistd;
42 	import core.sys.posix.pwd;
43 
44 	static if (__traits(compiles, {import core.sys.posix.grp; getgrgid(0);})) {
45 		import core.sys.posix.grp;
46 	} else {
47 		extern (C) {
48 			struct group {
49 				char*   gr_name;
50 				char*   gr_passwd;
51 				gid_t   gr_gid;
52 				char**  gr_mem;
53 			}
54 			group* getgrgid(gid_t);
55 			group* getgrnam(in char*);
56 		}
57 	}
58 }
59 
60 version (Windows)
61 {
62 	import core.stdc.signal;
63 }
64 
65 
66 /**************************************************************************************************/
67 /* Public functions                                                                               */
68 /**************************************************************************************************/
69 
/**
	Performs final initialization and runs the event loop.

	This function performs three tasks:
	$(OL
		$(LI Makes sure that no unrecognized command line options are passed to
			the application and potentially displays command line help. See also
			`vibe.core.args.finalizeCommandLineOptions`.)
		$(LI Performs privilege lowering if required.)
		$(LI Runs the event loop and blocks until it finishes.)
	)

	Params:
		args_out = Optional parameter to receive unrecognized command line
			arguments. If left to `null`, an error will be reported if
			any unrecognized argument is passed.

	Returns:
		`0` if `finalizeCommandLineOptions` returned `false`, `1` if
		processing the command line threw an exception, and the status
		returned by `runEventLoop` otherwise.

	See_also: `vibe.core.args.finalizeCommandLineOptions`, `lowerPrivileges`,
		`runEventLoop`
*/
int runApplication(string[]* args_out = null)
@safe {
	// step 1: validate the command line - stop early if requested or on error
	try {
		const proceed = () @trusted { return finalizeCommandLineOptions(args_out); } ();
		if (!proceed) return 0;
	} catch (Exception e) {
		logDiagnostic("Error processing command line: %s", e.msg);
		return 1;
	}

	// step 2: give up privileges that were only needed for the setup phase
	lowerPrivileges();

	// step 3: run the event loop until it exits
	logDiagnostic("Running event loop...");
	int status;
	version (VibeDebugCatchAll) {
		// debugging aid: report anything that escapes the event loop
		try status = runEventLoop();
		catch (Throwable th) {
			logError("Unhandled exception in event loop: %s", th.msg);
			logDiagnostic("Full exception: %s", th.toString().sanitize());
			return 1;
		}
	} else {
		status = runEventLoop();
	}

	logDiagnostic("Event loop exited with status %d.", status);
	return status;
}
117 
/// A simple echo server, listening on a privileged TCP port.
unittest {
	import vibe.core.core;
	import vibe.core.net;

	int main()
	{
		// application specific setup comes first - when started as root,
		// privileged ports can still be bound at this point
		listenTCP(7, (connection) {
			try connection.write(connection);
			catch (Exception e) { /* log error */ }
		});

		// runApplication takes care of the remaining initialization and
		// then runs the event loop
		return runApplication();
	}
}
137 
/** The same as above, but performing the initialization sequence manually.

	This makes it possible to skip any additional initialization (opening the
	listening port) if an invalid command line argument or the `--help` switch
	is passed to the application.
*/
unittest {
	import vibe.core.core;
	import vibe.core.net;

	int main()
	{
		// handle the command line before anything else, so that the
		// application setup can be skipped if it is not needed
		const proceed = finalizeCommandLineOptions();
		if (!proceed) return 0;

		// set up the application itself
		listenTCP(7, (connection) {
			try connection.write(connection);
			catch (Exception e) { /* log error */ }
		});

		// lower the privileges (safe to skip for non-server applications)
		lowerPrivileges();

		// finally run the event loop
		return runEventLoop();
	}
}
168 
/**
	Starts the vibe.d event loop for the calling thread.

	Note that this function is usually called automatically by the vibe.d
	framework. However, if you provide your own `main()` function, you may need
	to call either this or `runApplication` manually.

	The event loop will by default continue running during the whole life time
	of the application, but calling `runEventLoop` multiple times in sequence
	is allowed. Tasks will be started and handled only while the event loop is
	running.

	Returns:
		The returned value is the suggested code to return to the operating
		system from the `main` function.

	See_Also: `runApplication`
*/
int runEventLoop()
@safe nothrow {
	setupSignalHandlers();

	logDebug("Starting event loop.");
	s_eventLoopRunning = true;

	// reset the loop state on every path that leaves this function
	scope (exit) {
		eventDriver.core.clearExitFlag();
		s_eventLoopRunning = false;
		s_exitEventLoop = false;
		if (s_isMainThread) atomicStore(st_term, false);
		() @trusted nothrow {
			scope (failure) assert(false); // notifyAll is not marked nothrow
			st_threadShutdownCondition.notifyAll();
		} ();
	}

	// tasks that have already yield()ed get to run before waiting for events
	assert(!s_exitEventLoop, "Exit flag set before event loop has started.");
	s_exitEventLoop = false;
	performIdleProcessing();
	if (getExitFlag()) return 0;

	// in the main thread, watch the exit flag so that the loop also ends
	// when exitEventLoop(true) is called from another thread
	Task watcher;
	() @trusted nothrow {
		if (s_isMainThread)
			watcher = runTask(toDelegate(&watchExitFlag));
	} ();

	for (;;) {
		const reason = s_scheduler.waitAndProcess();
		if (reason == ExitReason.idle && !s_exitEventLoop) {
			performIdleProcessing();
			continue;
		}
		logDebug("Event loop exit reason (exit flag=%s): %s", s_exitEventLoop, reason);
		break;
	}

	// the exit flag watch task must not outlive this loop
	// TODO: would be nice to do this without exceptions
	if (watcher && watcher.running)
		watcher.interrupt();

	logDebug("Event loop done (scheduled tasks=%s, waiters=%s, thread exit=%s).",
		s_scheduler.scheduledTaskCount, eventDriver.core.waiterCount, s_exitEventLoop);
	return 0;
}
237 
/**
	Stops the currently running event loop.

	Calling this function will cause the event loop to stop event processing and
	the corresponding call to `runEventLoop()` will return to its caller.

	Params:
		shutdown_all_threads = If true, exits event loops of all threads -
			false by default. Note that the event loops of all threads are
			automatically stopped when the main thread exits, so usually
			there is no need to set shutdown_all_threads to true.
*/
void exitEventLoop(bool shutdown_all_threads = false)
@safe nothrow {
	// shuts down the worker pool and notifies the other threads
	void signalAllThreads()
	@trusted nothrow {
		shutdownWorkerPool();
		atomicStore(st_term, true);
		st_threadsSignal.emit();
	}

	logDebug("exitEventLoop called (%s)", shutdown_all_threads);

	assert(s_eventLoopRunning || shutdown_all_threads, "Exiting event loop when none is running.");
	if (shutdown_all_threads)
		signalAllThreads();

	// now stop the loop of the calling thread itself
	s_exitEventLoop = true;
	if (s_eventLoopRunning)
		eventDriver.core.exit();
}
267 
/**
	Process all pending events without blocking.

	Checks if events are ready to trigger immediately, and runs their callbacks
	if so.

	Returns: `false` if the scheduler reported `ExitReason.exited` or
		`ExitReason.outOfWaiters` while processing, `true` otherwise.
*/
bool processEvents()
@safe nothrow {
	const reason = s_scheduler.process();
	return reason != ExitReason.exited && reason != ExitReason.outOfWaiters;
}
279 
/**
	Wait once for events and process them.

	If the scheduler reports `ExitReason.idle`, idle processing is performed
	before returning.

	Returns: The exit reason reported by the scheduler.
*/
ExitReason runEventLoopOnce()
@safe nothrow {
	const reason = s_scheduler.waitAndProcess();
	if (reason == ExitReason.idle) performIdleProcessing();
	return reason;
}
290 
/**
	Sets a callback that is called whenever no events are left in the event queue.

	The callback delegate is called whenever all events in the event queue have been
	processed. Returning true from the callback will cause another idle event to
	be triggered immediately after processing any events that have arrived in the
	meantime. Returning false will instead wait until another event has arrived first.

	The overload taking a `void` delegate behaves as if the callback always
	returned `false`.
*/
void setIdleHandler(void delegate() @safe nothrow del)
@safe nothrow {
	// adapt the void callback to the bool returning handler signature
	s_idleHandler = delegate bool() @safe nothrow {
		del();
		return false;
	};
}
/// ditto
void setIdleHandler(bool delegate() @safe nothrow del)
@safe nothrow {
	s_idleHandler = del;
}
308 
/**
	Runs a new asynchronous task.

	`task` will be called synchronously from within the `runTask` call. It will
	continue to run until `yield()` or any of the I/O or wait functions is
	called. If a yield lock is active at the time of the call, the start of the
	task is deferred instead (see `switchToTask`).

	Note that the maximum size of all args must not exceed `maxTaskParameterSize`.

	Params:
		settings = Optional settings that are assigned to the task's setup
			information before the callable is stored
		task = The delegate or callable to execute as the task
		args = Arguments that are stored together with `task`

	Returns:
		The `Task` handle produced by `runTask_internal`.
*/
Task runTask(ARGS...)(void delegate(ARGS) @safe task, auto ref ARGS args)
{
	return runTask_internal!((ref tfi) { tfi.set(task, args); });
}
/// ditto
Task runTask(ARGS...)(void delegate(ARGS) @system task, auto ref ARGS args)
@system {
	return runTask_internal!((ref tfi) { tfi.set(task, args); });
}
/// ditto
Task runTask(CALLABLE, ARGS...)(CALLABLE task, auto ref ARGS args)
	if (!is(CALLABLE : void delegate(ARGS)) && is(typeof(CALLABLE.init(ARGS.init))))
{
	// generic callables (anything invocable with ARGS that is not already
	// covered by the delegate overloads)
	return runTask_internal!((ref tfi) { tfi.set(task, args); });
}
/// ditto
Task runTask(ARGS...)(TaskSettings settings, void delegate(ARGS) @safe task, auto ref ARGS args)
{
	return runTask_internal!((ref tfi) {
		tfi.settings = settings;
		tfi.set(task, args);
	});
}
/// ditto
Task runTask(ARGS...)(TaskSettings settings, void delegate(ARGS) @system task, auto ref ARGS args)
@system {
	return runTask_internal!((ref tfi) {
		tfi.settings = settings;
		tfi.set(task, args);
	});
}
/// ditto
Task runTask(CALLABLE, ARGS...)(TaskSettings settings, CALLABLE task, auto ref ARGS args)
	if (!is(CALLABLE : void delegate(ARGS)) && is(typeof(CALLABLE.init(ARGS.init))))
{
	return runTask_internal!((ref tfi) {
		tfi.settings = settings;
		tfi.set(task, args);
	});
}
358 
359 
unittest { // test proportional priority scheduling
	// abort the test if it does not finish within a second
	auto watchdog = setTimer(1000.msecs, { assert(false, "Test timeout"); });
	scope (exit) watchdog.stop();

	size_t a, b;
	auto low_prio = runTask(TaskSettings(1), {
		while (a + b < 100) {
			a++;
			yield();
		}
	});
	auto high_prio = runTask(TaskSettings(10), {
		while (a + b < 100) {
			b++;
			yield();
		}
	});
	runTask({
		low_prio.join();
		high_prio.join();
		exitEventLoop();
	});
	runEventLoop();

	assert(a + b == 100);
	assert(b.among(90, 91, 92)); // expect 1:10 ratio +-1
}
376 
377 
/**
	Runs an asynchronous task that is guaranteed to finish before the caller's
	scope is left.

	Params:
		callable = The callable to run as a task (forwarded to `runTask`)
		args = Any number of arguments that are forwarded to `runTask` together
			with `callable`

	Returns:
		A non-copyable value that holds the task handle. Its destructor calls
		`joinUninterruptible` on the task, so the task has finished once the
		returned value goes out of scope.
*/
auto runTaskScoped(FT, ARGS...)(scope FT callable, ARGS args)
{
	static struct S {
		Task handle;

		// copying would result in joining the same task more than once
		@disable this(this);

		~this()
		{
			handle.joinUninterruptible();
		}
	}

	return S(runTask(callable, args));
}
397 
/*
	Common implementation behind all `runTask` overloads.

	Takes a fiber from the list of available fibers (or creates a new one),
	lets `TFI_SETUP` fill in the fiber's task function information and then
	switches to the new task. In debug builds the registered task creation
	and task event callbacks are invoked around the switch.
*/
package Task runTask_internal(alias TFI_SETUP)()
{
	import std.typecons : Tuple, tuple;

	// look for a reusable fiber, dropping any that is not in the HOLD state
	TaskFiber f;
	while (f is null && !s_availableFibers.empty) {
		auto candidate = s_availableFibers.back;
		s_availableFibers.popBack();
		const st = () @trusted nothrow { return candidate.state; } ();
		if (st == Fiber.State.HOLD) f = candidate;
	}

	// nothing reusable found - allocate a fresh fiber
	if (f is null) {
		if (s_availableFibers.capacity == 0) s_availableFibers.capacity = 1024;
		logDebugV("Creating new fiber...");
		f = new TaskFiber;
	}

	TFI_SETUP(f.m_taskFunc);

	f.bumpTaskCounter();
	auto handle = f.task();

	debug {
		if (TaskFiber.ms_taskCreationCallback) {
			TaskCreationInfo info;
			info.handle = handle;
			info.functionPointer = () @trusted { return cast(void*)f.m_taskFunc.functionPointer; } ();
			() @trusted { TaskFiber.ms_taskCreationCallback(info); } ();
		}

		if (TaskFiber.ms_taskEventCallback) {
			() @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.preStart, handle); } ();
		}
	}

	switchToTask(handle);

	debug {
		if (TaskFiber.ms_taskEventCallback) {
			() @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.postStart, handle); } ();
		}
	}

	return handle;
}
440 
unittest { // ensure task.running is true directly after runTask
	bool executed = false;
	Task task;
	{
		// while the yield lock is held, the task must not have run yet
		auto lock = yieldLock();
		task = runTask({ executed = true; });
		assert(!executed);
		assert(task.running);
	}
	task.join();
	assert(!task.running);
	assert(executed);
}
454 
455 
/**
	Runs a new asynchronous task in a worker thread.

	Only function pointers with weakly isolated arguments are allowed to be
	able to guarantee thread-safety.

	All overloads first call `setupWorkerThreads` and then forward to the
	`runTask` method of the worker task pool.

	Params:
		settings = Optional task settings that are forwarded to the task pool
		func = Function pointer to execute in the worker thread
		method = Method of `object` to execute in the worker thread
		object = The shared object on which `method` is invoked
		args = Arguments that are forwarded to the task pool
*/
void runWorkerTask(FT, ARGS...)(FT func, auto ref ARGS args)
	if (isFunctionPointer!FT)
{
	setupWorkerThreads();
	st_workerPool.runTask(func, args);
}

/// ditto
void runWorkerTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
	if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
	setupWorkerThreads();
	st_workerPool.runTask!method(object, args);
}
/// ditto
void runWorkerTask(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
	if (isFunctionPointer!FT)
{
	setupWorkerThreads();
	st_workerPool.runTask(settings, func, args);
}

/// ditto
void runWorkerTask(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
	if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
	setupWorkerThreads();
	st_workerPool.runTask!method(settings, object, args);
}
491 
/**
	Runs a new asynchronous task in a worker thread, returning the task handle.

	This function will yield and wait for the new task to be created and started
	in the worker thread, then resume and return it.

	Only function pointers with weakly isolated arguments are allowed to be
	able to guarantee thread-safety.

	All overloads first call `setupWorkerThreads` and then forward to the
	`runTaskH` method of the worker task pool.

	Params:
		settings = Optional task settings that are forwarded to the task pool
		func = Function pointer to execute in the worker thread
		method = Method of `object` to execute in the worker thread
		object = The shared object on which `method` is invoked
		args = Arguments that are forwarded to the task pool

	Returns:
		The task handle returned by the task pool.
*/
Task runWorkerTaskH(FT, ARGS...)(FT func, auto ref ARGS args)
	if (isFunctionPointer!FT)
{
	setupWorkerThreads();
	return st_workerPool.runTaskH(func, args);
}
/// ditto
Task runWorkerTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
	if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
	setupWorkerThreads();
	return st_workerPool.runTaskH!method(object, args);
}
/// ditto
Task runWorkerTaskH(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
	if (isFunctionPointer!FT)
{
	setupWorkerThreads();
	return st_workerPool.runTaskH(settings, func, args);
}
/// ditto
Task runWorkerTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args)
	if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
	setupWorkerThreads();
	return st_workerPool.runTaskH!method(settings, object, args);
}
528 
/// Running a worker task using a function
unittest {
	static void workerFunc(int param)
	{
		logInfo("Param: %s", param);
	}

	static void test()
	{
		// run in a single worker thread
		runWorkerTask(&workerFunc, 42);
		runWorkerTask(&workerFunc, cast(ubyte)42); // implicit conversion #719

		// run in all worker threads
		runWorkerTaskDist(&workerFunc, 42);
		runWorkerTaskDist(&workerFunc, cast(ubyte)42); // implicit conversion #719
	}
}
544 
/// Running a worker task using a class method
unittest {
	static class Test {
		void workerMethod(int param)
		shared {
			logInfo("Param: %s", param);
		}
	}

	static void test()
	{
		auto instance = new shared Test;

		// run in a single worker thread
		runWorkerTask!(Test.workerMethod)(instance, 42);
		runWorkerTask!(Test.workerMethod)(instance, cast(ubyte)42); // #719

		// run in all worker threads
		runWorkerTaskDist!(Test.workerMethod)(instance, 42);
		runWorkerTaskDist!(Test.workerMethod)(instance, cast(ubyte)42); // #719
	}
}
563 
/// Running a worker task using a function and communicating with it
unittest {
	static void workerFunc(Task caller)
	{
		// answer pings until the counter has run down or something other
		// than "ping" is received
		int remaining = 10;
		while (receiveOnly!string() == "ping" && --remaining) {
			logInfo("pong");
			caller.send("pong");
		}
		caller.send("goodbye");
	}

	static void test()
	{
		Task worker = runWorkerTaskH(&workerFunc, Task.getThis);
		do {
			logInfo("ping");
			worker.send("ping");
		} while (receiveOnly!string() == "pong");
	}

	static void work719(int) {}
	static void test719()
	{
		runWorkerTaskH(&work719, cast(ubyte)42);
	}
}
589 
/// Running a worker task using a class method and communicating with it
unittest {
	static class Test {
		void workerMethod(Task caller)
		shared {
			// answer pings until the counter has run down or something
			// other than "ping" is received
			int remaining = 10;
			while (receiveOnly!string() == "ping" && --remaining) {
				logInfo("pong");
				caller.send("pong");
			}
			caller.send("goodbye");
		}
	}

	static void test()
	{
		auto instance = new shared Test;
		Task worker = runWorkerTaskH!(Test.workerMethod)(instance, Task.getThis());
		do {
			logInfo("ping");
			worker.send("ping");
		} while (receiveOnly!string() == "pong");
	}

	static class Class719 {
		void work(int) shared {}
	}
	static void test719()
	{
		auto instance = new shared Class719;
		runWorkerTaskH!(Class719.work)(instance, cast(ubyte)42);
	}
}
621 
unittest { // run and join local task from outside of a task
	int result = 0;
	auto task = runTask({
		sleep(1.msecs);
		result = 1;
	});
	task.join();
	assert(result == 1);
}
628 
unittest { // run and join worker task from outside of a task
	__gshared int result = 0;
	auto task = runWorkerTaskH({
		sleep(5.msecs);
		result = 1;
	});
	task.join();
	assert(result == 1);
}
635 
636 
/**
	Runs a new asynchronous task in all worker threads concurrently.

	This function is mainly useful for long-living tasks that distribute their
	work across all CPU cores. Only function pointers with weakly isolated
	arguments are allowed to be able to guarantee thread-safety.

	The number of tasks started is guaranteed to be equal to
	`workerThreadCount`.

	Params:
		settings = Optional task settings that are forwarded to the task pool
		func = Function pointer to execute in the worker threads
		method = Method of `object` to execute in the worker threads
		object = The shared object on which `method` is invoked
		args = Arguments that are forwarded to the task pool
*/
void runWorkerTaskDist(FT, ARGS...)(FT func, auto ref ARGS args)
	if (is(typeof(*func) == function))
{
	setupWorkerThreads();
	st_workerPool.runTaskDist(func, args);
}
/// ditto
void runWorkerTaskDist(alias method, T, ARGS...)(shared(T) object, ARGS args)
{
	setupWorkerThreads();
	st_workerPool.runTaskDist!method(object, args);
}
/// ditto
void runWorkerTaskDist(FT, ARGS...)(TaskSettings settings, FT func, auto ref ARGS args)
	if (is(typeof(*func) == function))
{
	setupWorkerThreads();
	st_workerPool.runTaskDist(settings, func, args);
}
/// ditto
void runWorkerTaskDist(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, ARGS args)
{
	setupWorkerThreads();
	st_workerPool.runTaskDist!method(settings, object, args);
}
672 
673 
/** Runs a new asynchronous task in all worker threads and returns the handles.

	`on_handle` is a callable that takes a `Task` as its only argument and is
	called for every task instance that gets created.

	Both overloads first call `setupWorkerThreads` and then forward to the
	`runTaskDistH` method of the worker task pool.

	Params:
		settings = Optional task settings that are forwarded to the task pool
		on_handle = Callback receiving the handle of each created task
		func = Function pointer to execute in the worker threads
		args = Arguments that are forwarded to the task pool

	See_also: `runWorkerTaskDist`
*/
void runWorkerTaskDistH(HCB, FT, ARGS...)(scope HCB on_handle, FT func, auto ref ARGS args)
	if (is(typeof(*func) == function))
{
	setupWorkerThreads();
	st_workerPool.runTaskDistH(on_handle, func, args);
}
/// ditto
void runWorkerTaskDistH(HCB, FT, ARGS...)(TaskSettings settings, scope HCB on_handle, FT func, auto ref ARGS args)
	if (is(typeof(*func) == function))
{
	setupWorkerThreads();
	st_workerPool.runTaskDistH(settings, on_handle, func, args);
}
694 
695 
/**
	Sets up num worker threads.

	This function gives explicit control over the number of worker threads.
	Note, to have an effect the function must be called prior to related worker
	tasks functions which set up the default number of worker threads
	implicitly.

	Params:
		num = The number of worker threads to initialize. Defaults to
			`logicalProcessorCount`.
	See_also: `runWorkerTask`, `runWorkerTaskH`, `runWorkerTaskDist`
*/
public void setupWorkerThreads(uint num = logicalProcessorCount())
@safe {
	// remembers whether this function has already done its work
	static bool s_workerThreadsStarted = false;

	if (!s_workerThreadsStarted) {
		s_workerThreadsStarted = true;

		// create the worker pool under the lock, unless one already exists
		() @trusted {
			synchronized (st_threadsMutex) {
				if (!st_workerPool)
					st_workerPool = new shared TaskPool(num);
			}
		} ();
	}
}
722 
723 
/**
	Determines the number of logical processors in the system.

	This number includes virtual cores on hyper-threading enabled CPUs.

	Returns: The value of `std.parallelism.totalCPUs`.
*/
public @property uint logicalProcessorCount()
{
	import std.parallelism : cpu_count = totalCPUs;
	return cpu_count;
}
734 
735 
/**
	Suspends the execution of the calling task to let other tasks and events be
	handled.

	Calling this function in short intervals is recommended if long CPU
	computations are carried out by a task. It can also be used in conjunction
	with Signals to implement cross-fiber events with no polling.

	When called outside of a task, already yielded tasks get a chance to
	execute instead.

	Throws:
		May throw an `InterruptException` if `Task.interrupt()` gets called on
		the calling task.
*/
void yield()
@safe {
	auto self = Task.getThis();

	if (self == Task.init) {
		// not inside of a task: let yielded tasks execute
		assert(TaskFiber.getThis().m_yieldLockCount == 0, "May not yield within an active yieldLock()!");
		() @safe nothrow { performIdleProcessingOnce(true); } ();
		return;
	}

	// inside of a task: check for interrupts before and after yielding
	auto fiber = () @trusted { return self.taskFiber; } ();
	fiber.handleInterrupt();
	s_scheduler.yield();
	fiber.handleInterrupt();
}
762 
unittest { // yield() outside of a task interleaves with yielding tasks
	size_t iterations;
	auto task = runTask({
		for (iterations = 0; iterations < 10; iterations++)
			yield();
	});

	foreach (round; 0 .. 5)
		yield();
	assert(iterations > 0 && iterations < 10, "Task did not interleave with yield loop outside of task");

	task.join();
	assert(iterations == 10);
}
776 
777 
/**
	Suspends the execution of the calling task until `switchToTask` is called
	manually.

	This low-level scheduling function is usually only used internally. Failure
	to call `switchToTask` will result in task starvation and resource leakage.

	When called outside of a task, a single iteration of the event loop is run
	instead.

	Params:
		on_interrupt = If specified, this callback is passed to the task's
			interrupt handling (`handleInterrupt`) before and after the task
			is suspended. It is not used when called outside of a task.

	See_Also: `switchToTask`
*/
void hibernate(scope void delegate() @safe nothrow on_interrupt = null)
@safe nothrow {
	auto self = Task.getThis();

	if (self != Task.init) {
		auto fiber = () @trusted { return self.taskFiber; } ();
		fiber.handleInterrupt(on_interrupt);
		s_scheduler.hibernate();
		fiber.handleInterrupt(on_interrupt);
		return;
	}

	// not inside of a task: process events once instead
	assert(TaskFiber.getThis().m_yieldLockCount == 0, "May not yield within an active yieldLock!");
	runEventLoopOnce();
}
803 
804 
/**
	Switches execution to the given task.

	This function can be used in conjunction with `hibernate` to wake up a
	task. The task must live in the same thread as the caller.

	If no priority is specified, `TaskSwitchPriority.prioritized` is used
	while a yield lock is active and `TaskSwitchPriority.immediate` otherwise.

	Note that it is illegal to use `TaskSwitchPriority.immediate` if a yield
	lock is active.

	This function must only be called on tasks that belong to the calling
	thread and have previously been hibernated!

	See_Also: `hibernate`, `yieldLock`
*/
void switchToTask(Task t)
@safe nothrow {
	TaskSwitchPriority priority;
	if (TaskFiber.getThis().m_yieldLockCount > 0)
		priority = TaskSwitchPriority.prioritized; // must not switch immediately
	else
		priority = TaskSwitchPriority.immediate;
	s_scheduler.switchTo(t, priority);
}
/// ditto
void switchToTask(Task t, TaskSwitchPriority priority)
@safe nothrow {
	s_scheduler.switchTo(t, priority);
}
833 
834 
/**
	Suspends the execution of the calling task for the specified amount of time.

	Note that other tasks of the same thread will continue to run during the
	wait time, in contrast to $(D core.thread.Thread.sleep), which shouldn't be
	used in vibe.d applications.

	A zero `timeout` returns immediately; negative values are rejected by an
	assertion.

	Throws: May throw an `InterruptException` if the task gets interrupted using
		`Task.interrupt()`.
*/
void sleep(Duration timeout)
@safe {
	assert(timeout >= 0.seconds, "Argument to sleep must not be negative.");

	if (timeout > 0.seconds) {
		// a timer without callback is sufficient to wait on
		auto timer = setTimer(timeout, null);
		timer.wait();
	}
}
///
unittest {
	import vibe.core.core : sleep;
	import vibe.core.log : logInfo;
	import core.time : msecs;

	void test()
	{
		logInfo("Sleeping for half a second...");
		const half_second = 500.msecs;
		sleep(half_second);
		logInfo("Done sleeping.");
	}
}
865 
866 
/**
	Returns a new armed timer.

	Note that timers can only work if an event loop is running, explicitly or
	implicitly by running a blocking operation, such as `sleep` or `File.read`.

	Params:
		timeout = Determines the minimum amount of time that elapses before the timer fires.
		callback = If non-`null`, this delegate will be called when the timer fires
		periodic = Specifies if the timer fires repeatedly or only once

	Returns:
		Returns a Timer object that can be used to identify and modify the timer.

	See_also: `createTimer`
*/
Timer setTimer(Duration timeout, Timer.Callback callback, bool periodic = false)
@safe nothrow {
	// create the timer first, then arm it with the requested timeout
	auto timer = createTimer(callback);
	timer.rearm(timeout, periodic);
	return timer;
}
///
unittest {
	void printTime()
	@safe nothrow {
		import std.datetime;
		logInfo("The time is: %s", Clock.currTime());
	}

	void test()
	{
		import vibe.core.core;
		// a periodic timer that prints the time once per second
		auto on_tick = toDelegate(&printTime);
		setTimer(1.seconds, on_tick, true);
	}
}
904 
/** Compatibility overload - use a `@safe nothrow` callback instead.

	Wraps `callback` in a `nothrow` delegate that logs any `Exception` thrown
	by it and forwards to the primary `setTimer` overload.

	Params:
		timeout = Determines the minimum amount of time that elapses before the timer fires.
		callback = If non-`null`, this delegate will be called when the timer fires
		periodic = Specifies if the timer fires repeatedly or only once

	Returns:
		The timer returned by the primary `setTimer` overload.
*/
Timer setTimer(Duration timeout, void delegate() callback, bool periodic = false)
@system nothrow {
	// A null callback means "no callback". Wrapping it would produce a
	// non-null delegate that calls through a null delegate once the timer
	// fires, so forward it unchanged instead.
	if (callback is null)
		return setTimer(timeout, Timer.Callback.init, periodic);

	return setTimer(timeout, () @trusted nothrow {
		try callback();
		catch (Exception e) {
			logWarn("Timer callback failed: %s", e.msg);
			scope (failure) assert(false); // toString/sanitize are not nothrow
			logDebug("Full error: %s", e.toString().sanitize);
		}
	}, periodic);
}
917 
918 
/** Creates a new timer without arming it.

	Each time `callback` gets invoked, it will be run inside of a newly started
	task.

	Params:
		callback = If non-`null`, this delegate will be called when the timer
			fires

	Returns:
		The timer created by `createLeanTimer`.

	See_also: `createLeanTimer`, `setTimer`
*/
Timer createTimer(void delegate() nothrow @safe callback = null)
@safe nothrow {
	// lean callback that starts a new task for every invocation
	static struct TaskStarter {
		void delegate() nothrow @safe callback;
		void opCall() nothrow @safe { runTask(callback); }
	}

	// without a callback, a plain timer that can only be waited on is created
	if (!callback)
		return createLeanTimer!(Timer.Callback)(null);

	return createLeanTimer(TaskStarter(callback));
}
944 
945 
/** Creates a new timer with a lean callback mechanism.

	In contrast to the standard `createTimer`, `callback` will not be called
	in a new task, but is instead called directly in the context of the event
	loop.

	For this reason, the supplied callback is not allowed to perform any
	operation that needs to block/yield execution. In this case, `runTask`
	needs to be used explicitly to perform the operation asynchronously.

	Additionally, `callback` can carry arbitrary state without requiring a heap
	allocation.

	Params:
		callback = A callable that can be invoked without arguments from
			`@safe nothrow` code

	See_also: `createTimer`
*/
Timer createLeanTimer(CALLABLE)(CALLABLE callback)
	if (is(typeof(() @safe nothrow { callback(); } ())))
{
	// allocate the driver level timer and bind the callback to it
	auto timer_id = eventDriver.timers.create();
	return Timer.create(timer_id, callback);
}
966 
967 
/**
	Creates an event to wait on an existing file descriptor.

	The file descriptor usually needs to be a non-blocking socket for this to
	work.

	Params:
		file_descriptor = The Posix file descriptor to watch
		event_mask = Specifies which events will be listened for

	Returns:
		Returns a newly created FileDescriptorEvent associated with the given
		file descriptor.
*/
FileDescriptorEvent createFileDescriptorEvent(int file_descriptor, FileDescriptorEvent.Trigger event_mask)
@safe nothrow {
	return typeof(return)(file_descriptor, event_mask);
}
986 
987 
/**
	Sets the stack size to use for tasks.

	The default stack size is set to 512 KiB on 32-bit systems and to 16 MiB
	on 64-bit systems, which is sufficient for most tasks. Tuning this value
	can be used to reduce memory usage for large numbers of concurrent tasks
	or to avoid stack overflows for applications with heavy stack use.

	Note that this function must be called at initialization time, before any
	task is started to have an effect.

	Also note that the stack will initially not consume actual physical memory -
	it just reserves virtual address space. Only once the stack gets actually
	filled up with data will physical memory then be reserved page by page. This
	means that the stack can safely be set to large sizes on 64-bit systems
	without having to worry about memory usage.

	Params:
		sz = The new stack size; stored in `TaskFiber.ms_taskStackSize`
			(presumably in bytes - confirm against `TaskFiber`)
*/
void setTaskStackSize(size_t sz)
nothrow {
	TaskFiber.ms_taskStackSize = sz;
}
1009 
1010 
/**
	The number of worker threads used for processing worker tasks.

	Note that this function will cause the worker threads to be started,
	if they haven't already.

	Returns: The thread count reported by the worker task pool.

	See_also: `runWorkerTask`, `runWorkerTaskH`, `runWorkerTaskDist`,
	`setupWorkerThreads`
*/
@property size_t workerThreadCount()
	out (thread_count) {
		assert(thread_count > 0, "No worker threads started after setupWorkerThreads!?");
	}
do {
	// make sure that the worker pool exists before querying it
	setupWorkerThreads();

	synchronized (st_threadsMutex) {
		return st_workerPool.threadCount;
	}
}
1027 
1028 
/**
	Prevents vibe.d from installing its default signal handlers.

	The first call to `runEventLoop` usually installs signal handlers for
	SIGINT, SIGTERM and SIGPIPE. Where this is undesirable, call this
	function before the event loop is run for the first time.

	Calling this function after `runEventLoop` has no effect.
*/
void disableDefaultSignalHandlers()
{
	// The flag is read under the same mutex when the handlers are set up.
	synchronized (st_threadsMutex) {
		s_disableSignalHandlers = true;
	}
}
1044 
/**
	Sets the effective user and group ID to the ones configured for privilege lowering.

	This function is useful for services run as root to give up on the privileges that
	they only need for initialization (such as listening on ports <= 1024 or opening
	system log files).

	Nothing happens if the process is not running as root. If both arguments
	are empty, a warning is logged and the process keeps its full permissions.

	Params:
		uname = User name or numeric user ID to switch to; an empty string
			leaves the user unchanged
		gname = Group name or numeric group ID to switch to; an empty string
			leaves the group unchanged

	Throws:
		An `Exception` if a name cannot be resolved or the IDs cannot be set.
*/
void lowerPrivileges(string uname, string gname)
@safe {
	if (!isRoot()) return;

	if (uname == "" && gname == "") {
		logWarn("Vibe was run as root, and no user/group has been specified for privilege lowering. Running with full permissions.");
		return;
	}

	// Tries to interpret `s` as a purely numeric ID. Returns `false` for
	// anything that is not a number that fits into `T`, so that the caller
	// falls back to a name lookup.
	static bool tryParse(T)(string s, out T n)
	{
		import std.ascii : isDigit;
		import std.conv : ConvException, parse;

		if (s.length == 0 || !isDigit(s[0])) return false;

		// parse() throws (e.g. on overflow) for digit strings that do not
		// fit into T - treat those as "not a numeric ID" instead of letting
		// a conversion error escape.
		try n = parse!T(s);
		catch (ConvException) return false;

		// only accept the value if the whole string was consumed
		return s.length == 0;
	}

	int uid = -1, gid = -1;
	if (uname != "" && !tryParse(uname, uid)) uid = getUID(uname);
	if (gname != "" && !tryParse(gname, gid)) gid = getGID(gname);
	setUID(uid, gid);
}
1069 
/**
	Sets the effective user and group ID to the ones stored in the module's
	privilege lowering settings.

	Unless the `VibeNoDefaultArgs` version is set, these settings are read
	from the "uid|user" and "gid|group" options during module
	initialization.
*/
void lowerPrivileges()
@safe {
	string user = s_privilegeLoweringUserName;
	string group = s_privilegeLoweringGroupName;
	lowerPrivileges(user, group);
}
1075 
1076 
/**
	Installs a callback that is invoked whenever a task changes its status.

	This is mostly useful for implementing debuggers that analyze the
	life time of tasks, including task switches.

	The callback is only stored - and thus only called - in debug builds.

	Params:
		func = The callback to install
*/
void setTaskEventCallback(TaskEventCallback func)
{
	debug {
		TaskFiber.ms_taskEventCallback = func;
	}
}
1088 
/**
	Installs a callback that is invoked whenever a new task is created.

	For the same task handle, this callback is guaranteed to be invoked
	before the one set by `setTaskEventCallback`.

	This is mostly useful for implementing debuggers that analyze the
	life time of tasks, including task switches.

	The callback is only stored - and thus only called - in debug builds.

	Params:
		func = The callback to install
*/
void setTaskCreationCallback(TaskCreationCallback func)
{
	debug {
		TaskFiber.ms_taskCreationCallback = func;
	}
}
1103 
1104 
/**
	A version string representing the current vibe.d core version.

	This is a compile-time constant (manifest constant).
*/
enum vibeVersionString = "1.9.3";
1109 
1110 
/**
	Generic file descriptor event.

	Allows a task to wait for events on a non-blocking file descriptor.
	Note that this can usually only be used on socket based file
	descriptors.

	Instances are reference counted handles: copying adds a reference to
	the underlying socket handle and destruction releases it again.
*/
struct FileDescriptorEvent {
	/** Bit mask selecting the kind of events to listen for.
	*/
	enum Trigger {
		none = 0,         /// Match no event (invalid value)
		read = 1<<0,      /// React on read-ready events
		write = 1<<1,     /// React on write-ready events
		any = read|write  /// Match any kind of event
	}

	private {
		// Per-socket state that is stored in the driver's user data area.
		static struct Context {
			Trigger trigger;                  // event mask given at creation
			shared(NativeEventDriver) driver; // driver that owns the socket
		}

		StreamSocketFD m_socket;
		Context* m_context;
	}

	@safe:

	// Adopts `fd` as a stream socket and stores the mask and the owning
	// driver in the socket's user data.
	private this(int fd, Trigger event_mask)
	nothrow {
		m_socket = eventDriver.sockets.adoptStream(fd);
		m_context = () @trusted { return &eventDriver.sockets.userData!Context(m_socket); } ();
		m_context.trigger = event_mask;
		m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
	}

	// Copying adds a reference to the socket handle.
	this(this)
	nothrow {
		if (m_socket == StreamSocketFD.invalid) return;
		eventDriver.sockets.addRef(m_socket);
	}

	// Destruction releases this instance's reference to the socket handle.
	~this()
	nothrow {
		if (m_socket == StreamSocketFD.invalid) return;
		releaseHandle!"sockets"(m_socket, m_context.driver);
	}


	/** Waits for the selected event to occur.

		Only waiting for read events is currently supported.

		Params:
			which = Optional event mask to react only on certain events
			timeout = Maximum time to wait for an event

		Returns:
			The overload taking the timeout parameter returns `true` if an
			event was received on time and `false` otherwise. It also
			returns `true` immediately if `which` has no bits in common with
			the event mask given at creation.
	*/
	void wait(Trigger which = Trigger.any)
	{
		wait(Duration.max, which);
	}
	/// ditto
	bool wait(Duration timeout, Trigger which = Trigger.any)
	{
		const selected = which & m_context.trigger;

		// nothing to wait for
		if (selected == Trigger.none) return true;

		assert(selected == Trigger.read, "Waiting for write event not yet supported.");

		bool received;

		alias waiter = Waitable!(IOCallback,
			on_done => eventDriver.sockets.waitForData(m_socket, on_done),
			on_done => eventDriver.sockets.cancelRead(m_socket),
			(socket, status, bytes) { received = status == IOStatus.ok; }
		);

		asyncAwaitAny!(true, waiter)(timeout);

		return received;
	}
}
1195 
1196 
/**
	Represents a timer.

	A `Timer` is a reference counted handle to a timer of the event driver:
	copying adds a reference and destruction releases it again. A default
	initialized `Timer` refers to no timer at all and converts to `false`.
*/
struct Timer {
	private {
		NativeEventDriver m_driver;
		TimerID m_id;
		// sanity marker, checked on copy/destruction in debug builds
		debug uint m_magicNumber = 0x4d34f916;
	}

	alias Callback = void delegate() @safe nothrow;

	@safe:

	// Wraps the driver timer `id`. If `callback` is set, it is stored in the
	// timer's user data and a wait is registered that invokes it.
	private static Timer create(CALLABLE)(TimerID id, CALLABLE callback)
	nothrow {
		assert(id != TimerID.init, "Invalid timer ID.");

		Timer ret;
		ret.m_driver = eventDriver;
		ret.m_id = id;

		// callables that can be tested for "null" (e.g. delegates) are
		// optional - without one the timer can be waited on manually
		static if (is(typeof(!callback)))
			if (!callback)
				return ret;

		ret.m_driver.timers.userData!CALLABLE(id) = callback;
		ret.m_driver.timers.wait(id, &TimerCallbackHandler!CALLABLE.instance.handle);

		return ret;
	}

	this(this)
	nothrow {
		debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted.");
		if (m_driver) m_driver.timers.addRef(m_id);
	}

	~this()
	nothrow {
		debug assert(m_magicNumber == 0x4d34f916, "Timer corrupted.");
		if (m_driver) releaseHandle!"timers"(m_id, () @trusted { return cast(shared)m_driver; } ());
	}

	/// True if the timer is yet to fire. Always `false` for an uninitialized timer.
	@property bool pending() nothrow { return m_driver ? m_driver.timers.isPending(m_id) : false; }

	/// The internal ID of the timer.
	@property size_t id() const nothrow { return m_id; }

	/// True if this handle refers to an actual timer.
	bool opCast() const nothrow { return m_driver !is null; }

	/// Determines if this reference is the only one
	@property bool unique() const nothrow { return m_driver ? m_driver.timers.isUnique(m_id) : false; }

	/** Resets the timer to the specified timeout

		Params:
			dur = Time until the timer fires; must not be negative
			periodic = If set, the timer keeps firing with `dur` as its interval
	*/
	void rearm(Duration dur, bool periodic = false) nothrow
		in { assert(dur >= 0.seconds, "Negative timer duration specified."); }
	    do { m_driver.timers.set(m_id, dur, periodic ? dur : 0.seconds); }

	/** Resets the timer and avoids any firing.
	*/
	void stop() nothrow { if (m_driver) m_driver.timers.stop(m_id); }

	/** Waits until the timer fires.

		This method may only be used if no timer callback has been specified.

		Returns:
			`true` is returned $(I iff) the timer was fired.
	*/
	bool wait()
	{
		auto cb = m_driver.timers.userData!Callback(m_id);
		assert(cb is null, "Cannot wait on a timer that was created with a callback.");

		auto res = asyncAwait!(TimerCallback2,
			cb => m_driver.timers.wait(m_id, cb),
			cb => m_driver.timers.cancelWait(m_id)
		);
		return res[1];
	}
}
1281 
// Dispatches timer events to the callback of type `CALLABLE` that is stored
// in the timer's user data. One global instance exists per `CALLABLE` type.
private struct TimerCallbackHandler(CALLABLE) {
	static __gshared TimerCallbackHandler ms_instance;
	static @property ref TimerCallbackHandler instance() @trusted nothrow { return ms_instance; }

	// Invoked by the event driver for a timer wait. `fired` tells whether the
	// timer actually fired.
	void handle(TimerID timer, bool fired)
	@safe nothrow {
		if (fired) {
			auto callback = eventDriver.timers.userData!CALLABLE(timer);
			// no task switches are allowed while the callback runs
			auto lock = yieldLock();
			callback();
		}

		// Keep listening as long as other references to the timer exist or
		// the timer is still pending.
		bool keep_waiting = !eventDriver.timers.isUnique(timer);
		if (!keep_waiting) keep_waiting = eventDriver.timers.isPending(timer);

		if (keep_waiting)
			eventDriver.timers.wait(timer, &handle);
	}
}
1298 
1299 
/** Creates a guard object that forbids task switches while it is alive.

	Any attempt to run the event loop or to switch to another task within
	the scope that defines the lifetime of the returned object will cause
	an assertion to be thrown.

	Yield locks may be nested in inner scopes.
*/
auto yieldLock()
@safe nothrow {
	static struct YieldLock {
	@safe nothrow:
		// false for default-initialized instances, which hold no lock
		private bool m_initialized;

		// Acquires the lock by incrementing the current fiber's lock count.
		private this(bool)
		{
			m_initialized = true;
			TaskFiber.getThis().m_yieldLockCount++;
		}

		@disable this(this);

		// Releases the lock again, if one was acquired.
		~this()
		{
			if (!m_initialized) return;

			auto fiber = TaskFiber.getThis();
			assert(fiber.m_yieldLockCount > 0);
			fiber.m_yieldLockCount--;
		}
	}

	return YieldLock(true);
}
1332 
unittest {
	auto fiber = TaskFiber.getThis();
	assert(fiber.m_yieldLockCount == 0);

	// nested locks increment and decrement the counter symmetrically
	{
		auto outer = yieldLock();
		assert(fiber.m_yieldLockCount == 1);
		{
			auto inner = yieldLock();
			assert(fiber.m_yieldLockCount == 2);
		}
		assert(fiber.m_yieldLockCount == 1);
	}
	assert(fiber.m_yieldLockCount == 0);

	// a default-initialized lock must not touch the counter at all
	{
		typeof(yieldLock()) unarmed;
		assert(fiber.m_yieldLockCount == 0);
	}
	assert(fiber.m_yieldLockCount == 0);
}
1353 
1354 
1355 /**************************************************************************************************/
1356 /* private types                                                                                  */
1357 /**************************************************************************************************/
1358 
1359 
// Creates the timer that performs a garbage collection run and sets the
// timeout it is re-armed with during idle processing.
private void setupGcTimer()
{
	s_gcTimer = createTimer(() @trusted {
		import core.memory;
		logTrace("gc idle collect");
		GC.collect();
		GC.minimize();
		// the next idle phase must not re-arm the timer again
		s_ignoreIdleForGC = true;
	});

	s_gcCollectTimeout = 2.seconds;
}
1371 
// Repeats single idle processing steps until there is nothing left to do or
// an exit was requested, then re-arms the GC timer if one is set up.
//
// `force_process_events` only affects the first step - all following steps
// always process events.
package(vibe) void performIdleProcessing(bool force_process_events = false)
@safe nothrow {
	for (bool again = !getExitFlag(); again; force_process_events = true)
		again = performIdleProcessingOnce(force_process_events);

	if (s_scheduler.scheduledTaskCount)
		logDebug("Exiting from idle processing although there are still yielded tasks");

	if (s_exitEventLoop) return;

	if (!s_ignoreIdleForGC && s_gcTimer) {
		s_gcTimer.rearm(s_gcCollectTimeout);
	} else {
		// this idle phase was skipped for GC purposes - consider the next one again
		s_ignoreIdleForGC = false;
	}
}
1388 
// Performs a single idle processing step: optionally polls the event driver
// without blocking, calls the idle handler (if any) and runs the scheduler.
//
// Returns `true` if another step should follow.
private bool performIdleProcessingOnce(bool process_events)
@safe nothrow {
	if (process_events) {
		auto reason = eventDriver.core.processEvents(0.seconds);
		const driver_done = reason == ExitReason.exited || reason == ExitReason.outOfWaiters;

		if (driver_done && s_scheduler.scheduledTaskCount == 0) {
			if (s_eventLoopRunning) {
				logDebug("Setting exit flag due to driver signalling exit: %s", reason);
				s_exitEventLoop = true;
			}
			return false;
		}
	}

	// the idle handler runs before the scheduler
	const handler_wants_more = s_idleHandler ? s_idleHandler() : false;
	const scheduler_busy = s_scheduler.schedule() == ScheduleStatus.busy;

	if (!scheduler_busy && !handler_wants_more) return false;
	return !getExitFlag();
}
1408 
1409 
// Entry of the global thread registry (`st_threads`).
private struct ThreadContext {
	Thread thread; // the registered thread
}
1413 
1414 /**************************************************************************************************/
1415 /* private functions                                                                              */
1416 /**************************************************************************************************/
1417 
// Module state. Variables declared `shared` or `__gshared` are global to the
// process, all others are thread-local.
private {
	// idle GC collection (set up by setupGcTimer)
	Duration s_gcCollectTimeout; // timeout used when re-arming s_gcTimer
	Timer s_gcTimer;
	bool s_ignoreIdleForGC = false; // set by the GC timer callback; skips re-arming once

	// Mutex used for all `synchronized` access to the thread registry, the
	// worker pool and the signal handler setup flags.
	__gshared core.sync.mutex.Mutex st_threadsMutex;
	shared TaskPool st_workerPool;
	shared ManualEvent st_threadsSignal; // emitted by onSignal, awaited by watchExitFlag
	__gshared ThreadContext[] st_threads; // registry of known threads
	__gshared Condition st_threadShutdownCondition; // notified at the end of each thread's destructor
	shared bool st_term = false; // set by onSignal to request termination

	bool s_isMainThread = false; // set in shared static this
	bool s_exitEventLoop = false;
	package bool s_eventLoopRunning = false;
	bool delegate() @safe nothrow s_idleHandler; // called during idle processing, if set

	TaskScheduler s_scheduler;
	FixedRingBuffer!TaskFiber s_availableFibers; // fibers handed back through recycleFiber
	size_t s_maxRecycledFibers = 100; // limit above which recycleFiber destroys the oldest fiber

	// filled from the "uid|user" and "gid|group" options
	string s_privilegeLoweringUserName;
	string s_privilegeLoweringGroupName;
	__gshared bool s_disableSignalHandlers = false; // set by disableDefaultSignalHandlers
}
1443 
// Returns true if either this thread's event loop was asked to exit or
// process termination was requested (see onSignal).
private bool getExitFlag()
@trusted nothrow {
	if (s_exitEventLoop) return true;
	return atomicLoad(st_term);
}
1448 
// Returns the current value of the `s_eventLoopRunning` flag of this thread.
package @property bool isEventLoopRunning()
@safe nothrow @nogc {
	return s_eventLoopRunning;
}
// Gives access (by reference) to the task scheduler of the current thread.
package @property ref TaskScheduler taskScheduler()
@safe nothrow @nogc {
	return s_scheduler;
}
1451 
// Stores `fiber` in the list of available fibers. Once the list has reached
// `s_maxRecycledFibers` entries, the oldest entry is shut down and destroyed
// first.
package void recycleFiber(TaskFiber fiber)
@safe nothrow {
	if (s_availableFibers.length >= s_maxRecycledFibers) {
		auto oldest = s_availableFibers.front;
		s_availableFibers.popFront();
		oldest.shutdown();
		() @trusted {
			try destroy(oldest);
			catch (Exception e) logWarn("Failed to destroy fiber: %s", e.msg);
		} ();
	}

	// double the ring buffer when there is no room left
	if (s_availableFibers.full) {
		const grown = 2 * s_availableFibers.capacity;
		s_availableFibers.capacity = grown;
	}

	s_availableFibers.put(fiber);
}
1469 
// Installs the default signal handlers. This is done at most once per
// process and is skipped entirely if `disableDefaultSignalHandlers` was
// called before.
private void setupSignalHandlers()
@trusted nothrow {
	scope (failure) assert(false); // _d_monitorexit is not nothrow
	__gshared bool s_setup = false;

	synchronized (st_threadsMutex) {
		// only the first call does any work
		if (s_setup) return;
		s_setup = true;

		if (s_disableSignalHandlers) return;

		logTrace("setup signal handler");

		version (Posix) {
			sigaction_t action;
			sigemptyset(&action.sa_mask);
			action.sa_flags = SA_RESTART;

			// support proper shutdown using signals
			action.sa_handler = &onSignal;
			sigaction(SIGINT, &action, null);
			sigaction(SIGTERM, &action, null);

			// broken pipes are only logged
			action.sa_handler = &onBrokenPipe;
			sigaction(SIGPIPE, &action, null);
		}

		version (Windows) {
			// WORKAROUND: we don't care about viral @nogc attribute here!
			import std.traits;
			alias SignalHandler = ParameterTypeTuple!signal[1];
			signal(SIGTERM, cast(SignalHandler)&onSignal);
			signal(SIGINT, cast(SignalHandler)&onSignal);
		}
	}
}
1506 
// per process setup
//
// Initializes logging, the global synchronization primitives and the thread
// registry, reads the privilege lowering options and installs the vibe.d
// scheduler for std.concurrency.
shared static this()
{
	s_isMainThread = true;

	// COMPILER BUG: Must be some kind of module constructor order issue:
	//    without this, the stdout/stderr handles are not initialized before
	//    the log module is set up.
	import std.stdio; File f; f.close();

	initializeLogModule();

	logTrace("create driver core");

	// the condition shares the mutex that guards the thread registry
	st_threadsMutex = new Mutex;
	st_threadShutdownCondition = new Condition(st_threadsMutex);

	// register the current thread as the first entry, named "main"
	auto thisthr = Thread.getThis();
	thisthr.name = "main";
	assert(st_threads.length == 0, "Main thread not the first thread!?");
	st_threads ~= ThreadContext(thisthr);

	st_threadsSignal = createSharedManualEvent();

	// idle GC collection is opt-in
	version(VibeIdleCollect) {
		logTrace("setup gc");
		setupGcTimer();
	}

	// the privilege lowering options are only read if default argument
	// handling has not been disabled
	version (VibeNoDefaultArgs) {}
	else {
		readOption("uid|user", &s_privilegeLoweringUserName, "Sets the user name or id used for privilege lowering.");
		readOption("gid|group", &s_privilegeLoweringGroupName, "Sets the group name or id used for privilege lowering.");
	}

	import std.concurrency;
	scheduler = new VibedScheduler;
}
1545 
// per process teardown: shuts down the event driver and reports tasks that
// are still scheduled
shared static ~this()
{
	shutdownDriver();

	const size_t remaining = s_scheduler.scheduledTaskCount;
	if (remaining > 0) {
		logWarn("There were still %d tasks running at exit.", remaining);
	}
}
1555 
// per thread setup: adds the current thread to the thread registry
static this()
{
	auto self = Thread.getThis();

	/// workaround for:
	// object.Exception@src/rt/minfo.d(162): Aborting: Cycle detected between modules with ctors/dtors:
	// vibe.core.core -> vibe.core.drivers.native -> vibe.core.drivers.libasync -> vibe.core.core
	if (self.isDaemon && self.name == "CmdProcessor") return;

	synchronized (st_threadsMutex) {
		// the main thread has already been registered by the shared constructor
		const already_registered = st_threads.any!(c => c.thread is self);
		if (!already_registered)
			st_threads ~= ThreadContext(self);
	}
}
1569 
// per thread teardown: removes the current thread from the thread registry
// and shuts down its event driver. The main thread additionally terminates
// the worker pool first.
static ~this()
{
	auto thisthr = Thread.getThis();

	bool is_main_thread = s_isMainThread;

	// only used for the log message - the index is looked up again below
	synchronized (st_threadsMutex) {
		auto idx = st_threads.countUntil!(c => c.thread is thisthr);
		logDebug("Thread exit %s (index %s) (main=%s)", thisthr.name, idx, is_main_thread);
	}

	// performed outside of the mutex
	if (is_main_thread) {
		logDiagnostic("Main thread exiting");
		shutdownWorkerPool();
	}

	synchronized (st_threadsMutex) {
		auto idx = st_threads.countUntil!(c => c.thread is thisthr);
		assert(idx >= 0, "No more threads registered");
		if (idx >= 0) {
			// unordered removal: overwrite the entry with the last one and shrink
			st_threads[idx] = st_threads[$-1];
			st_threads.length--;
		}
	}

	// delay deletion of the main event driver to "~shared static this()"
	if (!is_main_thread) shutdownDriver();

	st_threadShutdownCondition.notifyAll();
}
1600 
// Takes the worker pool out of the global variable (under the threads mutex)
// and terminates it, if one exists.
private void shutdownWorkerPool()
nothrow {
	shared(TaskPool) pool;

	try {
		synchronized (st_threadsMutex) swap(pool, st_workerPool);
	} catch (Exception e) assert(false, e.msg);

	if (!pool) return;

	logDiagnostic("Still waiting for worker threads to exit.");
	pool.terminate();
}
1613 
// Releases the thread event of ManualEvent (if one was created) and
// disposes the event driver of the current thread.
private void shutdownDriver()
{
	auto thread_event = ManualEvent.ms_threadEvent;

	if (thread_event != EventID.init) {
		eventDriver.events.releaseRef(thread_event);
		ManualEvent.ms_threadEvent = EventID.init;
	}

	eventDriver.dispose();
}
1623 
1624 
// Blocks until the exit flag is set and then tells the event driver to
// exit. Returns without doing so if the wait gets interrupted.
private void watchExitFlag()
{
	auto emit_count = st_threadsSignal.emitCount;

	for (;;) {
		bool exit_requested;
		synchronized (st_threadsMutex) exit_requested = getExitFlag();
		if (exit_requested) break;

		// sleep until the signal is emitted the next time
		try emit_count = st_threadsSignal.wait(emit_count);
		catch (InterruptException e) return;
	}

	logDebug("main thread exit");
	eventDriver.core.exit();
}
1640 
// Only emits a trace log message.
// NOTE(review): presumably meant as a convenient breakpoint target for
// exceptions - confirm against the call sites.
private extern(C) void extrap() @safe nothrow
{
	logTrace("exception trap");
}
1645 
// Handler for the termination signals: sets the termination flag and emits
// the threads signal, which watchExitFlag waits on.
private extern(C) void onSignal(int signal)
nothrow {
	logInfo("Received signal %d. Shutting down.", signal);
	atomicStore(st_term, true);

	try {
		st_threadsSignal.emit();
	} catch (Throwable failure) {
		// nothing more can be done here than reporting the problem
		logDebug("Failed to notify for event loop exit: %s", failure.msg);
	}
}
1655 
// Handler for SIGPIPE: the signal is only logged.
private extern(C) void onBrokenPipe(int signal) nothrow
{
	logTrace("Broken pipe.");
}
1660 
1661 version(Posix)
1662 {
1663 	private bool isRoot() @trusted { return geteuid() == 0; }
1664 
	// Sets the effective group and user ID of the process. Negative values
	// leave the respective ID unchanged.
	//
	// Throws an Exception if an ID is unknown to the system or cannot be set.
	private void setUID(int uid, int gid)
	@trusted {
		logInfo("Lowering privileges to uid=%d, gid=%d...", uid, gid);
		// NOTE(review): the group is presumably changed first because this may
		// no longer be permitted once the user ID was lowered - confirm.
		if (gid >= 0) {
			enforce(getgrgid(gid) !is null, "Invalid group id!");
			enforce(setegid(gid) == 0, "Error setting group id!");
		}
		// NOTE(review): supplementary groups are left untouched (initgroups is
		// commented out below) - confirm that this is intended.
		//if( initgroups(const char *user, gid_t group);
		if (uid >= 0) {
			enforce(getpwuid(uid) !is null, "Invalid user id!");
			enforce(seteuid(uid) == 0, "Error setting user id!");
		}
	}
1678 
1679 	private int getUID(string name)
1680 	@trusted {
1681 		auto pw = getpwnam(name.toStringz());
1682 		enforce(pw !is null, "Unknown user name: "~name);
1683 		return pw.pw_uid;
1684 	}
1685 
1686 	private int getGID(string name)
1687 	@trusted {
1688 		auto gr = getgrnam(name.toStringz());
1689 		enforce(gr !is null, "Unknown group name: "~name);
1690 		return gr.gr_gid;
1691 	}
1692 } else version(Windows){
1693 	private bool isRoot() @safe { return false; }
1694 
1695 	private void setUID(int uid, int gid)
1696 	@safe {
1697 		enforce(false, "UID/GID not supported on Windows.");
1698 	}
1699 
1700 	private int getUID(string name)
1701 	@safe {
1702 		enforce(false, "Privilege lowering not supported on Windows.");
1703 		assert(false);
1704 	}
1705 
1706 	private int getGID(string name)
1707 	@safe {
1708 		enforce(false, "Privilege lowering not supported on Windows.");
1709 		assert(false);
1710 	}
1711 }