1 module vibe.internal.async;
2 
3 import std.traits : ParameterTypeTuple, ReturnType;
4 import std.typecons : tuple;
5 import vibe.core.core : hibernate, switchToTask;
6 import vibe.core.task : InterruptException, Task, TaskSwitchPriority;
7 import vibe.core.log;
8 import core.time : Duration, seconds;
9 
10 
/** Starts a single asynchronous operation and suspends the calling task until
	its callback has been invoked.

	The wait is performed through `asyncAwaitAny!(true, ...)`, i.e. in
	interruptible mode.

	Params:
		Callback = Type of the completion callback accepted by `action`
		action = Callable that starts the operation, receiving the callback
		cancel = Callable used to cancel the operation if it has not fired
		func = Name of the calling function, forwarded for diagnostic logging

	Returns:
		A tuple holding the arguments that were passed to the callback.
*/
auto asyncAwait(Callback, alias action, alias cancel)(string func = __FUNCTION__)
if (!is(Object == Duration)) {
	alias CallbackArgs = ParameterTypeTuple!Callback;

	// storage for the values delivered to the completion callback
	CallbackArgs received;

	alias operation = Waitable!(
		Callback,
		action,
		cancel,
		(CallbackArgs values) { received = values; }
	);

	asyncAwaitAny!(true, operation)(func);

	return tuple(received);
}
18 
/** Starts a single asynchronous operation and suspends the calling task until
	its callback has been invoked or the given timeout has elapsed.

	The wait is performed through `asyncAwaitAny!(true, ...)`, i.e. in
	interruptible mode.

	Params:
		Callback = Type of the completion callback accepted by `action`
		action = Callable that starts the operation, receiving the callback
		cancel = Callable used to cancel the operation; takes the callback and,
			if `action` returns a non-`void` value, that value as a second
			argument
		timeout = Maximum time to wait (forwarded to `asyncAwaitAny`)
		func = Name of the calling function, forwarded for diagnostic logging

	Returns:
		A struct with a `completed` flag and a `results` field. `completed`
		starts out as `true` and is set to `false` when `cancel` gets invoked;
		`results` holds the arguments passed to the callback, if it fired.
*/
auto asyncAwait(Callback, alias action, alias cancel)(Duration timeout, string func = __FUNCTION__)
{
	static struct R {
		bool completed = true;
		ParameterTypeTuple!Callback results;
	}
	R ret;

	// Probe the return type by call expression instead of `ReturnType!action`:
	// the latter cannot be instantiated for untyped lambdas (`cb => ...`), which
	// made `void` actions of that form fall into the two-argument cancel branch.
	// This is the same test that `Waitable` uses to validate `CANCEL`.
	static if (is(typeof(action(Callback.init)) == void)) {
		alias waitable = Waitable!(Callback,
			action,
			// a cancellation means that the operation did not complete
			(cb) { ret.completed = false; cancel(cb); },
			(ParameterTypeTuple!Callback r) { ret.results = r; }
		);
	}
	else {
		alias waitable = Waitable!(Callback,
			action,
			// forward the value returned by `action` to the user's cancel routine
			(cb, waitres) { ret.completed = false; cancel(cb, waitres); },
			(ParameterTypeTuple!Callback r) { ret.results = r; }
		);
	}
	asyncAwaitAny!(true, waitable)(timeout, func);
	return ret;
}
43 
/** Starts a single asynchronous operation and suspends the calling task until
	its callback has been invoked, without support for interruption.

	No cancellation routine is accepted; the internally supplied one fails an
	assertion if it is ever called.

	Params:
		Callback = Type of the completion callback accepted by `action`
		action = Callable that starts the operation, receiving the callback
		func = Name of the calling function, forwarded for diagnostic logging

	Returns:
		A tuple holding the arguments that were passed to the callback.
*/
auto asyncAwaitUninterruptible(Callback, alias action)(string func = __FUNCTION__)
nothrow {
	alias WaitResult = typeof(action(Callback.init));
	alias CallbackArgs = ParameterTypeTuple!Callback;

	// The shape of the cancel routine depends on whether `action` yields a value.
	static if (is(WaitResult == void)) {
		void cancel(Callback)
		{
			assert(false, "Action cannot be cancelled.");
		}
	} else {
		void cancel(Callback, WaitResult)
		@safe @nogc nothrow {
			assert(false, "Action cannot be cancelled.");
		}
	}

	CallbackArgs received;

	alias operation = Waitable!(
		Callback,
		action,
		cancel,
		(CallbackArgs values) { received = values; }
	);

	asyncAwaitAny!(false, operation)(func);

	return tuple(received);
}
53 
/** Starts a single asynchronous operation and suspends the calling task until
	its callback has been invoked or the timeout has elapsed, without support
	for interruption.

	Params:
		Callback = Type of the completion callback accepted by `action`
		action = Callable that starts the operation, receiving the callback
		cancel = Callable used to cancel the operation if it has not fired
		timeout = Maximum time to wait (forwarded to `asyncAwaitAny`)
		func = Name of the calling function, forwarded for diagnostic logging

	Returns:
		A tuple holding the arguments passed to the callback. If the callback
		never fired, the tuple contains the default-initialized values.
*/
auto asyncAwaitUninterruptible(Callback, alias action, alias cancel)(Duration timeout, string func = __FUNCTION__)
nothrow {
	alias CallbackArgs = ParameterTypeTuple!Callback;

	CallbackArgs received;

	alias operation = Waitable!(
		Callback,
		action,
		cancel,
		(CallbackArgs values) { received = values; }
	);

	asyncAwaitAny!(false, operation)(timeout, func);

	return tuple(received);
}
61 
/** Bundles the callables that describe one asynchronous operation, for use
	with `asyncAwaitAny`.

	The static assertions verify at compile time that the three callables
	accept the argument lists they will be invoked with.

	Params:
		CB = Callback type that `WAIT` expects
		WAIT = Starts the operation; must be callable with a `CB`
		CANCEL = Cancels the operation; must be callable with a `CB` and, if
			`WAIT` returns a non-`void` value, additionally with that value
		DONE = Receives the callback's arguments once the operation has fired
*/
template Waitable(CB, alias WAIT, alias CANCEL, alias DONE)
{
	import std.traits : ReturnType;

	// public names through which the individual parts are accessed
	alias Callback = CB;
	alias wait = WAIT;
	alias cancel = CANCEL;
	alias done = DONE;

	static assert(
		is(typeof(WAIT(CB.init))),
		"WAIT must be callable with a parameter of type "~CB.stringof
	);

	static if (is(typeof(WAIT(CB.init)) == void)) {
		// no wait result: CANCEL only receives the callback
		static assert(
			is(typeof(CANCEL(CB.init))),
			"CANCEL must be callable with a parameter of type "~CB.stringof
		);
	} else {
		// CANCEL additionally receives the value returned by WAIT
		static assert(
			is(typeof(CANCEL(CB.init, typeof(WAIT(CB.init)).init))),
			"CANCEL must be callable with parameters "~CB.stringof~
			" and "~typeof(WAIT(CB.init)).stringof
		);
	}

	static assert(
		is(typeof(DONE(ParameterTypeTuple!CB.init))),
		"DONE must be callable with types "~ParameterTypeTuple!CB.stringof
	);
}
82 
/** Waits until any of the given waitables has fired or the timeout has elapsed.

	A timeout of `Duration.max` is treated as "no timeout" and simply forwards
	to the overload without a timeout parameter. Otherwise an event driver
	timer is created and prepended to the list of waitables, so that its
	expiration ends the wait.

	Params:
		interruptible = Forwarded to the overload without a timeout
		Waitables = `Waitable` instances describing the operations to wait for
		timeout = Maximum time to wait
		func = Name of the calling function, forwarded for diagnostic logging
*/
void asyncAwaitAny(bool interruptible, Waitables...)(Duration timeout, string func = __FUNCTION__)
{
	// infinite timeout: no timer needed
	if (timeout == Duration.max) {
		asyncAwaitAny!(interruptible, Waitables)(func);
		return;
	}

	import eventcore.core;

	auto timer = eventDriver.timers.create();
	eventDriver.timers.set(timer, timeout, 0.seconds);
	// drop our timer reference on every exit path
	scope (exit) eventDriver.timers.releaseRef(timer);

	// The timer takes part in the wait like any other operation; its completion
	// handler deliberately does nothing.
	alias timeoutWaitable = Waitable!(TimerCallback,
		(cb) => eventDriver.timers.wait(timer, cb),
		(cb) => eventDriver.timers.cancelWait(timer),
		(tid) {}
	);

	asyncAwaitAny!(interruptible, timeoutWaitable, Waitables)(func);
}
100 
/** Starts all given waitables and suspends the calling task until at least one
	of them has fired.

	The waitables are started in the order given. If a callback is already
	invoked while the operations are being started, the function returns right
	away without starting the remaining ones. When the function is left, every
	started operation that has not fired gets cancelled through its `cancel`
	callable.

	Params:
		interruptible = If `true`, the wait uses the `hibernate` overload that
			takes an interrupt callback and an `InterruptException` is thrown
			once that callback has been invoked
		Waitables = One or more `Waitable` instances
		func = Name of the calling function, only used for debug logging

	Throws:
		`InterruptException` if `interruptible` is `true` and the interrupt
		callback was invoked during the wait.
*/
void asyncAwaitAny(bool interruptible, Waitables...)(string func = __FUNCTION__)
	if (Waitables.length >= 1)
{
	import std.meta : staticMap;
	import std.algorithm.searching : any;
	import std.format : format;
	import std.meta : AliasSeq;
	import std.traits : ReturnType;

	// per-waitable "done" flags (also set after cancellation) plus a summary flag
	bool[Waitables.length] fired;
	bool any_fired = false;
	// stays Task.init while the operations are being started, so that callbacks
	// firing during that phase do not attempt a task switch
	Task t;

	// lets the callbacks assert that they are not invoked after this function
	// has returned
	bool still_inside = true;
	scope (exit) still_inside = false;

	debug(VibeAsyncLog) logDebugV("Performing %s async operations in %s", Waitables.length, func);

	// Generates, for each waitable, the code that declares its callback, starts
	// the operation, registers the cancellation handler and returns early if
	// anything has fired already. %1$s is the waitable index, %2$s the callback
	// parameter declarations and %3$s the matching parameter names.
	static string waitableCode()
	{
		string ret;
		foreach (i, W; Waitables) {
			alias PTypes = ParameterTypeTuple!(CBDel!W);
			ret ~= q{
				alias PT%1$s = ParameterTypeTuple!(Waitables[%1$s].Callback);
				scope callback_%1$s = (%2$s) @safe nothrow {
					// NOTE: this triggers DigitalMars/optlink#18
					//() @trusted { logDebugV("siw %%x", &still_inside); } ();
					debug(VibeAsyncLog) logDebugV("Waitable %%s in %%s fired (istask=%%s).", %1$s, func, t != Task.init);
					assert(still_inside, "Notification fired after asyncAwait had already returned!");
					fired[%1$s] = true;
					any_fired = true;
					Waitables[%1$s].done(%3$s);
					if (t != Task.init) {
						version (VibeHighEventPriority) switchToTask(t);
						else switchToTask(t, TaskSwitchPriority.normal);
					}
				};

				debug(VibeAsyncLog) logDebugV("Starting operation %%s", %1$s);
				alias WR%1$s = typeof(Waitables[%1$s].wait(callback_%1$s));
				static if (is(WR%1$s == void)) Waitables[%1$s].wait(callback_%1$s);
				else auto wr%1$s = Waitables[%1$s].wait(callback_%1$s);

				scope (exit) {
					if (!fired[%1$s]) {
						debug(VibeAsyncLog) logDebugV("Cancelling operation %%s", %1$s);
						static if (is(WR%1$s == void)) Waitables[%1$s].cancel(callback_%1$s);
						else Waitables[%1$s].cancel(callback_%1$s, wr%1$s);
						any_fired = true;
						fired[%1$s] = true;
					}
				}

				if (any_fired) {
					debug(VibeAsyncLog) logDebugV("Returning to %%s without waiting.", func);
					return;
				}
			}.format(i, generateParamDecls!(CBDel!W)(format("PT%s", i)), generateParamNames!(CBDel!W));
		}
		return ret;
	}

	// The generated code is mixed into this function's scope, so its
	// scope(exit) handlers run when asyncAwaitAny itself is left.
	mixin(waitableCode());

	debug(VibeAsyncLog) logDebugV("Need to wait in %s (%s)...", func, interruptible ? "interruptible" : "uninterruptible");

	// From here on the callbacks switch back to this task when they fire.
	t = Task.getThis();

	debug (VibeAsyncLog) scope (failure) logDebugV("Aborting wait due to exception");

	// Keep hibernating until one of the callbacks has set any_fired; a resume
	// without a fired waitable just hibernates again.
	do {
		static if (interruptible) {
			bool interrupted = false;
			hibernate(() @safe nothrow {
				debug(VibeAsyncLog) logDebugV("Got interrupted in %s.", func);
				interrupted = true;
			});
			debug(VibeAsyncLog) logDebugV("Task resumed (fired=%s, interrupted=%s)", fired, interrupted);
			// leaving via exception runs the scope(exit) handlers above, which
			// cancel all operations that have not fired
			if (interrupted)
				throw new InterruptException;
		} else {
			hibernate();
			debug(VibeAsyncLog) logDebugV("Task resumed (fired=%s)", fired);
		}
	} while (!any_fired);

	debug(VibeAsyncLog) logDebugV("Return result for %s.", func);
}
190 
/// Yields the callback type (`Callback` member) of a `Waitable` instance.
private template CBDel(alias Waitable)
{
	alias CBDel = Waitable.Callback;
}
192 
// A single operation whose callback is invoked synchronously from within the
// action: the result must be returned and the action must run exactly once.
@safe nothrow /*@nogc*/ unittest {
	int invocations = 0;

	auto result = asyncAwaitUninterruptible!(
		void delegate(int) @safe nothrow,
		(cb) { invocations++; cb(42); }
	);

	assert(result[0] == 42);
	assert(invocations == 1);
}
199 
// Exercises asyncAwaitAny with combinations of waitables that fire immediately
// and waitables that never fire.
@safe nothrow /*@nogc*/ unittest {
	// a/b count the starts of w1/w2; c counts cancellations of w2 and starts of w3
	int a, b, c;
	// values delivered to the completion handlers of w1/w2
	int w1r, w2r;
	// fires synchronously while being started; must never be cancelled
	alias w1 = Waitable!(
		void delegate(int) @safe nothrow,
		(cb) { a++; cb(42); },
		(cb) { assert(false); },
		(i) { w1r = i; }
	);
	// never fires; only counts how often it is started and cancelled
	alias w2 = Waitable!(
		void delegate(int) @safe nothrow,
		(cb) { b++; },
		(cb) { c++; },
		(i) { w2r = i; }
	);
	// fires synchronously and checks the delivered value in its handler
	alias w3 = Waitable!(
		void delegate(int) @safe nothrow,
		(cb) { c++; cb(42); },
		(cb) { assert(false); },
		(int n) { assert(n == 42); }
	);

	// w1 fires while being started, so w2 is not started at all
	asyncAwaitAny!(false, w1, w2);
	assert(w1r == 42 && w2r == 0);
	assert(a == 1 && b == 0 && c == 0);

	// w2 is started first and does not fire; w1 then fires and w2 gets cancelled
	asyncAwaitAny!(false, w2, w1);
	assert(w1r == 42 && w2r == 0);
	assert(a == 2 && b == 1 && c == 1);

	// a single waitable that fires immediately
	asyncAwaitAny!(false, w3);
	assert(c == 2);
}
233 
/** Builds the source text of a parameter list matching the callable type `Fun`.

	Each parameter is declared as `<ptypes_name>[<index>] param_<index>`,
	prefixed with its `lazy`, `scope`, `out` and `ref` storage classes (in that
	order), and the declarations are separated by `", "`.

	May only be evaluated at compile time.

	Params:
		Fun = Callable type whose parameters are mirrored
		ptypes_name = Name of the type tuple that the declarations index into

	Returns:
		The generated parameter declarations.
*/
private string generateParamDecls(Fun)(string ptypes_name = "PTypes")
{
	import std.format : format;
	import std.traits : ParameterTypeTuple, ParameterStorageClass, ParameterStorageClassTuple;

	// CTFE-only helper
	if (!__ctfe) assert(false);

	alias storage = ParameterStorageClassTuple!Fun;

	string decls;
	foreach (idx, Unused; ParameterTypeTuple!Fun) {
		enum sc = storage[idx];

		static if (idx != 0)
			decls ~= ", ";

		static if (sc & ParameterStorageClass.lazy_)
			decls ~= "lazy ";
		static if (sc & ParameterStorageClass.scope_)
			decls ~= "scope ";
		static if (sc & ParameterStorageClass.out_)
			decls ~= "out ";
		static if (sc & ParameterStorageClass.ref_)
			decls ~= "ref ";

		decls ~= format("%1$s[%2$s] param_%2$s", ptypes_name, idx);
	}
	return decls;
}
254 
/** Builds the comma separated list `param_0, param_1, ...` with one entry for
	each parameter of the callable type `Fun`.

	May only be evaluated at compile time.

	Params:
		Fun = Callable type whose parameter count determines the list

	Returns:
		The generated list of parameter names.
*/
private string generateParamNames(Fun)()
{
	import std.format : format;

	// CTFE-only helper
	if (!__ctfe) assert(false);

	string names;
	foreach (idx, Unused; ParameterTypeTuple!Fun) {
		static if (idx != 0)
			names ~= ", ";
		names ~= format("param_%s", idx);
	}
	return names;
}
267 
/** Evaluates to `true` if at least one parameter of the callable type
	`Callback` carries the `scope` storage class, and to `false` otherwise
	(including for callables without parameters).
*/
private template hasAnyScopeParameter(Callback) {
	import std.algorithm.searching : any;
	import std.traits : ParameterStorageClass, ParameterStorageClassTuple;

	alias storageClasses = ParameterStorageClassTuple!Callback;

	static if (storageClasses.length > 0) {
		enum hasAnyScopeParameter =
			[storageClasses].any!(sc => sc & ParameterStorageClass.scope_);
	} else {
		// an empty tuple cannot be turned into a typed array literal
		enum hasAnyScopeParameter = false;
	}
}