1 /**
2 	Contains parallel computation primitives.
3 
4 	Copyright: © 2021 Sönke Ludwig
5 	Authors: Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 */
8 module vibe.core.parallelism;
9 
10 public import vibe.core.taskpool;
11 
12 import vibe.core.channel;
13 import vibe.core.concurrency : isWeaklyIsolated;
14 import vibe.core.log;
15 import std.range : ElementType, isInputRange;
16 
17 
/** Processes a range of items in worker tasks and returns them as an unordered
	range.

	The order of the result stream can deviate from the order of the input
	items, but the approach is more efficient than an ordered map.

	Params:
		fun = Function that is applied to each input item; its return type is
			the element type of the returned range
		items = Input range of the items to process
		task_pool = Task pool that is used to run the worker tasks
		channel_config = Configuration of the channel that transports the
			results to the returned range

	Returns:
		An input range that yields the results of `fun` in the order in which
		they are received from the workers.

	See_also: `parallelMap`
*/
auto parallelUnorderedMap(alias fun, R)(R items, shared(TaskPool) task_pool, ChannelConfig channel_config = ChannelConfig.init)
	if (isInputRange!R && isWeaklyIsolated!(ElementType!R) && isWeaklyIsolated!(typeof(fun(ElementType!R.init))))
{
	import vibe.core.core : runTask;
	import core.atomic : atomicOp, atomicStore;

	alias I = ElementType!R;
	alias O = typeof(fun(I.init));

	// The input channel always uses the "overhead" priority, only the output
	// channel is configurable by the caller.
	ChannelConfig inconfig;
	inconfig.priority = ChannelPriority.overhead;
	auto chin = createChannel!I(inconfig);
	auto chout = createChannel!O(channel_config);

	// TODO: discard all operations if the result range is not referenced anymore

	// Feeds all input items into the input channel and closes it afterwards,
	// so that the workers' consume loops terminate.
	static void senderFun(R items, Channel!I chin)
	nothrow {
		foreach (itm; items) {
			try chin.put(itm);
			catch (Exception e) {
				logException(e, "Failed to send parallel mapped input");
				break;
			}
		}
		chin.close();
	}

	// Maps items taken from the input channel and forwards the results to the
	// output channel until the input channel yields no more items or an
	// exception occurs.
	static void workerFun(Channel!I chin, Channel!O chout, shared(int)* rc)
	nothrow {
		I item;
		while (chin.tryConsumeOne(item)) {
			try chout.put(fun(item));
			catch (Exception e) {
				logException(e, "Failed to send back parallel mapped result");
				break;
			}
		}
		// the worker that brings the counter down to zero is the last one
		// and closes the output channel
		if (!atomicOp!"-="(*rc, 1))
			chout.close();
	}

	runTask(&senderFun, items, chin);

	// Shared counter of workers that have not finished yet. It is initialized
	// to the thread count of the pool (presumably `runTaskDist` starts one
	// worker per pool thread - confirm against the TaskPool documentation).
	auto rc = new shared int;
	atomicStore(*rc, cast(int)task_pool.threadCount);

	task_pool.runTaskDist(&workerFun, chin, chout, rc);

	// Input range adapter around the output channel that caches the current
	// front element.
	static struct Result {
		private {
			Channel!O m_channel;
			O m_front;
			bool m_gotFront = false;
		}

		/// True once no cached element exists and the channel yields no more.
		@property bool empty()
		{
			fetchFront();
			return !m_gotFront;
		}

		/// Returns a reference to the cached front element.
		@property ref O front()
		{
			fetchFront();
			assert(m_gotFront, "Accessing empty parallelUnorderedMap range.");
			return m_front;
		}

		/// Discards the current front element.
		void popFront()
		{
			// ensure that an element is actually consumed, even if `front`
			// was not accessed before
			fetchFront();
			m_gotFront = false;
		}

		// Consumes one element from the channel unless one is already cached.
		private void fetchFront()
		{
			if (m_gotFront) return;
			m_gotFront = m_channel.tryConsumeOne(m_front);
		}
	}

	return Result(chout);
}
110 
/// ditto
auto parallelUnorderedMap(alias fun, R)(R items, ChannelConfig channel_config = ChannelConfig.init)
	if (isInputRange!R && isWeaklyIsolated!(ElementType!R) && isWeaklyIsolated!(typeof(fun(ElementType!R.init))))
{
	import vibe.core.core : workerTaskPool;

	// Convenience overload: forward to the task pool based overload, using
	// the pool returned by `workerTaskPool`.
	auto pool = workerTaskPool;
	return parallelUnorderedMap!(fun, R)(items, pool, channel_config);
}
118 
///
unittest {
	import std.algorithm : isPermutation, map;
	import std.array : array;
	import std.range : iota;

	// the unordered variant only guarantees that all results arrive, not
	// their order, so the result is compared as a permutation
	auto res = iota(100)
		.parallelUnorderedMap!(i => 2 * i)
		.array;
	assert(res.isPermutation(iota(100).map!(i => 2 * i).array));
}
130 
131 
/** Processes a range of items in worker tasks and returns them as an ordered
	range.

	The items of the returned stream are in the same order as input. Note that
	this may require dynamic buffering of results, so it is recommended to
	use unordered mapping if possible.

	Params:
		fun = Function that is applied to each input item; its return type is
			the element type of the returned range
		items = Input range of the items to process
		task_pool = Task pool that is used to run the worker tasks
		channel_config = Configuration that is forwarded to the underlying
			`parallelUnorderedMap` call

	Returns:
		An input range that yields the results of `fun` in input order.

	See_also: `parallelUnorderedMap`
*/
auto parallelMap(alias fun, R)(R items, shared(TaskPool) task_pool, ChannelConfig channel_config = ChannelConfig.init)
	if (isInputRange!R && isWeaklyIsolated!(ElementType!R) && isWeaklyIsolated!(typeof(fun(ElementType!R.init))))
{
	import std.algorithm : canFind, countUntil, move;
	import std.container.array : Array;
	import std.range : enumerate;
	import std.typecons : RefCounted;

	alias I = ElementType!R;
	alias O = typeof(fun(I.init));
	// result value tagged with the position of its input item
	static struct SR { size_t index; O value; }

	// Tag each item with its index, so that the original order can be
	// restored from the unordered result stream.
	auto resunord = items
		.enumerate
		.parallelUnorderedMap!(itm => SR(itm.index, fun(itm.value)))(task_pool, channel_config);

	static struct State {
		typeof(resunord) m_source;
		// index of the item that is currently at the front of the range
		size_t m_index = 0;
		// results that were received before it was their turn
		Array!SR m_buffer;

		@property bool empty()
		{
			return m_source.empty && m_buffer.length == 0;
		}
		@property ref O front()
		{
			fetchFront();
			auto idx = m_buffer[].countUntil!(sr => sr.index == m_index);
			if (idx < 0) {
				// not buffered, so fetchFront() has advanced the source to it
				assert(m_source.front.index == m_index);
				return m_source.front.value;
			}
			return m_buffer[idx].value;
		}
		void popFront()
		{
			// Locate the current item first, so that popFront() also works
			// when `front` has not been accessed before (e.g. when elements
			// are skipped).
			fetchFront();

			m_index++;

			auto idx = m_buffer[].countUntil!(sr => sr.index == m_index-1);
			if (idx < 0) {
				assert(m_source.front.index == m_index-1);
				m_source.popFront();
			} else {
				// unordered removal: overwrite with the last element and
				// shrink the buffer by one
				if (idx < m_buffer.length-1)
					m_buffer[idx] = m_buffer[$-1];
				m_buffer.removeBack();
			}
		}

		// Ensures that the item with index m_index is either contained in
		// m_buffer or is the front element of m_source.
		private void fetchFront()
		{
			if (m_buffer[].canFind!(sr => sr.index == m_index))
				return;

			// stash away all results that arrive ahead of their turn
			while (m_source.front.index != m_index) {
				m_buffer ~= m_source.front;
				m_source.popFront();
			}
		}
	}

	// Reference counted wrapper, so that copies of the returned range share
	// the same state.
	static struct Result {
		private RefCounted!State state;

		@property bool empty() { return state.empty; }
		@property ref O front() { return state.front; }
		void popFront() { state.popFront; }
	}

	return Result(RefCounted!State(resunord.move));
}
214 
/// ditto
auto parallelMap(alias fun, R)(R items, ChannelConfig channel_config = ChannelConfig.init)
	if (isInputRange!R && isWeaklyIsolated!(ElementType!R) && isWeaklyIsolated!(typeof(fun(ElementType!R.init))))
{
	import vibe.core.core : workerTaskPool;

	// Convenience overload: forward to the task pool based overload, using
	// the pool returned by `workerTaskPool`.
	auto pool = workerTaskPool;
	return parallelMap!(fun, R)(items, pool, channel_config);
}
222 
///
unittest {
	import std.algorithm : map;
	import std.array : array;
	import std.range : iota;

	// sequentially computed reference result
	auto expected = iota(100).map!(i => 2 * i).array;

	// the parallel result must match the reference, including the order
	auto res = iota(100).parallelMap!(i => 2 * i).array;
	assert(res == expected);
}
234 
///
unittest {
	import std.algorithm : isPermutation, map;
	import std.array : array;
	import std.random : uniform;
	import std.range : iota;
	import core.time : msecs;
	import vibe.core.core : sleep;

	// sequentially computed reference result
	auto expected = iota(100).map!(i => 2 * i).array;

	// forcing a random computation result order still results in the same
	// output order
	auto res = iota(100)
		.parallelMap!((i) {
			auto delay = uniform(0, 100).msecs;
			sleep(delay);
			return 2 * i;
		})
		.array;
	assert(res == expected);
}