1 /** 2 Functions and structures for dealing with threads and concurrent access. 3 4 This module is modeled after std.concurrency, but provides a fiber-aware alternative 5 to it. All blocking operations will yield the calling fiber instead of blocking it. 6 7 Copyright: © 2013-2014 Sönke Ludwig 8 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 9 Authors: Sönke Ludwig 10 */ 11 module vibe.core.concurrency; 12 13 public import std.concurrency; 14 15 import core.time; 16 import std.traits; 17 import std.typecons; 18 import std.typetuple; 19 import std.string; 20 import vibe.core.task; 21 22 23 private extern (C) pure nothrow void _d_monitorenter(Object h); 24 private extern (C) pure nothrow void _d_monitorexit(Object h); 25 26 /** 27 Locks the given shared object and returns a ScopedLock for accessing any unshared members. 28 29 Using this function will ensure that there are no data races. For this reason, the class 30 type T is required to contain no unshared or unisolated aliasing. 
31 32 See_Also: core.concurrency.isWeaklyIsolated 33 */ 34 ScopedLock!T lock(T : const(Object))(shared(T) object) 35 pure nothrow @safe { 36 return ScopedLock!T(object); 37 } 38 /// ditto 39 void lock(T : const(Object))(shared(T) object, scope void delegate(scope T) nothrow accessor) 40 nothrow { 41 auto l = lock(object); 42 accessor(l.unsafeGet()); 43 } 44 /// ditto 45 void lock(T : const(Object))(shared(T) object, scope void delegate(scope T) accessor) 46 { 47 auto l = lock(object); 48 accessor(l.unsafeGet()); 49 } 50 51 /// 52 unittest { 53 import vibe.core.concurrency; 54 55 static class Item { 56 private double m_value; 57 58 this(double value) pure { m_value = value; } 59 60 @property double value() const pure { return m_value; } 61 } 62 63 static class Manager { 64 private { 65 string m_name; 66 Isolated!(Item) m_ownedItem; 67 Isolated!(shared(Item)[]) m_items; 68 } 69 70 pure this(string name) 71 { 72 m_name = name; 73 auto itm = makeIsolated!Item(3.5); 74 m_ownedItem = itm.move; 75 } 76 77 void addItem(shared(Item) item) pure { m_items ~= item; } 78 79 double getTotalValue() 80 const pure { 81 double sum = 0; 82 83 // lock() is required to access shared objects 84 foreach (itm; m_items) { 85 auto l = itm.lock(); 86 sum += l.value; 87 } 88 89 // owned objects can be accessed without locking 90 sum += m_ownedItem.value; 91 92 return sum; 93 } 94 } 95 96 void test() 97 { 98 import std.stdio; 99 100 auto man = cast(shared)new Manager("My manager"); 101 { 102 auto l = man.lock(); 103 l.addItem(new shared(Item)(1.5)); 104 l.addItem(new shared(Item)(0.5)); 105 } 106 107 writefln("Total value: %s", man.lock().getTotalValue()); 108 } 109 } 110 111 112 /** 113 Proxy structure that keeps the monitor of the given object locked until it 114 goes out of scope. 115 116 Any unshared members of the object are safely accessible during this time. The usual 117 way to use it is by calling lock. 
118 119 See_Also: lock 120 */ 121 struct ScopedLock(T) 122 { 123 static assert(is(T == class), "ScopedLock is only usable with classes."); 124 // static assert(isWeaklyIsolated!(FieldTypeTuple!T), T.stringof~" contains non-immutable, non-shared references. Accessing it in a multi-threaded environment is not safe."); 125 126 private Rebindable!T m_ref; 127 128 @disable this(this); 129 130 this(shared(T) obj) 131 pure nothrow @trusted 132 { 133 assert(obj !is null, "Attempting to lock null object."); 134 m_ref = cast(T)obj; 135 _d_monitorenter(getObject()); 136 assert(getObject().__monitor !is null); 137 } 138 139 ~this() 140 pure nothrow @trusted 141 { 142 assert(m_ref !is null); 143 assert(getObject().__monitor !is null); 144 _d_monitorexit(getObject()); 145 } 146 147 /** 148 Returns an unshared reference to the locked object. 149 150 Note that using this function breaks type safety. Be sure to not escape the reference beyond 151 the life time of the lock. 152 */ 153 @property inout(T) unsafeGet() inout nothrow { return m_ref; } 154 155 alias unsafeGet this; 156 //pragma(msg, "In ScopedLock!("~T.stringof~")"); 157 //pragma(msg, isolatedRefMethods!T()); 158 // mixin(isolatedAggregateMethodsString!T()); 159 160 private Object getObject() 161 pure nothrow { 162 static if( is(Rebindable!T == struct) ) return cast(Unqual!T)m_ref.get(); 163 else return cast(Unqual!T)m_ref; 164 } 165 } 166 167 168 /** 169 Creates a new isolated object. 170 171 Isolated objects contain no mutable aliasing outside of their own reference tree. They can thus 172 be safely converted to immutable and they can be safely passed between threads. 173 174 The function returns an instance of Isolated that will allow proxied access to the members of 175 the object, as well as providing means to convert the object to immutable or to an ordinary 176 mutable object. 
*/
pure Isolated!T makeIsolated(T, ARGS...)(ARGS args)
{
    // classes and pointers to structs are heap allocated and wrapped;
    // plain structs are returned by value
    static if (is(T == class)) return Isolated!T(new T(args));
    else static if (is(T == struct)) return T(args);
    else static if (isPointer!T && is(PointerTarget!T == struct)) {
        alias TB = PointerTarget!T;
        return Isolated!T(new TB(args));
    } else static assert(false, "makeIsolated works only for class and (pointer to) struct types.");
}

///
unittest {
    import vibe.core.concurrency;
    import vibe.core.core;

    static class Item {
        double value;
        string name;
    }

    static void modifyItem(Isolated!Item itm)
    {
        itm.value = 1.3;
        // TODO: send back to initiating thread
    }

    void test()
    {
        immutable(Item)[] items;

        // create immutable item procedurally
        auto itm = makeIsolated!Item();
        itm.value = 2.4;
        itm.name = "Test";
        items ~= itm.freeze();

        // send isolated item to other thread
        auto itm2 = makeIsolated!Item();
        runWorkerTask(&modifyItem, itm2.move());
        // ...
    }
}

unittest {
    static class C { this(int x) pure {} }
    static struct S { this(int x) pure {} }

    alias CI = typeof(makeIsolated!C(0));
    alias SI = typeof(makeIsolated!S(0));
    alias SPI = typeof(makeIsolated!(S*)(0));
    static assert(isStronglyIsolated!CI);
    static assert(is(CI == IsolatedRef!C));
    static assert(isStronglyIsolated!SI);
    static assert(is(SI == S));
    static assert(isStronglyIsolated!SPI);
    static assert(is(SPI == IsolatedRef!S));
}


/**
    Creates a new isolated array.

    Params:
        size = Number of elements of the new array
*/
pure Isolated!(T[]) makeIsolatedArray(T)(size_t size)
{
    Isolated!(T[]) ret;
    ret.length = size;
    return ret.move();
}

///
unittest {
    import vibe.core.concurrency;
    import vibe.core.core;

    static void compute(Tid tid, Isolated!(double[]) array, size_t start_index)
    nothrow {
        foreach( i; 0 .. array.length )
            array[i] = (start_index + i) * 0.5;

        //send(tid, array.move()); // Isolated!T isn't recognized by std.concurrency
    }

    void test()
    {
        import std.stdio;

        // compute contents of an array using multiple threads
        auto arr = makeIsolatedArray!double(256);

        // partition the array (no copying takes place)
        size_t[] indices = [64, 128, 192, 256];
        Isolated!(double[])[] subarrays = arr.splice(indices);

        // start processing in threads
        Tid[] tids;
        size_t start = 0;
        foreach (i, stop; indices) {
            // partition i covers [start, stop); compute() expects the index of
            // the partition's first element, not the splice index that ends it
            tids ~= runWorkerTaskH(&compute, thisTid, subarrays[i].move(), start).tid;
            start = stop;
        }

        // collect results
        auto resultarrays = new Isolated!(double[])[tids.length];
        //foreach( i, tid; tids )
        //  resultarrays[i] = receiveOnly!(Isolated!(double[])).move(); // Isolated!T isn't recognized by std.concurrency

        // BUG: the arrays must be sorted here, but since there is no way to tell
        //      from where something was received, this is difficult here.

        // merge results (no copying takes place again)
        foreach( i; 1 .. resultarrays.length )
            resultarrays[0].merge(resultarrays[i]);

        // convert the final result to immutable
        auto result = resultarrays[0].freeze();

        writefln("Result: %s", result);
    }
}


/**
    Unsafe facility to assume that an existing reference is unique.

    The caller is responsible for making sure that no other references to
    `object` exist; nothing in this function checks it.
*/
Isolated!T assumeIsolated(T)(T object)
{
    return Isolated!T(object);
}

/**
    Encapsulates the given type in a way that guarantees memory isolation.
306 307 See_Also: makeIsolated, makeIsolatedArray 308 */ 309 template Isolated(T) 310 { 311 static if( isWeaklyIsolated!T ){ 312 alias Isolated = T; 313 } else static if( is(T == class) ){ 314 alias Isolated = IsolatedRef!T; 315 } else static if( isPointer!T ){ 316 alias Isolated = IsolatedRef!(PointerTarget!T); 317 } else static if( isDynamicArray!T ){ 318 alias Isolated = IsolatedArray!(typeof(T.init[0])); 319 } else static if( isAssociativeArray!T ){ 320 alias Isolated = IsolatedAssociativeArray!(KeyType!T, ValueType!T); 321 } else static assert(false, T.stringof~": Unsupported type for Isolated!T - must be class, pointer, array or associative array."); 322 } 323 324 325 // unit tests fails with DMD 2.064 due to some cyclic import regression 326 unittest 327 { 328 static class CE {} 329 static struct SE {} 330 331 static assert(is(Isolated!CE == IsolatedRef!CE)); 332 static assert(is(Isolated!(SE*) == IsolatedRef!SE)); 333 static assert(is(Isolated!(SE[]) == IsolatedArray!SE)); 334 version(EnablePhobosFails){ 335 // AAs don't work because they are impure 336 static assert(is(Isolated!(SE[string]) == IsolatedAssociativeArray!(string, SE))); 337 } 338 } 339 340 341 /// private 342 private struct IsolatedRef(T) 343 { 344 pure: 345 static assert(isWeaklyIsolated!(FieldTypeTuple!T), T.stringof ~ " contains non-immutable/non-shared references. 
Isolation cannot be guaranteed."); 346 enum __isWeakIsolatedType = true; 347 static if( isStronglyIsolated!(FieldTypeTuple!T) ) 348 enum __isIsolatedType = true; 349 350 alias BaseType = T; 351 352 static if( is(T == class) ){ 353 alias Tref = T; 354 alias Tiref = immutable(T); 355 } else { 356 alias Tref = T*; 357 alias Tiref = immutable(T)*; 358 } 359 360 private Tref m_ref; 361 362 //mixin isolatedAggregateMethods!T; 363 //pragma(msg, isolatedAggregateMethodsString!T()); 364 mixin(isolatedAggregateMethodsString!T()); 365 366 @disable this(this); 367 368 private this(Tref obj) 369 { 370 m_ref = obj; 371 } 372 373 this(ref IsolatedRef src) 374 { 375 m_ref = src.m_ref; 376 src.m_ref = null; 377 } 378 379 void opAssign(ref IsolatedRef src) 380 { 381 m_ref = src.m_ref; 382 src.m_ref = null; 383 } 384 385 /** 386 Returns the raw reference. 387 388 Note that using this function breaks type safety. Be sure to not escape the reference. 389 */ 390 inout(Tref) unsafeGet() inout { return m_ref; } 391 392 /** 393 Move the contained reference to a new IsolatedRef. 394 395 Since IsolatedRef is not copyable, using this function may be necessary when 396 passing a reference to a function or when returning it. The reference in 397 this instance will be set to null after the call returns. 398 */ 399 IsolatedRef move() { auto r = m_ref; m_ref = null; return IsolatedRef(r); } 400 /// ditto 401 void move(ref IsolatedRef target) { target.m_ref = m_ref; m_ref = null; } 402 403 /** 404 Convert the isolated reference to a normal mutable reference. 405 406 The reference in this instance will be set to null after the call returns. 407 */ 408 Tref extract() 409 { 410 auto ret = m_ref; 411 m_ref = null; 412 return ret; 413 } 414 415 /** 416 Converts the isolated reference to immutable. 417 418 The reference in this instance will be set to null after the call has returned. 
419 Note that this method is only available for strongly isolated references, 420 which means references that do not contain shared aliasing. 421 */ 422 Tiref freeze()() 423 { 424 static assert(isStronglyIsolated!(FieldTypeTuple!T), "freeze() can only be called on strongly isolated values, but "~T.stringof~" contains shared references."); 425 auto ret = m_ref; 426 m_ref = null; 427 return cast(immutable)ret; 428 } 429 430 /** 431 Performs an up- or down-cast of the reference and moves it to a new IsolatedRef instance. 432 433 The reference in this instance will be set to null after the call has returned. 434 */ 435 U opCast(U)() 436 if (isInstanceOf!(IsolatedRef, U) && (is(U.BaseType : BaseType) || is(BaseType : U.BaseType))) 437 { 438 auto r = U(cast(U.BaseType)m_ref); 439 m_ref = null; 440 return r; 441 } 442 443 /** 444 Determines if the contained reference is non-null. 445 446 This method allows Isolated references to be used in boolean expressions without having to 447 extract the reference. 448 */ 449 U opCast(U)() const if(is(U == bool)) { return m_ref !is null; } 450 } 451 452 453 /// private 454 private struct IsolatedArray(T) 455 { 456 static assert(isWeaklyIsolated!T, T.stringof ~ " contains non-immutable references. Isolation cannot be guaranteed."); 457 enum __isWeakIsolatedType = true; 458 static if( isStronglyIsolated!T ) 459 enum __isIsolatedType = true; 460 461 alias BaseType = T[]; 462 463 private T[] m_array; 464 465 mixin isolatedArrayMethods!T; 466 467 @disable this(this); 468 469 /** 470 Returns the raw reference. 471 472 Note that using this function breaks type safety. Be sure to not escape the reference. 
473 */ 474 inout(T[]) unsafeGet() inout { return m_array; } 475 476 IsolatedArray!T move() pure { auto r = m_array; m_array = null; return IsolatedArray(r); } 477 void move(ref IsolatedArray target) pure { target.m_array = m_array; m_array = null; } 478 479 T[] extract() 480 pure { 481 auto arr = m_array; 482 m_array = null; 483 return arr; 484 } 485 486 immutable(T)[] freeze()() pure 487 { 488 static assert(isStronglyIsolated!T, "Freeze can only be called on strongly isolated values, but "~T.stringof~" contains shared references."); 489 auto arr = m_array; 490 m_array = null; 491 return cast(immutable)arr; 492 } 493 494 495 /** 496 Splits the array into individual slices at the given incides. 497 498 The indices must be in ascending order. Any items that are larger than 499 the last given index will remain in this IsolatedArray. 500 */ 501 IsolatedArray!T[] splice(in size_t[] indices...) pure 502 in { 503 //import std.algorithm : isSorted; 504 assert(indices.length > 0, "At least one splice index must be given."); 505 //assert(isSorted(indices), "Indices must be in ascending order."); 506 assert(indices[$-1] <= m_array.length, "Splice index out of bounds."); 507 } 508 do { 509 auto ret = new IsolatedArray!T[indices.length]; 510 size_t lidx = 0; 511 foreach( i, sidx; indices ){ 512 ret[i].m_array = m_array[lidx .. sidx]; 513 lidx = sidx; 514 } 515 m_array = m_array[lidx .. $]; 516 return ret; 517 } 518 519 void merge(ref IsolatedArray!T array) pure 520 in { 521 assert(array.m_array.ptr == m_array.ptr+m_array.length || array.m_array.ptr+array.length == m_array.ptr, 522 "Argument to merge() must be a neighbouring array partition."); 523 } 524 do { 525 if( array.m_array.ptr == m_array.ptr + m_array.length ){ 526 m_array = m_array.ptr[0 .. m_array.length + array.length]; 527 } else { 528 m_array = array.m_array.ptr[0 .. 
m_array.length + array.length]; 529 } 530 array.m_array.length = 0; 531 } 532 } 533 534 535 /// private 536 private struct IsolatedAssociativeArray(K, V) 537 { 538 pure: 539 static assert(isWeaklyIsolated!K, "Key type has aliasing. Memory isolation cannot be guaranteed."); 540 static assert(isWeaklyIsolated!V, "Value type has aliasing. Memory isolation cannot be guaranteed."); 541 542 enum __isWeakIsolatedType = true; 543 static if( isStronglyIsolated!K && isStronglyIsolated!V ) 544 enum __isIsolatedType = true; 545 546 alias BaseType = V[K]; 547 548 private { 549 V[K] m_aa; 550 } 551 552 mixin isolatedAssociativeArrayMethods!(K, V); 553 554 /** 555 Returns the raw reference. 556 557 Note that using this function breaks type safety. Be sure to not escape the reference. 558 */ 559 inout(V[K]) unsafeGet() inout { return m_aa; } 560 561 IsolatedAssociativeArray move() { auto r = m_aa; m_aa = null; return IsolatedAssociativeArray(r); } 562 void move(ref IsolatedAssociativeArray target) { target.m_aa = m_aa; m_aa = null; } 563 564 V[K] extract() 565 { 566 auto arr = m_aa; 567 m_aa = null; 568 return arr; 569 } 570 571 static if( is(typeof(IsolatedAssociativeArray.__isIsolatedType)) ){ 572 immutable(V)[K] freeze() 573 { 574 auto arr = m_aa; 575 m_aa = null; 576 return cast(immutable(V)[K])(arr); 577 } 578 579 immutable(V[K]) freeze2() 580 { 581 auto arr = m_aa; 582 m_aa = null; 583 return cast(immutable(V[K]))(arr); 584 } 585 } 586 } 587 588 589 /** Encapsulates a reference in a way that disallows escaping it or any contained references. 
*/
template ScopedRef(T)
{
    static if( isAggregateType!T ) alias ScopedRef = ScopedRefAggregate!T;
    else static if( isAssociativeArray!T ) alias ScopedRef = ScopedRefAssociativeArray!T;
    else static if( isArray!T ) alias ScopedRef = ScopedRefArray!T;
    else static if( isBasicType!T ) alias ScopedRef = ScopedRefBasic!T;
    else static assert(false, "Unsupported type for ScopedRef: "~T.stringof);
}

/// private
private struct ScopedRefBasic(T)
{
    private T* m_ref;

    @disable this(this);

    this(ref T tref) pure { m_ref = &tref; }

    //void opAssign(T value) { *m_ref = value; }

    ref T unsafeGet() pure { return *m_ref; }

    alias unsafeGet this;
}

/// private
private struct ScopedRefAggregate(T)
{
    private T* m_ref;

    @disable this(this);

    this(ref T tref) pure { m_ref = &tref; }

    //void opAssign(T value) { *m_ref = value; }

    ref T unsafeGet() pure { return *m_ref; }

    // shared aggregates only expose lock(); all others get the generated
    // forwarding members
    static if( is(T == shared) ){
        auto lock() pure { return .lock(unsafeGet()); }
    } else {
        mixin(isolatedAggregateMethodsString!T());
        //mixin isolatedAggregateMethods!T;
    }
}

/// private
private struct ScopedRefArray(T)
{
    alias V = typeof(T.init[0]) ;
    private T* m_ref;

    // storage accessors used by the isolatedArrayMethods mixin
    private @property ref T m_array() pure { return *m_ref; }
    private @property ref const(T) m_array() const pure { return *m_ref; }

    mixin isolatedArrayMethods!(V, !is(T == const) && !is(T == immutable));

    @disable this(this);

    this(ref T tref) pure { m_ref = &tref; }

    //void opAssign(T value) { *m_ref = value; }

    ref T unsafeGet() pure { return *m_ref; }
}

/// private
private struct ScopedRefAssociativeArray(T)
{
    // ScopedRef instantiates this struct with the associative array type
    // itself, so key and value types are derived from it here.
    alias K = KeyType!T;
    alias V = ValueType!T;
    private T* m_ref;

    // storage accessor under the name the isolatedAssociativeArrayMethods
    // mixin reads (m_aa); inout so that the mixin's inout opIndex can use it
    private @property ref inout(T) m_aa() inout pure { return *m_ref; }

    mixin isolatedAssociativeArrayMethods!(K, V);

    @disable this(this);

    this(ref T tref) pure { m_ref = &tref; }

    //void opAssign(T value) { *m_ref = value; }

    ref T unsafeGet() pure { return *m_ref; }

}

/******************************************************************************/
/* COMMON MIXINS FOR NON-REF-ESCAPING WRAPPER STRUCTS                         */
/******************************************************************************/

/// private
/*private mixin template(T) isolatedAggregateMethods
{
    mixin(isolatedAggregateMethodsString!T());
}*/

/// private
// Builds the source text of forwarding members for T: accessors for public
// read/write fields and wrappers for pure methods with a weakly isolated
// return type. The generated code accesses the wrapped object via `m_ref`.
private string isolatedAggregateMethodsString(T)()
{
    import vibe.internal.traits;

    string ret = generateModuleImports!T();
    //pragma(msg, "Type '"~T.stringof~"'");
    foreach( mname; __traits(allMembers, T) ){
        static if (isPublicMember!(T, mname)) {
            static if (isRWPlainField!(T, mname)) {
                alias mtype = typeof(__traits(getMember, T, mname)) ;
                auto mtypename = fullyQualifiedName!mtype;
                //pragma(msg, " field " ~ mname ~ " : " ~ mtype.stringof);
                ret ~= "@property ScopedRef!(const("~mtypename~")) "~mname~"() const pure { return ScopedRef!(const("~mtypename~"))(m_ref."~mname~"); }\n";
                ret ~= "@property ScopedRef!("~mtypename~") "~mname~"() pure { return ScopedRef!("~mtypename~")(m_ref."~mname~"); }\n";
                static if( !is(mtype == const) && !is(mtype == immutable) ){
                    static if( isWeaklyIsolated!mtype ){
                        ret ~= "@property void "~mname~"("~mtypename~" value) pure { m_ref."~mname~" = value; }\n";
                    } else {
                        ret ~= "@property void "~mname~"(AT)(AT value) pure { static assert(isWeaklyIsolated!AT); m_ref."~mname~" = value.unsafeGet(); }\n";
                    }
                }
            } else {
                foreach( method; __traits(getOverloads, T, mname) ){
                    alias ftype = FunctionTypeOf!method;

                    // only pure functions are allowed (or they could escape references to global variables)
                    // don't allow non-isolated references to be escaped
                    if( functionAttributes!ftype & FunctionAttribute.pure_ &&
                        isWeaklyIsolated!(ReturnType!ftype) )
                    {
                        static if( __traits(isStaticFunction, method) ){
                            //pragma(msg, " static method " ~ mname ~ " : " ~ ftype.stringof);
                            ret ~= "static "~fullyQualifiedName!(ReturnType!ftype)~" "~mname~"(";
                            foreach( i, P; ParameterTypeTuple!ftype ){
                                if( i > 0 ) ret ~= ", ";
                                ret ~= fullyQualifiedName!P ~ " p"~i.stringof;
                            }
                            ret ~= "){ return "~fullyQualifiedName!T~"."~mname~"(";
                            foreach( i, P; ParameterTypeTuple!ftype ){
                                if( i > 0 ) ret ~= ", ";
                                ret ~= "p"~i.stringof;
                            }
                            ret ~= "); }\n";
                        } else if (mname != "__ctor") {
                            //pragma(msg, " normal method " ~ mname ~ " : " ~ ftype.stringof);
                            if( is(ftype == const) ) ret ~= "const ";
                            if( is(ftype == shared) ) ret ~= "shared ";
                            if( is(ftype == immutable) ) ret ~= "immutable ";
                            if( functionAttributes!ftype & FunctionAttribute.pure_ ) ret ~= "pure ";
                            if( functionAttributes!ftype & FunctionAttribute.property ) ret ~= "@property ";
                            ret ~= fullyQualifiedName!(ReturnType!ftype)~" "~mname~"(";
                            foreach( i, P; ParameterTypeTuple!ftype ){
                                if( i > 0 ) ret ~= ", ";
                                ret ~= fullyQualifiedName!P ~ " p"~i.stringof;
                            }
                            ret ~= "){ return m_ref."~mname~"(";
                            foreach( i, P; ParameterTypeTuple!ftype ){
                                if( i > 0 ) ret ~= ", ";
                                ret ~= "p"~i.stringof;
                            }
                            ret ~= "); }\n";
                        }
                    }
                }
            }
        } //else pragma(msg, " non-public field " ~ mname);
    }
    return ret;
}


/// private
// Array interface shared by IsolatedArray and ScopedRefArray. The host type
// has to provide the storage under the name `m_array`. Elements are only
// handed out wrapped in ScopedRef.
private mixin template isolatedArrayMethods(T, bool mutableRef = true)
{
    @property size_t length() const pure { return m_array.length; }

    @property bool empty() const pure { return m_array.length == 0; }

    static if( mutableRef ){
        @property void length(size_t value) pure { m_array.length = value; }


        // Only "~=" is supported; the constraint keeps other compound
        // assignment operators (e.g. "+=") from silently appending.
        void opOpAssign(string op = "~")(T item) pure
            if (op == "~")
        {
            static if( isCopyable!T ) m_array ~= item;
            else {
                m_array.length++;
                m_array[$-1] = item;
            }
        }

        /// ditto
        void opOpAssign(string op = "~")(IsolatedArray!T array) pure
            if (op == "~")
        {
            static if( isCopyable!T ) m_array ~= array.m_array;
            else {
                size_t start = m_array.length;
                m_array.length += array.length;
                foreach( i, ref itm; array.m_array )
                    m_array[start+i] = itm;
            }
        }
    }

    ScopedRef!(const(T)) opIndex(size_t idx) const pure { return ScopedRef!(const(T))(m_array[idx]); }
    ScopedRef!T opIndex(size_t idx) pure { return ScopedRef!T(m_array[idx]); }

    static if( !is(T == const) && !is(T == immutable) )
        void opIndexAssign(T value, size_t idx) pure { m_array[idx] = value; }

    // The delegates are cast to pure so that they can be called from these
    // pure opApply implementations.
    int opApply(int delegate(ref size_t, ref ScopedRef!T) del)
    pure {
        foreach( idx, ref v; m_array ){
            auto noref = ScopedRef!T(v);
            if( auto ret = (cast(int delegate(ref size_t, ref ScopedRef!T) pure)del)(idx, noref) )
                return ret;
        }
        return 0;
    }

    int opApply(int delegate(ref size_t, ref ScopedRef!(const(T))) del)
    const pure {
        foreach( idx, ref v; m_array ){
            auto noref = ScopedRef!(const(T))(v);
            if( auto ret = (cast(int delegate(ref size_t, ref ScopedRef!(const(T))) pure)del)(idx, noref) )
                return ret;
        }
        return 0;
    }

    int opApply(int delegate(ref ScopedRef!T) del)
    pure {
        // iterate by reference, like the indexed overloads, so that the
        // ScopedRef refers to the stored element and not to a loop-local copy
        foreach( ref v; m_array ){
            auto noref = ScopedRef!T(v);
            if( auto ret = (cast(int delegate(ref ScopedRef!T) pure)del)(noref) )
                return ret;
        }
        return 0;
    }

    int opApply(int delegate(ref ScopedRef!(const(T))) del)
    const pure {
        foreach( ref v; m_array ){
            auto noref = ScopedRef!(const(T))(v);
            if( auto ret = (cast(int delegate(ref ScopedRef!(const(T))) pure)del)(noref) )
                return ret;
        }
        return 0;
    }
}


/// private
// Associative array interface shared by IsolatedAssociativeArray and
// ScopedRefAssociativeArray. The host type has to provide the storage under
// the name `m_aa`.
private mixin template isolatedAssociativeArrayMethods(K, V, bool mutableRef = true)
{
    @property size_t length() const pure { return m_aa.length; }
    @property bool empty() const pure { return m_aa.length == 0; }

    static if( !is(V == const) && !is(V == immutable) )
        void opIndexAssign(V value, K key) pure { m_aa[key] = value; }

    inout(V) opIndex(K key) inout pure { return m_aa[key]; }

    // Keys and values are wrapped in ScopedRef before they are passed on; the
    // delegates take ScopedRef parameters and cannot accept raw keys/values.
    // Keys are iterated by value, so the key wrapper refers to a loop-local copy.
    int opApply(int delegate(ref ScopedRef!K, ref ScopedRef!V) del)
    pure {
        foreach( k, ref v; m_aa ){
            auto kref = ScopedRef!K(k);
            auto vref = ScopedRef!V(v);
            if( auto ret = (cast(int delegate(ref ScopedRef!K, ref ScopedRef!V) pure)del)(kref, vref) )
                return ret;
        }
        return 0;
    }

    int opApply(int delegate(ref ScopedRef!V) del)
    pure {
        foreach( ref v; m_aa ){
            auto vref = ScopedRef!V(v);
            if( auto ret = (cast(int delegate(ref ScopedRef!V) pure)del)(vref) )
                return ret;
        }
        return 0;
    }

    int opApply(int delegate(ref ScopedRef!(const(K)), ref ScopedRef!(const(V))) del)
    const pure {
        foreach( k, ref v; m_aa ){
            auto kref = ScopedRef!(const(K))(k);
            auto vref = ScopedRef!(const(V))(v);
            if( auto ret = (cast(int delegate(ref ScopedRef!(const(K)), ref ScopedRef!(const(V))) pure)del)(kref, vref) )
                return ret;
        }
        return 0;
    }

    int opApply(int delegate(ref ScopedRef!(const(V))) del)
    const pure {
        foreach( ref v; m_aa ){
            auto vref = ScopedRef!(const(V))(v);
            if( auto ret = (cast(int delegate(ref ScopedRef!(const(V))) pure)del)(vref) )
                return ret;
        }
        return 0;
    }
}


/******************************************************************************/
/* UTILITY FUNCTIONALITY                                                      */
/******************************************************************************/

// private
// Returns `static import` statements for the modules found by
// generateModuleImportsImpl for T.
private @property string generateModuleImports(T)()
{
    bool[string] visited;
    //pragma(msg, "generateModuleImports "~T.stringof);
    return generateModuleImportsImpl!T(visited);
}

// Recursive worker for generateModuleImports. TYPES lists the types that are
// already being processed, which stops the recursion on self-referencing
// types; `visited` makes sure every module is emitted only once.
private @property string generateModuleImportsImpl(T, TYPES...)(ref bool[string] visited)
{
    string ret;

    //pragma(msg, T);
    //pragma(msg, TYPES);

    static if( !haveTypeAlready!(T, TYPES) ){
        void addModule(string mod){
            if( mod !in visited ){
                ret ~= "static import "~mod~";\n";
                visited[mod] = true;
            }
        }

        static if( isAggregateType!T && !is(typeof(T.__isWeakIsolatedType)) ){ // hack to avoid a recursive template instantiation when Isolated!T is passed to moduleName
            addModule(moduleName!T);

            foreach( member; __traits(allMembers, T) ){
                //static if( isPublicMember!(T, member) ){
                    static if( !is(typeof(__traits(getMember, T, member))) ){
                        // ignore sub types
                    } else static if( !is(FunctionTypeOf!(__traits(getMember, T, member)) == function) ){
                        alias mtype = typeof(__traits(getMember, T, member)) ;
                        ret ~= generateModuleImportsImpl!(mtype, T, TYPES)(visited);
                    } else static if( is(T == class) || is(T == interface) ){
                        foreach( overload; MemberFunctionsTuple!(T, member) ){
                            ret ~= generateModuleImportsImpl!(ReturnType!overload, T, TYPES)(visited);
                            foreach( P; ParameterTypeTuple!overload )
                                ret ~= generateModuleImportsImpl!(P, T, TYPES)(visited);
                        }
                    } // TODO: handle structs!
                //}
            }
        }
        else static if( isPointer!T ) ret ~= generateModuleImportsImpl!(PointerTarget!T, T, TYPES)(visited);
        else static if( isArray!T ) ret ~= generateModuleImportsImpl!(typeof(T.init[0]), T, TYPES)(visited);
        else static if( isAssociativeArray!T ) ret ~= generateModuleImportsImpl!(KeyType!T, T, TYPES)(visited) ~ generateModuleImportsImpl!(ValueType!T, T, TYPES)(visited);
    }

    return ret;
}

// Evaluates to true if T is one of TYPES.
template haveTypeAlready(T, TYPES...)
{
    static if( TYPES.length == 0 ) enum haveTypeAlready = false;
    else static if( is(T == TYPES[0]) ) enum haveTypeAlready = true;
    else alias haveTypeAlready = haveTypeAlready!(T, TYPES[1 ..$]);
}


/******************************************************************************/
/* Additional traits useful for handling isolated data                        */
/******************************************************************************/

/**
    Determines if the given list of types has any non-immutable aliasing outside of their object tree.

    The types in particular may only contain plain data, pointers or arrays to immutable data, or references
    encapsulated in Isolated.
*/
template isStronglyIsolated(T...)
{
    static if (T.length == 0) enum bool isStronglyIsolated = true;
    else static if (T.length > 1) enum bool isStronglyIsolated = isStronglyIsolated!(T[0 .. $/2]) && isStronglyIsolated!(T[$/2 .. $]);
    else {
        static if (is(T[0] == immutable)) enum bool isStronglyIsolated = true;
        else static if(isInstanceOf!(Rebindable, T[0])) enum bool isStronglyIsolated = isStronglyIsolated!(typeof(T[0].get()));
        else static if (is(typeof(T[0].__isIsolatedType))) enum bool isStronglyIsolated = true;
        else static if (is(T[0] == class)) enum bool isStronglyIsolated = false;
        else static if (is(T[0] == interface)) enum bool isStronglyIsolated = false; // can't know if the implementation is isolated
        else static if (is(T[0] == delegate)) enum bool isStronglyIsolated = !!(functionAttributes!(T[0]) & FunctionAttribute.immutable_);
        else static if (isDynamicArray!(T[0])) enum bool isStronglyIsolated = is(typeof(T[0].init[0]) == immutable);
        else static if (isAssociativeArray!(T[0])) enum bool isStronglyIsolated = false; // TODO: be less strict here
        else static if (isSomeFunction!(T[0])) enum bool isStronglyIsolated = true; // functions are immutable
        else static if (isPointer!(T[0])) enum bool isStronglyIsolated = is(typeof(*T[0].init) == immutable);
        else static if (isAggregateType!(T[0])) enum bool isStronglyIsolated = isStronglyIsolated!(FieldTypeTuple!(T[0]));
        else enum bool isStronglyIsolated = true;
    }
}


/**
    Determines if the given list of types has any non-immutable and unshared aliasing outside of their object tree.

    The types in particular may only contain plain data, pointers or arrays to immutable or shared data, or references
    encapsulated in Isolated. Values that do not have unshared and unisolated aliasing are safe to be passed
    between threads.
*/
template isWeaklyIsolated(T...)
{
    static if (T.length == 0) enum bool isWeaklyIsolated = true;
    else static if (T.length > 1) enum bool isWeaklyIsolated = isWeaklyIsolated!(T[0 .. $/2]) && isWeaklyIsolated!(T[$/2 .. $]);
    else {
        static if(is(T[0] == immutable)) enum bool isWeaklyIsolated = true;
        else static if (is(T[0] == shared)) enum bool isWeaklyIsolated = true;
        else static if (is(T[0] == Tid)) enum bool isWeaklyIsolated = true;
        else static if (isInstanceOf!(Rebindable, T[0])) enum bool isWeaklyIsolated = isWeaklyIsolated!(typeof(T[0].get()));
        else static if (is(T[0] : Throwable)) enum bool isWeaklyIsolated = true; // WARNING: this is unsafe, but needed for send/receive!
        else static if (is(typeof(T[0].__isIsolatedType))) enum bool isWeaklyIsolated = true;
        else static if (is(typeof(T[0].__isWeakIsolatedType))) enum bool isWeaklyIsolated = true;
        else static if (is(T[0] == class)) enum bool isWeaklyIsolated = false;
        else static if (is(T[0] == interface)) enum bool isWeaklyIsolated = false; // can't know if the implementation is isolated
        else static if (is(T[0] == delegate)) enum bool isWeaklyIsolated = !!(functionAttributes!(T[0]) & (FunctionAttribute.shared_|FunctionAttribute.immutable_));
        else static if (isDynamicArray!(T[0])) enum bool isWeaklyIsolated = is(typeof(T[0].init[0]) == immutable) || is(typeof(T[0].init[0]) == shared);
        else static if (isAssociativeArray!(T[0])) enum bool isWeaklyIsolated = false; // TODO: be less strict here
        else static if (isSomeFunction!(T[0])) enum bool isWeaklyIsolated = true; // functions are immutable
        else static if (isPointer!(T[0])) enum bool isWeaklyIsolated = is(typeof(*T[0].init) == immutable) || is(typeof(*T[0].init) == shared);
        else static if (isAggregateType!(T[0])) enum bool isWeaklyIsolated = isWeaklyIsolated!(FieldTypeTuple!(T[0]));
        else enum bool isWeaklyIsolated = true;
    }
}

unittest {
    static class A { int x; string y; }

    static struct B {
        string a; // strongly isolated
        Isolated!A b; // strongly isolated
        version(EnablePhobosFails)
        Isolated!(Isolated!A[]) c; // strongly isolated
        version(EnablePhobosFails)
        Isolated!(Isolated!A[string]) c; // AA implementation does not like this
        version(EnablePhobosFails)
        Isolated!(int[string]) d; // strongly isolated
    }

    static struct C {
        string a; // strongly isolated
        shared(A) b; // weakly isolated
        Isolated!A c; // strongly isolated
        shared(A*) d; // weakly isolated
        shared(A[]) e; // weakly isolated
        shared(A[string]) f; // weakly isolated
    }

    static struct D { A a; } // not
isolated 1035 static struct E { void delegate() a; } // not isolated 1036 static struct ES { void delegate() shared a; } // weakly isolated 1037 static struct EI { void delegate() immutable a; } // strongly isolated 1038 static struct F { void function() a; } // strongly isolated (functions are immutable) 1039 static struct G { void test(); } // strongly isolated 1040 static struct H { A[] a; } // not isolated 1041 static interface I {} 1042 1043 static assert(!isStronglyIsolated!A); 1044 static assert(isStronglyIsolated!(FieldTypeTuple!A)); 1045 static assert(isStronglyIsolated!B); 1046 static assert(!isStronglyIsolated!C); 1047 static assert(!isStronglyIsolated!D); 1048 static assert(!isStronglyIsolated!E); 1049 static assert(!isStronglyIsolated!ES); 1050 static assert(isStronglyIsolated!EI); 1051 static assert(isStronglyIsolated!F); 1052 static assert(isStronglyIsolated!G); 1053 static assert(!isStronglyIsolated!H); 1054 static assert(!isStronglyIsolated!I); 1055 1056 static assert(!isWeaklyIsolated!A); 1057 static assert(isWeaklyIsolated!(FieldTypeTuple!A)); 1058 static assert(isWeaklyIsolated!B); 1059 static assert(isWeaklyIsolated!C); 1060 static assert(!isWeaklyIsolated!D); 1061 static assert(!isWeaklyIsolated!E); 1062 static assert(isWeaklyIsolated!ES); 1063 static assert(isWeaklyIsolated!EI); 1064 static assert(isWeaklyIsolated!F); 1065 static assert(isWeaklyIsolated!G); 1066 static assert(!isWeaklyIsolated!H); 1067 static assert(!isWeaklyIsolated!I); 1068 } 1069 1070 1071 template isCopyable(T) 1072 { 1073 static if( __traits(compiles, {foreach( t; [T.init]){}}) ) enum isCopyable = true; 1074 else enum isCopyable = false; 1075 } 1076 1077 1078 /******************************************************************************/ 1079 /* Future (promise) suppport */ 1080 /******************************************************************************/ 1081 1082 /** 1083 Represents a values that will be computed asynchronously. 
1084 1085 This type uses $(D alias this) to enable transparent access to the result 1086 value. 1087 */ 1088 struct Future(T) { 1089 import vibe.internal.freelistref : FreeListRef; 1090 1091 private { 1092 alias ResultRef = FreeListRef!(shared(Tuple!(T, string))); 1093 1094 ResultRef m_result; 1095 Task m_task; 1096 } 1097 1098 /// Checks if the values was fully computed. 1099 @property bool ready() const @safe { return !m_task.running; } 1100 1101 /// Return the associated task. 1102 @property Task task() @safe { return m_task; } 1103 1104 /** Returns the computed value. 1105 1106 This function waits for the computation to finish, if necessary, and 1107 then returns the final value. In case of an uncaught exception 1108 happening during the computation, the exception will be thrown 1109 instead. 1110 */ 1111 ref T getResult() 1112 @safe { 1113 if (!ready) m_task.join(); 1114 assert(ready, "Task still running after join()!?"); 1115 1116 if (m_result.get[1].length) 1117 throw new Exception(m_result.get[1]); 1118 1119 // casting away shared is safe, because this is a unique reference 1120 return *() @trusted { return cast(T*)&m_result.get()[0]; } (); 1121 } 1122 1123 alias getResult this; 1124 1125 private void init() 1126 @safe { 1127 m_result = ResultRef(); 1128 } 1129 } 1130 1131 1132 /** 1133 Starts an asynchronous computation and returns a future for the result value. 1134 1135 If the supplied callable and arguments are all weakly isolated, 1136 $(D vibe.core.core.runWorkerTask) will be used to perform the computation in 1137 a separate worker thread. Otherwise, $(D vibe.core.core.runTask) will be 1138 used and the result is computed within a separate task within the calling thread. 1139 1140 Params: 1141 callable = A callable value, can be either a function, a delegate, or a 1142 user defined type that defines an $(D opCall). 1143 args = Arguments to pass to the callable. 
1144 1145 Returns: 1146 Returns a $(D Future) object that can be used to access the result. 1147 1148 See_also: $(D isWeaklyIsolated) 1149 */ 1150 Future!(ReturnType!CALLABLE) async(CALLABLE, ARGS...)(CALLABLE callable, ARGS args) 1151 if (is(typeof(callable(args)) == ReturnType!CALLABLE)) 1152 { 1153 import vibe.core.core; 1154 import vibe.internal.freelistref : FreeListRef; 1155 import std.functional : toDelegate; 1156 1157 alias RET = ReturnType!CALLABLE; 1158 Future!RET ret; 1159 ret.init(); 1160 static void compute(Future!RET.ResultRef dst, CALLABLE callable, ARGS args) { 1161 try dst.get[0] = cast(shared(RET))callable(args); 1162 catch (Exception e) dst.get[1] = e.msg.length ? e.msg : "Asynchronous operation failed"; 1163 } 1164 static if (isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS) { 1165 ret.m_task = runWorkerTaskH(&compute, ret.m_result, callable, args); 1166 } else { 1167 ret.m_task = runTask(toDelegate(&compute), ret.m_result, callable, args); 1168 } 1169 return ret; 1170 } 1171 1172 /// 1173 @safe unittest { 1174 import vibe.core.core; 1175 import vibe.core.log; 1176 1177 void test() 1178 { 1179 static if (__VERSION__ >= 2065) { 1180 auto val = async({ 1181 logInfo("Starting to compute value in worker task."); 1182 sleep(500.msecs); // simulate some lengthy computation 1183 logInfo("Finished computing value in worker task."); 1184 return 32; 1185 }); 1186 1187 logInfo("Starting computation in main task"); 1188 sleep(200.msecs); // simulate some lengthy computation 1189 logInfo("Finished computation in main task. 
Waiting for async value."); 1190 logInfo("Result: %s", val.getResult()); 1191 } 1192 } 1193 } 1194 1195 /// 1196 unittest { 1197 int sum(int a, int b) 1198 { 1199 return a + b; 1200 } 1201 1202 static int sum2(int a, int b) 1203 { 1204 return a + b; 1205 } 1206 1207 void test() 1208 { 1209 // Using a delegate will use runTask internally 1210 assert(async(&sum, 2, 3).getResult() == 5); 1211 1212 // Using a static function will use runTaskWorker internally, 1213 // if all arguments are weakly isolated 1214 assert(async(&sum2, 2, 3).getResult() == 5); 1215 } 1216 } 1217 1218 Future!(ReturnType!CALLABLE) asyncWork(CALLABLE, ARGS...)(CALLABLE callable, ARGS args) @safe 1219 if (is(typeof(callable(args)) == ReturnType!CALLABLE) && 1220 isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS) 1221 { 1222 import vibe.core.core; 1223 import vibe.internal.freelistref : FreeListRef; 1224 import std.functional : toDelegate; 1225 1226 alias RET = ReturnType!CALLABLE; 1227 Future!RET ret; 1228 ret.init(); 1229 static void compute(Future!RET.ResultRef dst, CALLABLE callable, ARGS args) { 1230 try *cast(RET*)&dst.get[0] = callable(args); 1231 catch (Exception e) dst.get[1] = e.msg.length ? e.msg : "Asynchronous operation failed"; 1232 } 1233 ret.m_task = runWorkerTaskH(&compute, ret.m_result, callable, args); 1234 return ret; 1235 } 1236 1237 1238 /******************************************************************************/ 1239 /* std.concurrency compatible interface for message passing */ 1240 /******************************************************************************/ 1241 1242 enum ConcurrencyPrimitive { 1243 task, // Task run in the caller's thread (`runTask`) 1244 workerTask, // Task run in the worker thread pool (`runWorkerTask`) 1245 thread // Separate thread 1246 } 1247 1248 /** Sets the concurrency primitive to use for `śtd.concurrency.spawn()`. 1249 1250 By default, `spawn()` will start a thread for each call, mimicking the 1251 default behavior of `std.concurrency`. 
*/
void setConcurrencyPrimitive(ConcurrencyPrimitive primitive)
{
	import core.atomic : atomicStore;
	atomicStore(st_concurrencyPrimitive, primitive);
}

/// Sends a message to the given task; forwards to `std.concurrency.send` using the task's `tid`.
void send(ARGS...)(Task task, ARGS args) { std.concurrency.send(task.tid, args); }
/// Sends a message to the given `Tid`; forwards to `std.concurrency.send`.
void send(ARGS...)(Tid tid, ARGS args) { std.concurrency.send(tid, args); }
/// Sends a priority message to the given task; forwards to `std.concurrency.prioritySend` using the task's `tid`.
void prioritySend(ARGS...)(Task task, ARGS args) { std.concurrency.prioritySend(task.tid, args); }
/// Sends a priority message to the given `Tid`; forwards to `std.concurrency.prioritySend`.
void prioritySend(ARGS...)(Tid tid, ARGS args) { std.concurrency.prioritySend(tid, args); }


/**
	`Scheduler` implementation that runs spawned delegates using the concurrency
	primitive selected with `setConcurrencyPrimitive`.
*/
package final class VibedScheduler : Scheduler {
	import core.sync.mutex;
	import core.stdc.stdlib : abort;
	import vibe.core.core;
	import vibe.core.log : LogLevel, logException;
	import vibe.core.sync;

	/// Directly invokes `op` in the calling context.
	override void start(void delegate() op) { op(); }

	/** Runs `op` as a task, a worker task or a new thread, depending on the
		currently configured `ConcurrencyPrimitive`.
	*/
	override void spawn(void delegate() op) {
		import core.atomic : atomicLoad;
		import core.thread : Thread;

		// The setting is written with atomicStore, so read it with the matching atomic load.
		final switch (atomicLoad(st_concurrencyPrimitive)) with (ConcurrencyPrimitive) {
			case task:
				// An exception escaping from op is treated as a programming error.
				static void nothrow_wrapper(void delegate() op) {
					try op();
					catch (Exception e) assert(false, e.msg);
				}
				runTask(&nothrow_wrapper, op);
				break;
			case workerTask:
				// An exception escaping from op is logged and then aborts the process.
				static void wrapper(shared(void delegate()) op) nothrow {
					try (cast(void delegate())op)();
					catch (Exception e) {
						logException!(LogLevel.fatal)(e, "Uncaught exception in spawn()ed task");
						abort();
					}
				}
				// the delegate is cast to shared to hand it to the worker task and cast back inside the wrapper
				runWorkerTask(&wrapper, cast(shared)op);
				break;
			case thread:
				auto t = new Thread(op);
				t.start();
				break;
		}
	}

	/// Does nothing.
	override void yield() {}

	/// Returns the `tidInfo` of the task returned by `Task.getThis()`.
	override @property ref ThreadInfo thisInfo() @trusted { return Task.getThis().tidInfo; }

	/// Creates a new `TaskCondition` for the given mutex; an exception during creation triggers an assertion failure.
	override TaskCondition newCondition(Mutex m)
	{
		try {
			return new TaskCondition(m);
		} catch(Exception e) { assert(false, e.msg); }
	}
}

// Concurrency primitive used by VibedScheduler.spawn; set through setConcurrencyPrimitive.
private shared ConcurrencyPrimitive st_concurrencyPrimitive = ConcurrencyPrimitive.thread;