1 /** Implements a thread-safe, typed producer-consumer queue. 2 3 Copyright: © 2017-2019 Sönke Ludwig 4 Authors: Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 */ 7 module vibe.core.channel; 8 9 import vibe.core.sync : TaskCondition; 10 import vibe.internal.array : FixedRingBuffer; 11 12 import std.algorithm.mutation : move, swap; 13 import std.exception : enforce; 14 import core.sync.mutex; 15 16 // multiple producers allowed, multiple consumers allowed - Q: should this be restricted to allow higher performance? maybe configurable? 17 // currently always buffered - TODO: implement blocking non-buffered mode 18 // TODO: implement a multi-channel wait, e.g. 19 // TaggedAlgebraic!(...) consumeAny(ch1, ch2, ch3); - requires a waitOnMultipleConditions function 20 21 // NOTE: not using synchronized (m_mutex) because it is not nothrow 22 23 24 /** Creates a new channel suitable for cross-task and cross-thread communication. 25 */ 26 Channel!(T, buffer_size) createChannel(T, size_t buffer_size = 100)() 27 { 28 Channel!(T, buffer_size) ret; 29 ret.m_impl = new shared ChannelImpl!(T, buffer_size); 30 return ret; 31 } 32 33 34 /** Thread-safe typed data channel implementation. 35 36 The implementation supports multiple-reader-multiple-writer operation across 37 multiple tasks in multiple threads. 38 */ 39 struct Channel(T, size_t buffer_size = 100) { 40 enum bufferSize = buffer_size; 41 42 private shared ChannelImpl!(T, buffer_size) m_impl; 43 44 /** Determines whether there is more data to read in a single-reader scenario. 45 46 This property is empty $(I iff) no more elements are in the internal 47 buffer and `close()` has been called. Once the channel is empty, 48 subsequent calls to `consumeOne` or `consumeAll` will throw an 49 exception. 50 51 Note that relying on the return value to determine whether another 52 element can be read is only safe in a single-reader scenario. It is 53 generally recommended to use `tryConsumeOne` instead. 54 */ 55 @property bool empty() { return m_impl.empty; } 56 /// ditto 57 @property bool empty() shared { return m_impl.empty; } 58 59 /** Returns the current count of items in the buffer. 60 61 This function is useful for diagnostic purposes. 62 */ 63 @property size_t bufferFill() { return m_impl.bufferFill; } 64 /// ditto 65 @property size_t bufferFill() shared { return m_impl.bufferFill; } 66 67 /** Closes the channel. 68 69 A closed channel does not accept any new items enqueued using `put` and 70 causes `empty` to return `fals` as soon as all preceeding elements have 71 been consumed. 72 */ 73 void close() { m_impl.close(); } 74 /// ditto 75 void close() shared { m_impl.close(); } 76 77 /** Consumes a single element off the queue. 78 79 This function will block if no elements are available. If the `empty` 80 property is `true`, an exception will be thrown. 81 82 Note that it is recommended to use `tryConsumeOne` instead of a 83 combination of `empty` and `consumeOne` due to being more efficient and 84 also being reliable in a multiple-reader scenario. 85 */ 86 T consumeOne() { return m_impl.consumeOne(); } 87 /// ditto 88 T consumeOne() shared { return m_impl.consumeOne(); } 89 90 /** Attempts to consume a single element. 91 92 If no more elements are available and the channel has been closed, 93 `false` is returned and `dst` is left untouched. 94 */ 95 bool tryConsumeOne(ref T dst) { return m_impl.tryConsumeOne(dst); } 96 /// ditto 97 bool tryConsumeOne(ref T dst) shared { return m_impl.tryConsumeOne(dst); } 98 99 /** Attempts to consume all elements currently in the queue. 100 101 This function will block if no elements are available. Once at least one 102 element is available, the contents of `dst` will be replaced with all 103 available elements. 104 105 If the `empty` property is or becomes `true` before data becomes 106 avaiable, `dst` will be left untouched and `false` is returned. 107 */ 108 bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) 109 in { assert(dst.empty); } 110 do { return m_impl.consumeAll(dst); } 111 /// ditto 112 bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) shared 113 in { assert(dst.empty); } 114 do { return m_impl.consumeAll(dst); } 115 116 /** Enqueues an element. 117 118 This function may block the the event that the internal buffer is full. 119 */ 120 void put(T item) { m_impl.put(item.move); } 121 /// ditto 122 void put(T item) shared { m_impl.put(item.move); } 123 } 124 125 126 private final class ChannelImpl(T, size_t buffer_size) { 127 import vibe.core.concurrency : isWeaklyIsolated; 128 static assert(isWeaklyIsolated!T, "Channel data type "~T.stringof~" is not safe to pass between threads."); 129 130 private { 131 Mutex m_mutex; 132 TaskCondition m_condition; 133 FixedRingBuffer!(T, buffer_size) m_items; 134 bool m_closed = false; 135 } 136 137 this() 138 shared @trusted { 139 m_mutex = cast(shared)new Mutex; 140 m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex); 141 } 142 143 @property bool empty() 144 shared nothrow { 145 { 146 m_mutex.lock_nothrow(); 147 scope (exit) m_mutex.unlock_nothrow(); 148 149 auto thisus = () @trusted { return cast(ChannelImpl)this; } (); 150 151 // ensure that in a single-reader scenario !empty guarantees a 152 // successful call to consumeOne 153 while (!thisus.m_closed && thisus.m_items.empty) 154 thisus.m_condition.wait(); 155 156 return thisus.m_closed && thisus.m_items.empty; 157 } 158 } 159 160 @property size_t bufferFill() 161 shared nothrow { 162 { 163 m_mutex.lock_nothrow(); 164 scope (exit) m_mutex.unlock_nothrow(); 165 166 auto thisus = () @trusted { return cast(ChannelImpl)this; } (); 167 return thisus.m_items.length; 168 } 169 } 170 171 void close() 172 shared nothrow { 173 { 174 m_mutex.lock_nothrow(); 175 scope (exit) m_mutex.unlock_nothrow(); 176 177 auto thisus = () @trusted { return cast(ChannelImpl)this; } (); 178 thisus.m_closed = true; 179 thisus.m_condition.notifyAll(); 180 } 181 } 182 183 bool tryConsumeOne(ref T dst) 184 shared nothrow { 185 auto thisus = () @trusted { return cast(ChannelImpl)this; } (); 186 bool was_full = false; 187 188 { 189 m_mutex.lock_nothrow(); 190 scope (exit) m_mutex.unlock_nothrow(); 191 192 while (thisus.m_items.empty) { 193 if (m_closed) return false; 194 thisus.m_condition.wait(); 195 } 196 was_full = thisus.m_items.full; 197 move(thisus.m_items.front, dst); 198 thisus.m_items.popFront(); 199 } 200 201 if (was_full) thisus.m_condition.notify(); 202 203 return true; 204 } 205 206 T consumeOne() 207 shared { 208 auto thisus = () @trusted { return cast(ChannelImpl)this; } (); 209 T ret; 210 bool was_full = false; 211 212 { 213 m_mutex.lock_nothrow(); 214 scope (exit) m_mutex.unlock_nothrow(); 215 216 while (thisus.m_items.empty) { 217 if (m_closed) throw new Exception("Attempt to consume from an empty channel."); 218 thisus.m_condition.wait(); 219 } 220 was_full = thisus.m_items.full; 221 move(thisus.m_items.front, ret); 222 thisus.m_items.popFront(); 223 } 224 225 if (was_full) thisus.m_condition.notify(); 226 227 return ret.move; 228 } 229 230 bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) 231 shared nothrow { 232 auto thisus = () @trusted { return cast(ChannelImpl)this; } (); 233 bool was_full = false; 234 235 { 236 m_mutex.lock_nothrow(); 237 scope (exit) m_mutex.unlock_nothrow(); 238 239 while (thisus.m_items.empty) { 240 if (m_closed) return false; 241 thisus.m_condition.wait(); 242 } 243 244 was_full = thisus.m_items.full; 245 swap(thisus.m_items, dst); 246 } 247 248 if (was_full) thisus.m_condition.notify(); 249 250 return true; 251 } 252 253 void put(T item) 254 shared { 255 auto thisus = () @trusted { return cast(ChannelImpl)this; } (); 256 bool need_notify = false; 257 258 { 259 m_mutex.lock_nothrow(); 260 scope (exit) m_mutex.unlock_nothrow(); 261 262 enforce(!m_closed, "Sending on closed channel."); 263 while (thisus.m_items.full) 264 thisus.m_condition.wait(); 265 need_notify = thisus.m_items.empty; 266 thisus.m_items.put(item.move); 267 } 268 269 if (need_notify) thisus.m_condition.notify(); 270 } 271 } 272 273 @safe unittest { // test basic operation and non-copyable struct compatiblity 274 static struct S { 275 int i; 276 @disable this(this); 277 } 278 279 auto ch = createChannel!S; 280 ch.put(S(1)); 281 assert(ch.consumeOne().i == 1); 282 ch.put(S(4)); 283 ch.put(S(5)); 284 { 285 FixedRingBuffer!(S, 100) buf; 286 ch.consumeAll(buf); 287 assert(buf.length == 2); 288 assert(buf[0].i == 4); 289 assert(buf[1].i == 5); 290 } 291 ch.put(S(2)); 292 assert(!ch.empty); 293 ch.close(); 294 assert(!ch.empty); 295 S v; 296 assert(ch.tryConsumeOne(v)); 297 assert(v.i == 2); 298 assert(ch.empty); 299 assert(!ch.tryConsumeOne(v)); 300 } 301 302 @safe unittest { // make sure shared(Channel!T) can also be used 303 shared ch = createChannel!int; 304 ch.put(1); 305 assert(!ch.empty); 306 assert(ch.consumeOne == 1); 307 ch.close(); 308 assert(ch.empty); 309 } 310 311 @safe unittest { // ensure nothrow'ness for throwing struct 312 static struct S { 313 this(this) { throw new Exception("meh!"); } 314 } 315 auto ch = createChannel!S; 316 ch.put(S.init); 317 ch.put(S.init); 318 319 S s; 320 FixedRingBuffer!(S, 100, true) sb; 321 322 () nothrow { 323 assert(ch.tryConsumeOne(s)); 324 assert(ch.consumeAll(sb)); 325 assert(sb.length == 1); 326 ch.close(); 327 assert(ch.empty); 328 } (); 329 }