1 /** Implements a thread-safe, typed producer-consumer queue. 2 3 Copyright: © 2017-2019 Sönke Ludwig 4 Authors: Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 */ 7 module vibe.core.channel; 8 9 import vibe.core.sync : TaskCondition; 10 import vibe.internal.array : FixedRingBuffer; 11 12 import std.algorithm.mutation : move, swap; 13 import std.exception : enforce; 14 import core.sync.mutex; 15 16 // multiple producers allowed, multiple consumers allowed - Q: should this be restricted to allow higher performance? maybe configurable? 17 // currently always buffered - TODO: implement blocking non-buffered mode 18 // TODO: implement a multi-channel wait, e.g. 19 // TaggedAlgebraic!(...) consumeAny(ch1, ch2, ch3); - requires a waitOnMultipleConditions function 20 21 // NOTE: not using synchronized (m_mutex) because it is not nothrow 22 23 24 /** Creates a new channel suitable for cross-task and cross-thread communication. 25 */ 26 Channel!(T, buffer_size) createChannel(T, size_t buffer_size = 100)(ChannelConfig config = ChannelConfig.init) 27 { 28 Channel!(T, buffer_size) ret; 29 ret.m_impl = new shared ChannelImpl!(T, buffer_size)(config); 30 return ret; 31 } 32 33 struct ChannelConfig { 34 ChannelPriority priority = ChannelPriority.latency; 35 } 36 37 enum ChannelPriority { 38 /** Minimize latency 39 40 Triggers readers immediately once data is available and triggers writers 41 as soon as the queue has space. 42 */ 43 latency, 44 45 /** Minimize overhead. 46 47 Triggers readers once the queue is full and triggers writers once the 48 queue is empty in order to maximize batch sizes and minimize 49 synchronization overhead. 50 51 Note that in this mode it is necessary to close the channel to ensure 52 that the buffered data is fully processed. 53 */ 54 overhead 55 } 56 57 /** Thread-safe typed data channel implementation. 58 59 The implementation supports multiple-reader-multiple-writer operation across 60 multiple tasks in multiple threads. 61 */ 62 struct Channel(T, size_t buffer_size = 100) { 63 enum bufferSize = buffer_size; 64 65 private shared ChannelImpl!(T, buffer_size) m_impl; 66 67 /** Determines whether there is more data to read in a single-reader scenario. 68 69 This property is empty $(I iff) no more elements are in the internal 70 buffer and `close()` has been called. Once the channel is empty, 71 subsequent calls to `consumeOne` or `consumeAll` will throw an 72 exception. 73 74 Note that relying on the return value to determine whether another 75 element can be read is only safe in a single-reader scenario. It is 76 generally recommended to use `tryConsumeOne` instead. 77 */ 78 @property bool empty() { return m_impl.empty; } 79 /// ditto 80 @property bool empty() shared { return m_impl.empty; } 81 82 /** Returns the current count of items in the buffer. 83 84 This function is useful for diagnostic purposes. 85 */ 86 @property size_t bufferFill() { return m_impl.bufferFill; } 87 /// ditto 88 @property size_t bufferFill() shared { return m_impl.bufferFill; } 89 90 /** Closes the channel. 91 92 A closed channel does not accept any new items enqueued using `put` and 93 causes `empty` to return `fals` as soon as all preceeding elements have 94 been consumed. 95 */ 96 void close() { m_impl.close(); } 97 /// ditto 98 void close() shared { m_impl.close(); } 99 100 /** Consumes a single element off the queue. 101 102 This function will block if no elements are available. If the `empty` 103 property is `true`, an exception will be thrown. 104 105 Note that it is recommended to use `tryConsumeOne` instead of a 106 combination of `empty` and `consumeOne` due to being more efficient and 107 also being reliable in a multiple-reader scenario. 108 */ 109 T consumeOne() { return m_impl.consumeOne(); } 110 /// ditto 111 T consumeOne() shared { return m_impl.consumeOne(); } 112 113 /** Attempts to consume a single element. 114 115 If no more elements are available and the channel has been closed, 116 `false` is returned and `dst` is left untouched. 117 */ 118 bool tryConsumeOne(ref T dst) { return m_impl.tryConsumeOne(dst); } 119 /// ditto 120 bool tryConsumeOne(ref T dst) shared { return m_impl.tryConsumeOne(dst); } 121 122 /** Attempts to consume all elements currently in the queue. 123 124 This function will block if no elements are available. Once at least one 125 element is available, the contents of `dst` will be replaced with all 126 available elements. 127 128 If the `empty` property is or becomes `true` before data becomes 129 avaiable, `dst` will be left untouched and `false` is returned. 130 */ 131 bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) 132 in { assert(dst.empty); } 133 do { return m_impl.consumeAll(dst); } 134 /// ditto 135 bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) shared 136 in { assert(dst.empty); } 137 do { return m_impl.consumeAll(dst); } 138 139 /** Enqueues an element. 140 141 This function may block the the event that the internal buffer is full. 142 */ 143 void put(T item) { m_impl.put(item.move); } 144 /// ditto 145 void put(T item) shared { m_impl.put(item.move); } 146 } 147 148 149 private final class ChannelImpl(T, size_t buffer_size) { 150 import vibe.core.concurrency : isWeaklyIsolated; 151 static assert(isWeaklyIsolated!T, "Channel data type "~T.stringof~" is not safe to pass between threads."); 152 153 private { 154 Mutex m_mutex; 155 TaskCondition m_condition; 156 FixedRingBuffer!(T, buffer_size) m_items; 157 bool m_closed = false; 158 ChannelConfig m_config; 159 } 160 161 this(ChannelConfig config) 162 shared @trusted nothrow { 163 m_mutex = cast(shared)new Mutex; 164 m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex); 165 m_config = config; 166 } 167 168 @property bool empty() 169 shared nothrow { 170 { 171 m_mutex.lock_nothrow(); 172 scope (exit) m_mutex.unlock_nothrow(); 173 174 auto thisus = () @trusted { return cast(ChannelImpl)this; } (); 175 176 // ensure that in a single-reader scenario !empty guarantees a 177 // successful call to consumeOne 178 while (!thisus.m_closed && thisus.m_items.empty) 179 thisus.m_condition.wait(); 180 181 return thisus.m_closed && thisus.m_items.empty; 182 } 183 } 184 185 @property size_t bufferFill() 186 shared nothrow { 187 { 188 m_mutex.lock_nothrow(); 189 scope (exit) m_mutex.unlock_nothrow(); 190 191 auto thisus = () @trusted { return cast(ChannelImpl)this; } (); 192 return thisus.m_items.length; 193 } 194 } 195 196 void close() 197 shared nothrow { 198 { 199 m_mutex.lock_nothrow(); 200 scope (exit) m_mutex.unlock_nothrow(); 201 202 auto thisus = () @trusted { return cast(ChannelImpl)this; } (); 203 thisus.m_closed = true; 204 thisus.m_condition.notifyAll(); 205 } 206 } 207 208 bool tryConsumeOne(ref T dst) 209 shared nothrow { 210 auto thisus = () @trusted { return cast(ChannelImpl)this; } (); 211 bool need_notify = false; 212 213 { 214 m_mutex.lock_nothrow(); 215 scope (exit) m_mutex.unlock_nothrow(); 216 217 while (thisus.m_items.empty) { 218 if (m_closed) return false; 219 thisus.m_condition.wait(); 220 } 221 222 if (m_config.priority == ChannelPriority.latency) 223 need_notify = thisus.m_items.full; 224 225 move(thisus.m_items.front, dst); 226 thisus.m_items.popFront(); 227 228 if (m_config.priority == ChannelPriority.overhead) 229 need_notify = thisus.m_items.empty; 230 } 231 232 if (need_notify) { 233 if (m_config.priority == ChannelPriority.overhead) 234 thisus.m_condition.notifyAll(); 235 else 236 thisus.m_condition.notify(); 237 } 238 239 return true; 240 } 241 242 T consumeOne() 243 shared { 244 T ret; 245 if (!tryConsumeOne(ret)) 246 throw new Exception("Attempt to consume from an empty channel."); 247 return ret; 248 } 249 250 bool consumeAll(ref FixedRingBuffer!(T, buffer_size) dst) 251 shared nothrow { 252 auto thisus = () @trusted { return cast(ChannelImpl)this; } (); 253 bool need_notify = false; 254 255 { 256 m_mutex.lock_nothrow(); 257 scope (exit) m_mutex.unlock_nothrow(); 258 259 while (thisus.m_items.empty) { 260 if (m_closed) return false; 261 thisus.m_condition.wait(); 262 } 263 264 if (m_config.priority == ChannelPriority.latency) 265 need_notify = thisus.m_items.full; 266 267 swap(thisus.m_items, dst); 268 269 if (m_config.priority == ChannelPriority.overhead) 270 need_notify = true; 271 } 272 273 if (need_notify) { 274 if (m_config.priority == ChannelPriority.overhead) 275 thisus.m_condition.notifyAll(); 276 else thisus.m_condition.notify(); 277 } 278 279 return true; 280 } 281 282 void put(T item) 283 shared { 284 auto thisus = () @trusted { return cast(ChannelImpl)this; } (); 285 bool need_notify = false; 286 287 { 288 m_mutex.lock_nothrow(); 289 scope (exit) m_mutex.unlock_nothrow(); 290 291 enforce(!m_closed, "Sending on closed channel."); 292 while (thisus.m_items.full) 293 thisus.m_condition.wait(); 294 if (m_config.priority == ChannelPriority.latency) 295 need_notify = thisus.m_items.empty; 296 thisus.m_items.put(item.move); 297 if (m_config.priority == ChannelPriority.overhead) 298 need_notify = thisus.m_items.full; 299 } 300 301 if (need_notify) { 302 if (m_config.priority == ChannelPriority.overhead) 303 thisus.m_condition.notifyAll(); 304 else thisus.m_condition.notify(); 305 } 306 } 307 } 308 309 @safe unittest { // test basic operation and non-copyable struct compatiblity 310 static struct S { 311 int i; 312 @disable this(this); 313 } 314 315 auto ch = createChannel!S; 316 ch.put(S(1)); 317 assert(ch.consumeOne().i == 1); 318 ch.put(S(4)); 319 ch.put(S(5)); 320 { 321 FixedRingBuffer!(S, 100) buf; 322 ch.consumeAll(buf); 323 assert(buf.length == 2); 324 assert(buf[0].i == 4); 325 assert(buf[1].i == 5); 326 } 327 ch.put(S(2)); 328 assert(!ch.empty); 329 ch.close(); 330 assert(!ch.empty); 331 S v; 332 assert(ch.tryConsumeOne(v)); 333 assert(v.i == 2); 334 assert(ch.empty); 335 assert(!ch.tryConsumeOne(v)); 336 } 337 338 @safe unittest { // make sure shared(Channel!T) can also be used 339 shared ch = createChannel!int; 340 ch.put(1); 341 assert(!ch.empty); 342 assert(ch.consumeOne == 1); 343 ch.close(); 344 assert(ch.empty); 345 } 346 347 @safe unittest { // ensure nothrow'ness for throwing struct 348 static struct S { 349 this(this) { throw new Exception("meh!"); } 350 } 351 auto ch = createChannel!S; 352 ch.put(S.init); 353 ch.put(S.init); 354 355 S s; 356 FixedRingBuffer!(S, 100, true) sb; 357 358 () nothrow { 359 assert(ch.tryConsumeOne(s)); 360 assert(ch.consumeAll(sb)); 361 assert(sb.length == 1); 362 ch.close(); 363 assert(ch.empty); 364 } (); 365 } 366 367 unittest { 368 import std.traits : EnumMembers; 369 import vibe.core.core : runTask; 370 371 void test(ChannelPriority prio) 372 { 373 auto ch = createChannel!int(ChannelConfig(prio)); 374 runTask(() nothrow { 375 try { 376 ch.put(1); 377 ch.put(2); 378 ch.put(3); 379 } catch (Exception e) assert(false, e.msg); 380 ch.close(); 381 }); 382 383 int i; 384 assert(ch.tryConsumeOne(i) && i == 1); 385 assert(ch.tryConsumeOne(i) && i == 2); 386 assert(ch.tryConsumeOne(i) && i == 3); 387 assert(!ch.tryConsumeOne(i)); 388 } 389 390 foreach (m; EnumMembers!ChannelPriority) 391 test(m); 392 }