1 /** Implements a thread-safe, typed producer-consumer queue.
2 
3 	Copyright: © 2017-2019 Sönke Ludwig
4 	Authors: Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 */
7 module vibe.core.channel;
8 
9 import vibe.core.sync : TaskCondition;
10 import vibe.internal.array : FixedRingBuffer;
11 
12 import std.algorithm.mutation : move, swap;
13 import std.exception : enforce;
14 import core.sync.mutex;
15 
16 // multiple producers allowed, multiple consumers allowed - Q: should this be restricted to allow higher performance? maybe configurable?
17 // currently always buffered - TODO: implement blocking non-buffered mode
18 // TODO: implement a multi-channel wait, e.g.
19 // TaggedAlgebraic!(...) consumeAny(ch1, ch2, ch3); - requires a waitOnMultipleConditions function
20 
21 // NOTE: not using synchronized (m_mutex) because it is not nothrow
22 
23 
/** Constructs a fresh channel for cross-task and cross-thread communication.

	A new shared `ChannelImpl` instance is allocated and wrapped in a
	`Channel` handle that uses the same element type and `buffer_size`.
*/
Channel!(T, buffer_size) createChannel(T, size_t buffer_size = 100)()
{
	alias Handle = Channel!(T, buffer_size);
	alias Impl = ChannelImpl!(T, buffer_size);

	Handle channel;
	channel.m_impl = new shared Impl;
	return channel;
}
32 
33 
/** Thread-safe typed data channel implementation.

	The implementation supports multiple-reader-multiple-writer operation across
	multiple tasks in multiple threads.

	All operations are forwarded to a shared implementation object, and each
	one is available both on plain and on `shared` instances.
*/
struct Channel(T, size_t buffer_size = 100) {
	enum bufferSize = buffer_size;

	private shared ChannelImpl!(T, buffer_size) m_impl;

	/** Determines whether there is more data to read in a single-reader scenario.

		This property is `true` $(I iff) no more elements are in the internal
		buffer and `close()` has been called. Once the channel is empty,
		subsequent calls to `consumeOne` will throw an exception and calls to
		`tryConsumeOne` or `consumeAll` will return `false`.

		Note that this property blocks while the buffer contains no elements
		and the channel has not been closed yet.

		Note that relying on the return value to determine whether another
		element can be read is only safe in a single-reader scenario. It is
		generally recommended to use `tryConsumeOne` instead.
	*/
	@property bool empty() { return m_impl.empty; }
	/// ditto
	@property bool empty() shared { return m_impl.empty; }

	/** Returns the current count of items in the buffer.

		This function is useful for diagnostic purposes.
	*/
	@property size_t bufferFill() { return m_impl.bufferFill; }
	/// ditto
	@property size_t bufferFill() shared { return m_impl.bufferFill; }

	/** Closes the channel.

		A closed channel does not accept any new items enqueued using `put` and
		causes `empty` to return `true` as soon as all preceding elements have
		been consumed.
	*/
	void close() { m_impl.close(); }
	/// ditto
	void close() shared { m_impl.close(); }

	/** Consumes a single element off the queue.

		This function will block if no elements are available. If the `empty`
		property is `true`, an exception will be thrown.

		Note that it is recommended to use `tryConsumeOne` instead of a
		combination of `empty` and `consumeOne` due to being more efficient and
		also being reliable in a multiple-reader scenario.
	*/
	T consumeOne() { return m_impl.consumeOne(); }
	/// ditto
	T consumeOne() shared { return m_impl.consumeOne(); }

	/** Attempts to consume a single element.

		This function will block while no elements are available and the
		channel has not been closed.

		If no more elements are available and the channel has been closed,
		`false` is returned and `dst` is left untouched. Otherwise the front
		element is moved into `dst` and `true` is returned.
	*/
	bool tryConsumeOne(ref T dst) { return m_impl.tryConsumeOne(dst); }
	/// ditto
	bool tryConsumeOne(ref T dst) shared { return m_impl.tryConsumeOne(dst); }

	/** Attempts to consume all elements currently in the queue.

		This function will block if no elements are available. Once at least one
		element is available, the contents of `dst` will be replaced with all
		available elements and `true` is returned.

		If the `empty` property is or becomes `true` before data becomes
		available, `dst` will be left untouched and `false` is returned.

		`dst` is required to be empty when calling this function.
	*/
	bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst)
		in { assert(dst.empty); }
		do { return m_impl.consumeAll(dst); }
	/// ditto
	bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) shared
		in { assert(dst.empty); }
		do { return m_impl.consumeAll(dst); }

	/** Enqueues an element.

		This function may block in the event that the internal buffer is full.
		An exception is thrown if the channel has already been closed.
	*/
	void put(T item) { m_impl.put(item.move); }
	/// ditto
	void put(T item) shared { m_impl.put(item.move); }
}
124 
125 
/** Shared backing object of `Channel`.

	All state is protected by `m_mutex`. Producers (waiting for free space) and
	consumers (waiting for data) share the single condition `m_condition`.
	Because a lone `notify()` could therefore wake a waiter of the wrong kind
	and leave the intended one blocked, every state transition that may unblock
	a waiter uses `notifyAll()`; each woken waiter re-checks its own predicate
	under the mutex.
*/
private final class ChannelImpl(T, size_t buffer_size) {
	import vibe.core.concurrency : isWeaklyIsolated;
	static assert(isWeaklyIsolated!T, "Channel data type "~T.stringof~" is not safe to pass between threads.");

	private {
		Mutex m_mutex;
		TaskCondition m_condition;
		FixedRingBuffer!(T, buffer_size) m_items;
		bool m_closed = false;
	}

	/// Creates the mutex and the condition that is bound to it.
	this()
	shared @trusted {
		m_mutex = cast(shared)new Mutex;
		m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex);
	}

	/** Returns `true` once the channel is closed and the buffer is drained.

		Blocks while the buffer is empty and the channel is still open.
	*/
	@property bool empty()
	shared nothrow {
		m_mutex.lock_nothrow();
		scope (exit) m_mutex.unlock_nothrow();

		auto thisus = () @trusted { return cast(ChannelImpl)this; } ();

		// ensure that in a single-reader scenario !empty guarantees a
		// successful call to consumeOne
		while (!thisus.m_closed && thisus.m_items.empty)
			thisus.m_condition.wait();

		return thisus.m_closed && thisus.m_items.empty;
	}

	/// Returns the number of items currently stored in the buffer.
	@property size_t bufferFill()
	shared nothrow {
		m_mutex.lock_nothrow();
		scope (exit) m_mutex.unlock_nothrow();

		auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
		return thisus.m_items.length;
	}

	/// Marks the channel as closed and wakes up all waiting tasks.
	void close()
	shared nothrow {
		m_mutex.lock_nothrow();
		scope (exit) m_mutex.unlock_nothrow();

		auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
		thisus.m_closed = true;
		thisus.m_condition.notifyAll();
	}

	/** Moves the front element into `dst`, blocking while the buffer is empty.

		Returns: `false` (leaving `dst` untouched) if the buffer is empty and
			the channel has been closed, `true` otherwise.
	*/
	bool tryConsumeOne(ref T dst)
	shared nothrow {
		auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
		bool was_full = false;

		{
			m_mutex.lock_nothrow();
			scope (exit) m_mutex.unlock_nothrow();

			while (thisus.m_items.empty) {
				if (thisus.m_closed) return false;
				thisus.m_condition.wait();
			}
			was_full = thisus.m_items.full;
			move(thisus.m_items.front, dst);
			thisus.m_items.popFront();
		}

		// A slot was freed in a previously full buffer. Wake all waiters, as
		// a single notify() might pick a consumer instead of a blocked producer.
		if (was_full) thisus.m_condition.notifyAll();

		return true;
	}

	/** Removes and returns the front element, blocking while the buffer is empty.

		Throws: `Exception` if the buffer is empty and the channel has been closed.
	*/
	T consumeOne()
	shared {
		auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
		T ret;
		bool was_full = false;

		{
			m_mutex.lock_nothrow();
			scope (exit) m_mutex.unlock_nothrow();

			while (thisus.m_items.empty) {
				if (thisus.m_closed) throw new Exception("Attempt to consume from an empty channel.");
				thisus.m_condition.wait();
			}
			was_full = thisus.m_items.full;
			move(thisus.m_items.front, ret);
			thisus.m_items.popFront();
		}

		// see tryConsumeOne for why notifyAll() is required here
		if (was_full) thisus.m_condition.notifyAll();

		return ret.move;
	}

	/** Swaps the whole internal buffer with `dst`, blocking while the buffer is empty.

		Returns: `false` (leaving `dst` untouched) if the buffer is empty and
			the channel has been closed, `true` otherwise.
	*/
	bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst)
	shared nothrow {
		auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
		bool was_full = false;

		{
			m_mutex.lock_nothrow();
			scope (exit) m_mutex.unlock_nothrow();

			while (thisus.m_items.empty) {
				if (thisus.m_closed) return false;
				thisus.m_condition.wait();
			}

			was_full = thisus.m_items.full;
			swap(thisus.m_items, dst);
		}

		// The complete buffer contents were handed out, so every blocked
		// producer may be able to continue - wake all of them.
		if (was_full) thisus.m_condition.notifyAll();

		return true;
	}

	/** Appends `item` to the buffer, blocking while the buffer is full.

		Throws: `Exception` if the channel is closed, either at the time of
			the call or while waiting for free buffer space.
	*/
	void put(T item)
	shared {
		auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
		bool need_notify = false;

		{
			m_mutex.lock_nothrow();
			scope (exit) m_mutex.unlock_nothrow();

			enforce(!thisus.m_closed, "Sending on closed channel.");
			while (thisus.m_items.full) {
				thisus.m_condition.wait();
				// close() may have been called while waiting for free space;
				// a closed channel must not accept any new items
				enforce(!thisus.m_closed, "Sending on closed channel.");
			}
			need_notify = thisus.m_items.empty;
			thisus.m_items.put(item.move);
		}

		// The buffer went from empty to non-empty. Wake all waiters, as a
		// single notify() might pick a producer instead of a blocked consumer.
		if (need_notify) thisus.m_condition.notifyAll();
	}
}
272 
@safe unittest { // test basic operation and non-copyable struct compatibility
	static struct NoCopy {
		int i;
		@disable this(this);
	}

	auto channel = createChannel!NoCopy;

	// single element round trip
	channel.put(NoCopy(1));
	assert(channel.consumeOne().i == 1);

	// bulk consumption hands out all queued elements in insertion order
	channel.put(NoCopy(4));
	channel.put(NoCopy(5));
	{
		FixedRingBuffer!(NoCopy, 100) drained;
		channel.consumeAll(drained);
		assert(drained.length == 2);
		assert(drained[0].i == 4);
		assert(drained[1].i == 5);
	}

	// closing must not discard an element that is still queued
	channel.put(NoCopy(2));
	assert(!channel.empty);
	channel.close();
	assert(!channel.empty);

	NoCopy last;
	assert(channel.tryConsumeOne(last));
	assert(last.i == 2);

	// closed and drained
	assert(channel.empty);
	assert(!channel.tryConsumeOne(last));
}
301 
@safe unittest { // make sure shared(Channel!T) can also be used
	shared sharedChannel = createChannel!int;

	// enqueue and dequeue through the shared overloads
	sharedChannel.put(1);
	assert(!sharedChannel.empty);
	assert(sharedChannel.consumeOne == 1);

	// a closed channel without pending items reports empty
	sharedChannel.close();
	assert(sharedChannel.empty);
}
310 
@safe unittest { // ensure nothrow'ness for throwing struct
	static struct ThrowingCopy {
		this(this) { throw new Exception("meh!"); }
	}

	auto channel = createChannel!ThrowingCopy;
	channel.put(ThrowingCopy.init);
	channel.put(ThrowingCopy.init);

	ThrowingCopy single;
	FixedRingBuffer!(ThrowingCopy, 100, true) remaining;

	// the consuming operations have to compile in a nothrow context even
	// though copying the element type may throw
	() nothrow {
		assert(channel.tryConsumeOne(single));
		assert(channel.consumeAll(remaining));
		assert(remaining.length == 1);
		channel.close();
		assert(channel.empty);
	} ();
}