1 /** Implements a thread-safe, typed producer-consumer queue.
2 
3 	Copyright: © 2017-2019 Sönke Ludwig
4 	Authors: Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 */
7 module vibe.core.channel;
8 
9 import vibe.core.sync : TaskCondition;
10 import vibe.internal.array : FixedRingBuffer;
11 
12 import std.algorithm.mutation : move, swap;
13 import std.exception : enforce;
14 import core.sync.mutex;
15 
16 // multiple producers allowed, multiple consumers allowed - Q: should this be restricted to allow higher performance? maybe configurable?
17 // currently always buffered - TODO: implement blocking non-buffered mode
18 // TODO: implement a multi-channel wait, e.g.
19 // TaggedAlgebraic!(...) consumeAny(ch1, ch2, ch3); - requires a waitOnMultipleConditions function
20 
21 // NOTE: not using synchronized (m_mutex) because it is not nothrow
22 
23 
/** Creates a new channel suitable for cross-task and cross-thread communication.

	Params:
		T = Type of the elements transported through the channel
		buffer_size = Size parameter of the channel's internal ring buffer
		config = Options controlling the channel's behavior, see `ChannelConfig`

	Returns:
		A `Channel` handle that references a freshly allocated, shared
		implementation object.
*/
Channel!(T, buffer_size) createChannel(T, size_t buffer_size = 100)(ChannelConfig config = ChannelConfig.init)
{
	alias Impl = ChannelImpl!(T, buffer_size);

	Channel!(T, buffer_size) channel;
	channel.m_impl = new shared Impl(config);
	return channel;
}
32 
/** Configuration options that can be passed to `createChannel`.
*/
struct ChannelConfig {
	/// Notification strategy used by the channel; defaults to `ChannelPriority.latency`.
	ChannelPriority priority = ChannelPriority.latency;
}
36 
/** Selects when a channel wakes up waiting readers and writers.
*/
enum ChannelPriority {
	/** Minimize latency

		Triggers readers immediately once data is available and triggers writers
		as soon as the queue has space.
	*/
	latency,

	/** Minimize overhead.

		Triggers readers once the queue is full and triggers writers once the
		queue is empty in order to maximize batch sizes and minimize
		synchronization overhead.

		Note that in this mode it is necessary to close the channel to ensure
		that the buffered data is fully processed.
	*/
	overhead
}
56 
/** Thread-safe typed data channel implementation.

	The implementation supports multiple-reader-multiple-writer operation across
	multiple tasks in multiple threads.

	A `Channel` value is a lightweight handle; all operations are forwarded to
	a shared implementation object that is allocated by `createChannel`.
*/
struct Channel(T, size_t buffer_size = 100) {
	/// Size parameter of the internal buffer, as passed to the template.
	enum bufferSize = buffer_size;

	// shared implementation object, set up by createChannel
	private shared ChannelImpl!(T, buffer_size) m_impl;

	/** Determines whether there is more data to read in a single-reader scenario.

		This property is `true` $(I iff) no more elements are in the internal
		buffer and `close()` has been called. Once the channel is empty,
		subsequent calls to `consumeOne` will throw an exception and calls to
		`tryConsumeOne` or `consumeAll` will return `false`.

		Note that querying this property blocks until either an element
		becomes available or the channel gets closed.

		Note that relying on the return value to determine whether another
		element can be read is only safe in a single-reader scenario. It is
		generally recommended to use `tryConsumeOne` instead.
	*/
	@property bool empty() { return m_impl.empty; }
	/// ditto
	@property bool empty() shared { return m_impl.empty; }

	/** Returns the current count of items in the buffer.

		This function is useful for diagnostic purposes.
	*/
	@property size_t bufferFill() { return m_impl.bufferFill; }
	/// ditto
	@property size_t bufferFill() shared { return m_impl.bufferFill; }

	/** Closes the channel.

		A closed channel does not accept any new items enqueued using `put` and
		causes `empty` to return `true` as soon as all preceding elements have
		been consumed.
	*/
	void close() { m_impl.close(); }
	/// ditto
	void close() shared { m_impl.close(); }

	/** Consumes a single element off the queue.

		This function will block if no elements are available. If the `empty`
		property is `true`, an exception will be thrown.

		Note that it is recommended to use `tryConsumeOne` instead of a
		combination of `empty` and `consumeOne` due to being more efficient and
		also being reliable in a multiple-reader scenario.
	*/
	T consumeOne() { return m_impl.consumeOne(); }
	/// ditto
	T consumeOne() shared { return m_impl.consumeOne(); }

	/** Attempts to consume a single element.

		This function will block while no elements are available and the
		channel is still open.

		If no more elements are available and the channel has been closed,
		`false` is returned and `dst` is left untouched.
	*/
	bool tryConsumeOne(ref T dst) { return m_impl.tryConsumeOne(dst); }
	/// ditto
	bool tryConsumeOne(ref T dst) shared { return m_impl.tryConsumeOne(dst); }

	/** Attempts to consume all elements currently in the queue.

		This function will block if no elements are available. Once at least one
		element is available, the contents of `dst` will be replaced with all
		available elements.

		If the `empty` property is or becomes `true` before data becomes
		available, `dst` will be left untouched and `false` is returned.

		`dst` is required to be empty when calling this function.
	*/
	bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst)
		in { assert(dst.empty); }
		do { return m_impl.consumeAll(dst); }
	/// ditto
	bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) shared
		in { assert(dst.empty); }
		do { return m_impl.consumeAll(dst); }

	/** Enqueues an element.

		This function may block in the event that the internal buffer is full.
		An exception is thrown if the channel has been closed.
	*/
	void put(T item) { m_impl.put(item.move); }
	/// ditto
	void put(T item) shared { m_impl.put(item.move); }
}
147 
148 
/** Shared state and logic behind a `Channel` handle.

	All fields are guarded by `m_mutex`. The methods are declared `shared` and
	obtain an unshared view of `this` (`thisus`) through a `@trusted` cast in
	order to access the fields. A single condition variable bound to the mutex
	is used for waiting readers as well as for waiting writers.
*/
// NOTE(review): in `latency` mode only a single waiter is woken per state
// change (`notify()`); confirm that this is sufficient when several readers
// or writers are blocked at the same time.
private final class ChannelImpl(T, size_t buffer_size) {
	import vibe.core.concurrency : isWeaklyIsolated;
	static assert(isWeaklyIsolated!T, "Channel data type "~T.stringof~" is not safe to pass between threads.");

	private {
		Mutex m_mutex;
		TaskCondition m_condition;
		FixedRingBuffer!(T, buffer_size) m_items;
		bool m_closed = false;
		ChannelConfig m_config;
	}

	/// Sets up the mutex and the condition variable and stores the configuration.
	this(ChannelConfig config)
	shared @trusted nothrow {
		m_mutex = cast(shared)new Mutex;
		m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex);
		m_config = config;
	}

	/** Blocks until an element is available or the channel is closed.

		Returns:
			`true` if the channel is closed and the buffer is drained, `false`
			if at least one element can be consumed.
	*/
	@property bool empty()
	shared nothrow {
		{
			m_mutex.lock_nothrow();
			scope (exit) m_mutex.unlock_nothrow();

			auto thisus = () @trusted { return cast(ChannelImpl)this; } ();

			// ensure that in a single-reader scenario !empty guarantees a
			// successful call to consumeOne
			while (!thisus.m_closed && thisus.m_items.empty)
				thisus.m_condition.wait();

			return thisus.m_closed && thisus.m_items.empty;
		}
	}

	/// Returns the number of elements currently stored in the buffer.
	@property size_t bufferFill()
	shared nothrow {
		{
			m_mutex.lock_nothrow();
			scope (exit) m_mutex.unlock_nothrow();

			auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
			return thisus.m_items.length;
		}
	}

	/// Marks the channel as closed and wakes up all waiting tasks.
	void close()
	shared nothrow {
		{
			m_mutex.lock_nothrow();
			scope (exit) m_mutex.unlock_nothrow();

			auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
			thisus.m_closed = true;
			thisus.m_condition.notifyAll();
		}
	}

	/** Moves the front element into `dst`, waiting for one if necessary.

		Returns:
			`false` if the buffer is empty and the channel is closed (`dst`
			is left untouched in that case), `true` otherwise.
	*/
	bool tryConsumeOne(ref T dst)
	shared nothrow {
		auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
		bool need_notify = false;

		{
			m_mutex.lock_nothrow();
			scope (exit) m_mutex.unlock_nothrow();

			while (thisus.m_items.empty) {
				if (thisus.m_closed) return false;
				thisus.m_condition.wait();
			}

			// latency: wake a writer as soon as a full buffer gets space
			if (thisus.m_config.priority == ChannelPriority.latency)
				need_notify = thisus.m_items.full;

			move(thisus.m_items.front, dst);
			thisus.m_items.popFront();

			// overhead: wake writers only once the buffer has been drained
			if (thisus.m_config.priority == ChannelPriority.overhead)
				need_notify = thisus.m_items.empty;
		}

		// the notification is sent after the mutex has been released
		if (need_notify) {
			if (thisus.m_config.priority == ChannelPriority.overhead)
				thisus.m_condition.notifyAll();
			else
				thisus.m_condition.notify();
		}

		return true;
	}

	/** Removes and returns the front element, waiting for one if necessary.

		Throws:
			An `Exception` if the buffer is empty and the channel is closed.
	*/
	T consumeOne()
	shared {
		T ret;
		if (!tryConsumeOne(ret))
			throw new Exception("Attempt to consume from an empty channel.");
		return ret;
	}

	/** Swaps the whole internal buffer with `dst`, waiting until at least one
		element is available.

		Returns:
			`false` if the buffer is empty and the channel is closed (`dst`
			is left untouched in that case), `true` otherwise.
	*/
	bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst)
	shared nothrow {
		auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
		bool need_notify = false;

		{
			m_mutex.lock_nothrow();
			scope (exit) m_mutex.unlock_nothrow();

			while (thisus.m_items.empty) {
				if (thisus.m_closed) return false;
				thisus.m_condition.wait();
			}

			if (thisus.m_config.priority == ChannelPriority.latency)
				need_notify = thisus.m_items.full;

			swap(thisus.m_items, dst);

			// in overhead mode the notification is sent unconditionally here
			if (thisus.m_config.priority == ChannelPriority.overhead)
				need_notify = true;
		}

		if (need_notify) {
			if (thisus.m_config.priority == ChannelPriority.overhead)
				thisus.m_condition.notifyAll();
			else thisus.m_condition.notify();
		}

		return true;
	}

	/** Appends `item` to the buffer, waiting for free space if necessary.

		Throws:
			An `Exception` if the channel is already closed, or gets closed
			while this call is waiting for free space.
	*/
	void put(T item)
	shared {
		auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
		bool need_notify = false;

		{
			m_mutex.lock_nothrow();
			scope (exit) m_mutex.unlock_nothrow();

			enforce(!thisus.m_closed, "Sending on closed channel.");
			while (thisus.m_items.full) {
				thisus.m_condition.wait();
				// close() may have been called while we were blocked; do not
				// enqueue into a closed channel once space becomes available
				enforce(!thisus.m_closed, "Sending on closed channel.");
			}

			// latency: wake a reader as soon as an empty buffer gets data
			if (thisus.m_config.priority == ChannelPriority.latency)
				need_notify = thisus.m_items.empty;
			thisus.m_items.put(item.move);
			// overhead: wake readers only once the buffer has been filled up
			if (thisus.m_config.priority == ChannelPriority.overhead)
				need_notify = thisus.m_items.full;
		}

		if (need_notify) {
			if (thisus.m_config.priority == ChannelPriority.overhead)
				thisus.m_condition.notifyAll();
			else thisus.m_condition.notify();
		}
	}
}
308 
@safe unittest { // test basic operation and non-copyable struct compatibility
	// postblit is disabled, so the channel must move elements instead of copying them
	static struct S {
		int i;
		@disable this(this);
	}

	auto ch = createChannel!S;

	// single element round trip
	ch.put(S(1));
	assert(ch.consumeOne().i == 1);

	// batch consumption preserves insertion order
	ch.put(S(4));
	ch.put(S(5));
	{
		FixedRingBuffer!(S, 100) buf;
		ch.consumeAll(buf);
		assert(buf.length == 2);
		assert(buf[0].i == 4);
		assert(buf[1].i == 5);
	}

	// a closed channel only reports empty once the buffered element is consumed
	ch.put(S(2));
	assert(!ch.empty);
	ch.close();
	assert(!ch.empty);
	S v;
	assert(ch.tryConsumeOne(v));
	assert(v.i == 2);
	assert(ch.empty);
	assert(!ch.tryConsumeOne(v));
}
337 
@safe unittest { // make sure shared(Channel!T) can also be used
	// exercises the `shared` overloads of the Channel methods
	shared ch = createChannel!int;
	ch.put(1);
	assert(!ch.empty);
	assert(ch.consumeOne == 1);
	ch.close();
	assert(ch.empty);
}
346 
@safe unittest { // ensure nothrow'ness for throwing struct
	// the postblit throws, so this type cannot be copied from nothrow code
	static struct S {
		this(this) { throw new Exception("meh!"); }
	}
	auto ch = createChannel!S;
	ch.put(S.init);
	ch.put(S.init);

	S s;
	FixedRingBuffer!(S, 100, true) sb;

	// the calls below have to compile within a nothrow context
	() nothrow {
		assert(ch.tryConsumeOne(s));
		assert(ch.consumeAll(sb));
		// one of the two elements was already taken by tryConsumeOne
		assert(sb.length == 1);
		ch.close();
		assert(ch.empty);
	} ();
}
366 
unittest { // test producer task/consumer interaction for every ChannelPriority
	import std.traits : EnumMembers;
	import vibe.core.core : runTask;

	void test(ChannelPriority prio)
	{
		auto ch = createChannel!int(ChannelConfig(prio));

		// producer: enqueues three values and then closes the channel
		runTask(() nothrow {
			try {
				ch.put(1);
				ch.put(2);
				ch.put(3);
			} catch (Exception e) assert(false, e.msg);
			ch.close();
		});

		// consumer: values must arrive in insertion order
		int i;
		assert(ch.tryConsumeOne(i) && i == 1);
		assert(ch.tryConsumeOne(i) && i == 2);
		assert(ch.tryConsumeOne(i) && i == 3);
		// the channel is closed and drained, so this returns false instead of blocking
		assert(!ch.tryConsumeOne(i));
	}

	// run the scenario once for each priority mode
	foreach (m; EnumMembers!ChannelPriority)
		test(m);
}