/**
	Helpers that let the calling task block on callback based asynchronous
	operations.

	An operation is described by a `Waitable`: a function that starts it, a
	function that cancels it and a function that receives its result.
	`asyncAwaitAny` starts one or more waitables and suspends the calling task
	until at least one of them has fired; the `asyncAwait*` functions are
	convenience wrappers around it for a single operation.
*/
module vibe.internal.async;

import std.traits : ParameterTypeTuple, ReturnType;
import std.typecons : tuple;
import vibe.core.core : hibernate, switchToTask;
import vibe.core.task : InterruptException, Task, TaskSwitchPriority;
import vibe.core.log;
import core.time : Duration, seconds;


/**
	Starts a single asynchronous operation and waits (interruptibly) for its
	completion callback.

	Params:
		Callback = Type of the completion callback passed to `action`
		action = Callable that starts the operation; receives the callback
		cancel = Callable that cancels the operation; receives the callback
			and, if `action` returns a value, that value as second argument
		func = Name of the calling function, used for debug logging only

	Returns:
		A tuple holding the arguments the completion callback was invoked
		with.

	Throws:
		`InterruptException` if the wait gets interrupted.
*/
auto asyncAwait(Callback, alias action, alias cancel)(string func = __FUNCTION__)
// NOTE(review): this constraint compares two fixed types and does not depend
// on any template argument. It is kept verbatim so that overload resolution
// between the two `asyncAwait` templates is left untouched.
if (!is(Object == Duration)) {
	ParameterTypeTuple!Callback results;
	alias waitable = Waitable!(Callback, action, cancel, (ParameterTypeTuple!Callback r) { results = r; });
	asyncAwaitAny!(true, waitable)(func);
	return tuple(results);
}

/**
	Starts a single asynchronous operation and waits (interruptibly) for its
	completion callback or for `timeout` to elapse.

	Params:
		Callback = Type of the completion callback passed to `action`
		action = Callable that starts the operation; receives the callback
		cancel = Callable that cancels the operation; receives the callback
			and, if `action` returns a value, that value as second argument
		timeout = Maximum time to wait; `Duration.max` waits without timer
		func = Name of the calling function, used for debug logging only

	Returns:
		A struct with the fields `completed` and `results`. `completed` is
		`false` if the operation had to be cancelled, in which case `results`
		holds the initial values of the callback parameter types.

	Throws:
		`InterruptException` if the wait gets interrupted.
*/
auto asyncAwait(Callback, alias action, alias cancel)(Duration timeout, string func = __FUNCTION__)
{
	static struct R {
		bool completed = true;
		ParameterTypeTuple!Callback results;
	}
	R ret;

	// The user supplied cancel function is wrapped so that a cancellation is
	// recorded in the result. The wrapper signature has to mirror the one
	// expected by Waitable, which depends on whether `action` returns a value.
	static if (is(ReturnType!action == void)) {
		alias waitable = Waitable!(Callback,
			action,
			(cb) { ret.completed = false; cancel(cb); },
			(ParameterTypeTuple!Callback r) { ret.results = r; }
		);
	}
	else {
		alias waitable = Waitable!(Callback,
			action,
			(cb, waitres) { ret.completed = false; cancel(cb, waitres); },
			(ParameterTypeTuple!Callback r) { ret.results = r; }
		);
	}
	asyncAwaitAny!(true, waitable)(timeout, func);
	return ret;
}

/**
	Starts a single asynchronous operation and waits for its completion
	callback without reacting to task interruption.

	The operation is not expected to ever be cancelled; the internally
	supplied cancel function fails an assertion if it gets called.

	Params:
		Callback = Type of the completion callback passed to `action`
		action = Callable that starts the operation; receives the callback
		func = Name of the calling function, used for debug logging only

	Returns:
		A tuple holding the arguments the completion callback was invoked
		with.
*/
auto asyncAwaitUninterruptible(Callback, alias action)(string func = __FUNCTION__)
nothrow {
	// Both stubs carry the same explicit attributes so that they behave
	// identically regardless of the return type of `action`.
	static if (is(typeof(action(Callback.init)) == void)) void cancel(Callback) @safe @nogc nothrow { assert(false, "Action cannot be cancelled."); }
	else void cancel(Callback, typeof(action(Callback.init))) @safe @nogc nothrow { assert(false, "Action cannot be cancelled."); }
	ParameterTypeTuple!Callback results;
	alias waitable = Waitable!(Callback, action, cancel, (ParameterTypeTuple!Callback r) { results = r; });
	asyncAwaitAny!(false, waitable)(func);
	return tuple(results);
}

/**
	Starts a single asynchronous operation and waits for its completion
	callback or for `timeout` to elapse, without reacting to task
	interruption.

	Params:
		Callback = Type of the completion callback passed to `action`
		action = Callable that starts the operation; receives the callback
		cancel = Callable that cancels the operation; receives the callback
			and, if `action` returns a value, that value as second argument
		timeout = Maximum time to wait; `Duration.max` waits without timer
		func = Name of the calling function, used for debug logging only

	Returns:
		A tuple holding the arguments the completion callback was invoked
		with. The return value does not tell whether the operation completed;
		if the callback never ran, the tuple holds the initial values of the
		callback parameter types.
*/
auto asyncAwaitUninterruptible(Callback, alias action, alias cancel)(Duration timeout, string func = __FUNCTION__)
nothrow {
	ParameterTypeTuple!Callback results;
	alias waitable = Waitable!(Callback, action, cancel, (ParameterTypeTuple!Callback r) { results = r; });
	asyncAwaitAny!(false, waitable)(timeout, func);
	return tuple(results);
}

/**
	Bundles the pieces that describe one asynchronous operation for use with
	`asyncAwaitAny`.

	Params:
		CB = Type of the completion callback
		WAIT = Callable that starts the operation; must accept a `CB`
		CANCEL = Callable that cancels the operation; must accept a `CB` and,
			if `WAIT` returns a value, a second argument of that type
		DONE = Callable that receives the arguments of the completion
			callback

	The template only validates these requirements and re-exports its
	arguments as the members `Callback`, `wait`, `cancel` and `done`.
*/
template Waitable(CB, alias WAIT, alias CANCEL, alias DONE)
{
	static assert(is(typeof(WAIT(CB.init))), "WAIT must be callable with a parameter of type "~CB.stringof);
	static if (is(typeof(WAIT(CB.init)) == void))
		static assert(is(typeof(CANCEL(CB.init))),
			"CANCEL must be callable with a parameter of type "~CB.stringof);
	else
		static assert(is(typeof(CANCEL(CB.init, typeof(WAIT(CB.init)).init))),
			"CANCEL must be callable with parameters "~CB.stringof~
			" and "~typeof(WAIT(CB.init)).stringof);
	static assert(is(typeof(DONE(ParameterTypeTuple!CB.init))),
		"DONE must be callable with types "~ParameterTypeTuple!CB.stringof);

	alias Callback = CB;
	alias wait = WAIT;
	alias cancel = CANCEL;
	alias done = DONE;
}

/**
	Waits until any of the given waitables has fired or until `timeout` has
	elapsed.

	A timeout of `Duration.max` forwards directly to the overload without
	timeout. Otherwise a timer is created and placed in front of `Waitables`
	as an additional waitable, so that its expiration ends the wait. The timer
	reference is released when the function is left.

	Params:
		interruptible = Whether a task interruption ends the wait with an
			`InterruptException`
		Waitables = One or more `Waitable` instantiations
		timeout = Maximum time to wait
		func = Name of the calling function, used for debug logging only
*/
void asyncAwaitAny(bool interruptible, Waitables...)(Duration timeout, string func = __FUNCTION__)
{
	if (timeout == Duration.max) asyncAwaitAny!(interruptible, Waitables)(func);
	else {
		import eventcore.core;

		auto tm = eventDriver.timers.create();
		// NOTE(review): the third argument is presumably the repeat interval,
		// zero requesting a timer that fires once - confirm against eventcore.
		eventDriver.timers.set(tm, timeout, 0.seconds);
		scope (exit) eventDriver.timers.releaseRef(tm);
		alias timerwaitable = Waitable!(TimerCallback,
			cb => eventDriver.timers.wait(tm, cb),
			cb => eventDriver.timers.cancelWait(tm),
			(tid) {}
		);
		asyncAwaitAny!(interruptible, timerwaitable, Waitables)(func);
	}
}

/**
	Waits until any of the given waitables has fired.

	The waitables are started one after another in the order given. If a
	waitable fires while it is being started, the function returns right away
	and the remaining waitables are never started. If none has fired after all
	have been started, the calling task hibernates until a completion callback
	resumes it. When the function is left, by return or by exception, every
	waitable that was started but has not fired gets cancelled.

	Params:
		interruptible = Whether a task interruption ends the wait with an
			`InterruptException`
		Waitables = One or more `Waitable` instantiations
		func = Name of the calling function, used for debug logging only

	Throws:
		`InterruptException` if `interruptible` is `true` and the wait gets
		interrupted.
*/
void asyncAwaitAny(bool interruptible, Waitables...)(string func = __FUNCTION__)
	if (Waitables.length >= 1)
{
	import std.format : format;

	bool[Waitables.length] fired;
	bool any_fired = false;
	// Stays Task.init until all waitables have been started, so that
	// callbacks firing during start-up do not attempt a task switch.
	Task t;

	// Guards against completion callbacks arriving after this frame is gone.
	bool still_inside = true;
	scope (exit) still_inside = false;

	debug(VibeAsyncLog) logDebugV("Performing %s async operations in %s", Waitables.length, func);

	// Generates, for each waitable, the code that declares its completion
	// callback, starts the operation, registers the cancellation on scope
	// exit and returns early if something has already fired. The format
	// arguments are: %1$s the waitable index, %2$s the parameter declarations
	// of the callback and %3$s the matching parameter names. Because the
	// whole token string, comments included, is passed to format(), literal
	// percent signs have to be doubled everywhere inside of it.
	static string waitableCode()
	{
		string ret;
		foreach (i, W; Waitables) {
			ret ~= q{
				alias PT%1$s = ParameterTypeTuple!(Waitables[%1$s].Callback);
				scope callback_%1$s = (%2$s) @safe nothrow {
					// NOTE: this triggers DigitalMars/optlink#18
					//() @trusted { logDebugV("siw %%x", &still_inside); } ();
					debug(VibeAsyncLog) logDebugV("Waitable %%s in %%s fired (istask=%%s).", %1$s, func, t != Task.init);
					assert(still_inside, "Notification fired after asyncAwait had already returned!");
					fired[%1$s] = true;
					any_fired = true;
					Waitables[%1$s].done(%3$s);
					if (t != Task.init) {
						version (VibeHighEventPriority) switchToTask(t);
						else switchToTask(t, TaskSwitchPriority.normal);
					}
				};

				debug(VibeAsyncLog) logDebugV("Starting operation %%s", %1$s);
				alias WR%1$s = typeof(Waitables[%1$s].wait(callback_%1$s));
				static if (is(WR%1$s == void)) Waitables[%1$s].wait(callback_%1$s);
				else auto wr%1$s = Waitables[%1$s].wait(callback_%1$s);

				scope (exit) {
					if (!fired[%1$s]) {
						debug(VibeAsyncLog) logDebugV("Cancelling operation %%s", %1$s);
						static if (is(WR%1$s == void)) Waitables[%1$s].cancel(callback_%1$s);
						else Waitables[%1$s].cancel(callback_%1$s, wr%1$s);
						any_fired = true;
						fired[%1$s] = true;
					}
				}

				if (any_fired) {
					debug(VibeAsyncLog) logDebugV("Returning to %%s without waiting.", func);
					return;
				}
			}.format(i, generateParamDecls!(CBDel!W)(format("PT%s", i)), generateParamNames!(CBDel!W));
		}
		return ret;
	}

	mixin(waitableCode());

	debug(VibeAsyncLog) logDebugV("Need to wait in %s (%s)...", func, interruptible ? "interruptible" : "uninterruptible");

	// From here on completion callbacks resume this task.
	t = Task.getThis();

	debug (VibeAsyncLog) scope (failure) logDebugV("Aborting wait due to exception");

	// The task may get resumed for other reasons than a completion callback,
	// hence the loop.
	do {
		static if (interruptible) {
			bool interrupted = false;
			hibernate(() @safe nothrow {
				debug(VibeAsyncLog) logDebugV("Got interrupted in %s.", func);
				interrupted = true;
			});
			debug(VibeAsyncLog) logDebugV("Task resumed (fired=%s, interrupted=%s)", fired, interrupted);
			if (interrupted)
				throw new InterruptException;
		} else {
			hibernate();
			debug(VibeAsyncLog) logDebugV("Task resumed (fired=%s)", fired);
		}
	} while (!any_fired);

	debug(VibeAsyncLog) logDebugV("Return result for %s.", func);
}

/// Yields the callback type of a `Waitable` instantiation.
private alias CBDel(alias Waitable) = Waitable.Callback;

// An action that invokes its callback synchronously completes without
// waiting and hands back the callback argument.
@safe nothrow /*@nogc*/ unittest {
	int cnt = 0;
	auto ret = asyncAwaitUninterruptible!(void delegate(int) @safe nothrow, (cb) { cnt++; cb(42); });
	assert(ret[0] == 42);
	assert(cnt == 1);
}

// Verifies start order, early return and cancellation of asyncAwaitAny.
@safe nothrow /*@nogc*/ unittest {
	int a, b, c;
	int w1r, w2r;
	// fires synchronously, must never be cancelled
	alias w1 = Waitable!(
		void delegate(int) @safe nothrow,
		(cb) { a++; cb(42); },
		(cb) { assert(false); },
		(i) { w1r = i; }
	);
	// never fires; counts starts in b and cancellations in c
	alias w2 = Waitable!(
		void delegate(int) @safe nothrow,
		(cb) { b++; },
		(cb) { c++; },
		(i) { w2r = i; }
	);
	// fires synchronously and checks the value passed to done
	alias w3 = Waitable!(
		void delegate(int) @safe nothrow,
		(cb) { c++; cb(42); },
		(cb) { assert(false); },
		(int n) { assert(n == 42); }
	);

	// w1 fires while being started, so w2 is never started
	asyncAwaitAny!(false, w1, w2);
	assert(w1r == 42 && w2r == 0);
	assert(a == 1 && b == 0 && c == 0);

	// w2 is started first and gets cancelled once w1 has fired
	asyncAwaitAny!(false, w2, w1);
	assert(w1r == 42 && w2r == 0);
	assert(a == 2 && b == 1 && c == 1);

	asyncAwaitAny!(false, w3);
	assert(c == 2);
}

/**
	Builds the parameter declaration list for a function literal that matches
	the signature of `Fun`.

	Each parameter is emitted as `<ptypes_name>[<i>] param_<i>`, prefixed with
	the `lazy`, `scope`, `out` and `ref` storage classes found on the
	corresponding parameter of `Fun`. Can only be called at compile time.

	Params:
		Fun = Callable type whose parameters are to be mirrored
		ptypes_name = Name of the type tuple that is indexed to obtain the
			parameter types in the generated code

	Returns:
		The comma separated declaration list, empty if `Fun` takes no
		parameters.
*/
private string generateParamDecls(Fun)(string ptypes_name = "PTypes")
{
	import std.format : format;
	import std.traits : ParameterTypeTuple, ParameterStorageClass, ParameterStorageClassTuple;

	if (!__ctfe) assert(false);

	alias Types = ParameterTypeTuple!Fun;
	alias SClasses = ParameterStorageClassTuple!Fun;
	string ret;
	foreach (i, T; Types) {
		static if (i > 0) ret ~= ", ";
		static if (SClasses[i] & ParameterStorageClass.lazy_) ret ~= "lazy ";
		static if (SClasses[i] & ParameterStorageClass.scope_) ret ~= "scope ";
		static if (SClasses[i] & ParameterStorageClass.out_) ret ~= "out ";
		static if (SClasses[i] & ParameterStorageClass.ref_) ret ~= "ref ";
		ret ~= format("%s[%s] param_%s", ptypes_name, i, i);
	}
	return ret;
}

/**
	Builds the comma separated list `param_0, param_1, ...` with one entry per
	parameter of `Fun`, matching the names produced by `generateParamDecls`.
	Can only be called at compile time.
*/
private string generateParamNames(Fun)()
{
	import std.format : format;
	if (!__ctfe) assert(false);

	string ret;
	foreach (i, T; ParameterTypeTuple!Fun) {
		static if (i > 0) ret ~= ", ";
		ret ~= format("param_%s", i);
	}
	return ret;
}

/**
	Evaluates to `true` if at least one parameter of `Callback` is declared
	with the `scope` storage class, and to `false` otherwise (including for
	callables without parameters).
*/
private template hasAnyScopeParameter(Callback) {
	import std.algorithm.searching : any;
	import std.traits : ParameterStorageClass, ParameterStorageClassTuple;
	alias SC = ParameterStorageClassTuple!Callback;
	static if (SC.length == 0) enum hasAnyScopeParameter = false;
	else enum hasAnyScopeParameter = any!(c => c & ParameterStorageClass.scope_)([SC]);
}