1 /** 2 Contains parallel computation primitives. 3 4 Copyright: © 2021 Sönke Ludwig 5 Authors: Sönke Ludwig 6 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 7 */ 8 module vibe.core.parallelism; 9 10 public import vibe.core.taskpool; 11 12 import vibe.core.channel; 13 import vibe.core.concurrency : isWeaklyIsolated; 14 import vibe.core.log; 15 import std.range : ElementType, isInputRange; 16 17 18 /** Processes a range of items in worker tasks and returns them as an unordered 19 range. 20 21 The order of the result stream can deviate from the order of the input 22 items, but the approach is more efficient that an ordered map.# 23 24 See_also: `parallelMap` 25 */ 26 auto parallelUnorderedMap(alias fun, R)(R items, shared(TaskPool) task_pool, ChannelConfig channel_config = ChannelConfig.init) 27 if (isInputRange!R && isWeaklyIsolated!(ElementType!R) && isWeaklyIsolated!(typeof(fun(ElementType!R.init)))) 28 { 29 import vibe.core.core : runTask; 30 import core.atomic : atomicOp, atomicStore; 31 32 alias I = ElementType!R; 33 alias O = typeof(fun(I.init)); 34 35 ChannelConfig inconfig; 36 inconfig.priority = ChannelPriority.overhead; 37 auto chin = createChannel!I(inconfig); 38 auto chout = createChannel!O(channel_config); 39 40 // TODO: discard all operations if the result range is not referenced anymore 41 42 static void senderFun(R items, Channel!I chin) 43 nothrow { 44 foreach (itm; items) { 45 try chin.put(itm); 46 catch (Exception e) { 47 logException(e, "Failed to send parallel mapped input"); 48 break; 49 } 50 } 51 chin.close(); 52 } 53 54 static void workerFun(Channel!I chin, Channel!O chout, shared(int)* rc) 55 nothrow { 56 I item; 57 while (chin.tryConsumeOne(item)) { 58 try chout.put(fun(item)); 59 catch (Exception e) { 60 logException(e, "Failed to send back parallel mapped result"); 61 break; 62 } 63 } 64 if (!atomicOp!"-="(*rc, 1)) 65 chout.close(); 66 } 67 68 runTask(&senderFun, items, chin); 69 70 auto rc = new shared int; 71 atomicStore(*rc, cast(int)task_pool.threadCount); 72 73 task_pool.runTaskDist(&workerFun, chin, chout, rc); 74 75 static struct Result { 76 private { 77 Channel!O m_channel; 78 O m_front; 79 bool m_gotFront = false; 80 } 81 82 @property bool empty() 83 { 84 fetchFront(); 85 return !m_gotFront; 86 } 87 88 @property ref O front() 89 { 90 fetchFront(); 91 assert(m_gotFront, "Accessing empty prallelMap range."); 92 return m_front; 93 } 94 95 void popFront() 96 { 97 fetchFront(); 98 m_gotFront = false; 99 } 100 101 private void fetchFront() 102 { 103 if (m_gotFront) return; 104 m_gotFront = m_channel.tryConsumeOne(m_front); 105 } 106 } 107 108 return Result(chout); 109 } 110 111 /// ditto 112 auto parallelUnorderedMap(alias fun, R)(R items, ChannelConfig channel_config = ChannelConfig.init) 113 if (isInputRange!R && isWeaklyIsolated!(ElementType!R) && isWeaklyIsolated!(typeof(fun(ElementType!R.init)))) 114 { 115 import vibe.core.core : workerTaskPool; 116 return parallelUnorderedMap!(fun, R)(items, workerTaskPool, channel_config); 117 } 118 119 /// 120 unittest { 121 import std.algorithm : isPermutation, map; 122 import std.array : array; 123 import std.range : iota; 124 125 auto res = iota(100) 126 .parallelMap!(i => 2 * i) 127 .array; 128 assert(res.isPermutation(iota(100).map!(i => 2 * i).array)); 129 } 130 131 132 /** Processes a range of items in worker tasks and returns them as an ordered 133 range. 134 135 The items of the returned stream are in the same order as input. Note that 136 this may require dynamic buffering of results, so it is recommended to 137 use unordered mapping if possible. 138 139 See_also: `parallelUnorderedMap` 140 */ 141 auto parallelMap(alias fun, R)(R items, shared(TaskPool) task_pool, ChannelConfig channel_config) 142 if (isInputRange!R && isWeaklyIsolated!(ElementType!R) && isWeaklyIsolated!(typeof(fun(ElementType!R.init)))) 143 { 144 import std.algorithm : canFind, countUntil, move, remove; 145 import std.container.array : Array; 146 import std.range : enumerate; 147 import std.typecons : RefCounted, Tuple; 148 149 alias I = ElementType!R; 150 alias O = typeof(fun(I.init)); 151 static struct SR { size_t index; O value; } 152 153 auto resunord = items 154 .enumerate 155 .parallelUnorderedMap!(itm => SR(itm.index, fun(itm.value)))(task_pool); 156 157 static struct State { 158 typeof(resunord) m_source; 159 size_t m_index = 0, m_minIndex = -1; 160 Array!SR m_buffer; 161 int m_refCount = 0; 162 163 @property bool empty() 164 { 165 return m_source.empty && m_buffer.length == 0; 166 } 167 @property ref O front() 168 { 169 fetchFront(); 170 auto idx = m_buffer[].countUntil!(sr => sr.index == m_index); 171 if (idx < 0) { 172 assert(m_source.front.index == m_index); 173 return m_source.front.value; 174 } 175 return m_buffer[idx].value; 176 } 177 void popFront() 178 { 179 m_index++; 180 181 auto idx = m_buffer[].countUntil!(sr => sr.index == m_index-1); 182 if (idx < 0) { 183 assert(m_source.front.index == m_index-1); 184 m_source.popFront(); 185 } else { 186 if (idx < m_buffer.length-1) 187 m_buffer[idx] = m_buffer[$-1]; 188 m_buffer.removeBack(); 189 } 190 } 191 192 private void fetchFront() 193 { 194 if (m_buffer[].canFind!(sr => sr.index == m_index)) 195 return; 196 197 while (m_source.front.index != m_index) { 198 m_buffer ~= m_source.front; 199 m_source.popFront(); 200 } 201 } 202 } 203 204 static struct Result { 205 private RefCounted!State state; 206 207 @property bool empty() { return state.empty; } 208 @property ref O front() { return state.front; } 209 void popFront() { state.popFront; } 210 } 211 212 return Result(RefCounted!State(resunord.move)); 213 } 214 215 /// ditto 216 auto parallelMap(alias fun, R)(R items, ChannelConfig channel_config = ChannelConfig.init) 217 if (isInputRange!R && isWeaklyIsolated!(ElementType!R) && isWeaklyIsolated!(typeof(fun(ElementType!R.init)))) 218 { 219 import vibe.core.core : workerTaskPool; 220 return parallelMap!(fun, R)(items, workerTaskPool, channel_config); 221 } 222 223 /// 224 unittest { 225 import std.algorithm : map; 226 import std.array : array; 227 import std.range : iota; 228 229 auto res = iota(100) 230 .parallelMap!(i => 2 * i) 231 .array; 232 assert(res == iota(100).map!(i => 2 * i).array); 233 } 234 235 /// 236 unittest { 237 import std.algorithm : isPermutation, map; 238 import std.array : array; 239 import std.random : uniform; 240 import std.range : iota; 241 import core.time : msecs; 242 import vibe.core.core : sleep; 243 244 // forcing a random computation result order still results in the same 245 // output order 246 auto res = iota(100) 247 .parallelMap!((i) { 248 sleep(uniform(0, 100).msecs); 249 return 2 * i; 250 }) 251 .array; 252 assert(res == iota(100).map!(i => 2 * i).array); 253 }