/**
	Generic stream interface used by several stream-like classes.

	This module defines the basic (buffered) stream primitives. For concrete stream types, take a
	look at the `vibe.stream` package. The `vibe.stream.operations` module contains additional
	high-level operations on streams, such as reading streams by line or as a whole.

	Note that starting with vibe-core 1.0.0, streams can be of either `struct` or `class` type.
	Any APIs that take streams as a parameter should use a template type parameter that is tested
	using the appropriate trait (e.g. `isInputStream`) instead of assuming the specific interface
	type (e.g. `InputStream`).

	Copyright: © 2012-2016 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.core.stream;

import vibe.internal.traits : checkInterfaceConformance, validateInterfaceConformance;
import vibe.internal.interfaceproxy;
import core.time;
import std.algorithm;
import std.conv;

public import eventcore.driver : IOMode;


/** Pipes an InputStream directly into this OutputStream.

	The number of bytes written is either the whole input stream when
	`nbytes == ulong.max`, or exactly `nbytes` for `nbytes < ulong.max`. If the
	input stream contains less than `nbytes` of data, an exception is thrown.

	Params:
		source = The stream to read from
		sink = The stream to write to
		nbytes = Number of bytes to transfer, or `ulong.max` to transfer
			everything until `source` reports `empty`
		mode = Selects between a single shared buffer (`PipeMode.sequential`)
			and a separate reader task (`PipeMode.concurrent`)

	Returns:
		The actual number of bytes written is returned. If `nbytes` is given
		and not equal to `ulong.max`, its value will be returned.
*/
ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
	ulong nbytes, PipeMode mode = PipeMode.sequential) @blocking @trusted
	if (isOutputStream!OutputStream && isInputStream!InputStream)
{
	import vibe.internal.allocator : theAllocator, makeArray, dispose;
	import vibe.core.core : runTask;
	import vibe.core.sync : LocalManualEvent, createManualEvent;
	import vibe.core.task : InterruptException;
	import core.exception : onOutOfMemoryError;

	final switch (mode) {
		case PipeMode.sequential:
		{
			enum seqbufsize = 64*1024;

			scope buffer = cast(ubyte[]) theAllocator.allocate(seqbufsize);
			// An empty buffer would make every chunk zero bytes long and the
			// loops below would never make progress, so fail early instead.
			if (buffer.length != seqbufsize) onOutOfMemoryError();
			scope (exit) theAllocator.dispose(buffer);

			ulong ret = 0;

			if (nbytes == ulong.max) {
				// unbounded: copy until the source reports end of stream
				while (!source.empty) {
					size_t chunk = min(source.leastSize, buffer.length);
					assert(chunk > 0, "leastSize returned zero for non-empty stream.");
					//logTrace("read pipe chunk %d", chunk);
					source.read(buffer[0 .. chunk]);
					sink.write(buffer[0 .. chunk]);
					ret += chunk;
				}
			} else {
				// bounded: copy exactly nbytes, source.read is left to report
				// a premature end of stream
				while (nbytes > 0) {
					size_t chunk = min(nbytes, buffer.length);
					//logTrace("read pipe chunk %d", chunk);
					source.read(buffer[0 .. chunk]);
					sink.write(buffer[0 .. chunk]);
					nbytes -= chunk;
					ret += chunk;
				}
			}

			return ret;
		}
		case PipeMode.concurrent:
		{
			enum bufcount = 4;
			enum bufsize = 4*1024*1024;

			// Shared state of the reader task (readLoop) and the calling
			// task (writeLoop). The buffers form a ring that is indexed by
			// the monotonically increasing read_idx/write_idx counters.
			static struct ConcurrentPipeState {
				InputStream source;
				OutputStream sink;
				ulong nbytes;
				ubyte[][bufcount] buffers;
				size_t[bufcount] bufferFill;
				// buffer index that is being read/written
				size_t read_idx = 0, write_idx = 0;
				// exception caught in the reader task, rethrown by pipe()
				Exception readex;
				// set once the reader task has left readLoop
				bool done = false;
				LocalManualEvent evt;
				// incremented by readLoop for every chunk that got queued;
				// used as the return value of pipe()
				size_t bytesWritten;

				/// Fills the ring buffers from `source` until the requested
				/// amount has been read or `leastSize` reports zero.
				void readLoop()
				{
					// gradually increased depending on read speed
					size_t rbsize = 64*1024;

					while (true) {
						ulong remaining = nbytes == ulong.max ? source.leastSize : nbytes;
						if (remaining == 0) break;

						// all buffers are filled - wait for the writer
						while (read_idx >= write_idx + buffers.length)
							evt.wait();

						size_t chunk = min(remaining, rbsize);
						auto bi = read_idx % bufcount;

						auto tm = MonoTime.currTime;
						source.read(buffers[bi][0 .. chunk]);
						// double the read size as long as a read finishes in
						// less than 100 ms and bufsize is not yet reached
						if (rbsize < bufsize && MonoTime.currTime - tm < 100.msecs)
							rbsize *= 2;
						if (nbytes != ulong.max) nbytes -= chunk;
						bytesWritten += chunk;
						bufferFill[bi] = chunk;
						// notify the writer if it has caught up with the reader
						if (write_idx >= read_idx++)
							evt.emit();
					}
				}

				/// Writes filled buffers to `sink` until the reader is done
				/// and no filled buffer is left.
				void writeLoop()
				{
					while (read_idx > write_idx || !done) {
						while (read_idx <= write_idx) {
							if (done) return;
							evt.wait();
						}

						auto bi = write_idx % bufcount;
						sink.write(buffers[bi][0 .. bufferFill[bi]]);

						// notify reader that we just made a buffer available
						// (written as an addition, because `read_idx - buffers.length`
						// wraps around as long as read_idx < buffers.length)
						const finished = write_idx++;
						if (finished + buffers.length <= read_idx)
							evt.emit();
					}
				}
			}

			scope buffer = cast(ubyte[]) theAllocator.allocate(bufcount * bufsize);
			// The slices below must never refer to a null/short allocation.
			if (buffer.length != bufcount * bufsize) onOutOfMemoryError();
			scope (exit) theAllocator.dispose(buffer);

			ConcurrentPipeState state;
			// split the allocation into bufcount equally sized buffers
			foreach (i; 0 .. bufcount)
				state.buffers[i] = buffer[i*($/bufcount) .. (i+1)*($/bufcount)];
			swap(state.source, source);
			swap(state.sink, sink);
			state.nbytes = nbytes;
			state.evt = createManualEvent();

			auto reader = runTask(function(ConcurrentPipeState* state) nothrow {
				try state.readLoop();
				catch (InterruptException e) {}
				catch (Exception e) state.readex = e;
				// wake up the writer, so that it can drain and terminate
				state.done = true;
				state.evt.emit();
			}, &state);

			// the reader task references `state` and `buffer`, so it has to be
			// stopped before an exception leaves this scope
			scope (failure) {
				reader.interrupt();
				reader.joinUninterruptible();
			}

			state.writeLoop();

			reader.join();

			if (state.readex) throw state.readex;

			return state.bytesWritten;
		}
	}
}
/// ditto
ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
	PipeMode mode = PipeMode.sequential) @blocking
	if (isOutputStream!OutputStream && isInputStream!InputStream)
{
	return pipe(source, sink, ulong.max, mode);
}

enum PipeMode {
	/** Sequentially reads into a buffer and writes it out to the sink.

		This mode reads and writes to the same buffer in a ping-pong fashion.
		The memory overhead is low, but if the source does not support
		read-ahead buffering, or the sink does not have an internal buffer that
		is drained asynchronously, the total throughput will be reduced.
	*/
	sequential,

	/** Uses a task to concurrently read and write.

		This mode maximizes throughput at the expense of setting up a task and
		associated synchronization.
	*/
	concurrent
}


/** Marks a function as blocking.

	Blocking in this case means that it may contain an operation that needs to wait for
	external events, such as I/O operations, and may result in other tasks in the same
	thread being executed before it returns.

	Currently this attribute serves only as a documentation aid and is not enforced
	or used for deduction in any way.
*/
struct blocking {}

/**************************************************************************************************/
/* Public functions                                                                               */
/**************************************************************************************************/

/**
	Returns a `NullOutputStream` instance.

	The instance will only be created on the first request and gets reused for
	all subsequent calls from the same thread.
*/
NullOutputStream nullSink() @safe nothrow
{
	static NullOutputStream ret;
	if (!ret) ret = new NullOutputStream;
	return ret;
}

/**************************************************************************************************/
/* Public types                                                                                   */
/**************************************************************************************************/

/**
	Interface for all classes implementing readable streams.
*/
interface InputStream {
	@safe:

	/** Returns true $(I iff) the end of the input stream has been reached.

		For connection oriented streams, this function will block until either
		new data arrives or the connection got closed.
	*/
	@property bool empty() @blocking;

	/**	(Scheduled for deprecation) Returns the maximum number of bytes that are known to remain available for read.

		After `leastSize()` bytes have been read, the stream will either have reached EOS
		and `empty()` returns `true`, or `leastSize()` returns again a number greater than `0`.
	*/
	@property ulong leastSize() @blocking;

	/** (Scheduled for deprecation) Queries if there is data available for immediate, non-blocking read.
	*/
	@property bool dataAvailableForRead();

	/** Returns a temporary reference to the data that is currently buffered.

		The returned slice typically has the size `leastSize()` or `0` if `dataAvailableForRead()`
		returns `false`. Streams that don't have an internal buffer will always return an empty
		slice.

		Note that any method invocation on the same stream potentially invalidates the contents of
		the returned buffer.
	*/
	const(ubyte)[] peek();

	/**	Fills the preallocated array 'bytes' with data from the stream.

		This function will continue to read from the stream until the buffer has
		been fully filled.

		Params:
			dst = The buffer into which to write the data that was read
			mode = Optional reading mode (defaults to `IOMode.all`).

		Returns:
			Returns the number of bytes read. The `dst` buffer will be filled up
			to this index. The return value is guaranteed to be `dst.length` for
			`IOMode.all`.

		Throws: An exception if the operation reads past the end of the stream

		See_Also: `readOnce`, `tryRead`
	*/
	size_t read(scope ubyte[] dst, IOMode mode) @blocking;
	/// ditto
	final void read(scope ubyte[] dst) @blocking { auto n = read(dst, IOMode.all); assert(n == dst.length); }
}


/**
	Interface for all classes implementing writeable streams.
*/
interface OutputStream {
	@safe:

	/** Writes an array of bytes to the stream.
	*/
	size_t write(in ubyte[] bytes, IOMode mode) @blocking;
	/// ditto
	final void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
	/// ditto
	final void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }

	/** Flushes the stream and makes sure that all data is being written to the output device.
	*/
	void flush() @blocking;

	/** Flushes and finalizes the stream.

		Finalize has to be called on certain types of streams. No writes are possible after a
		call to finalize().
	*/
	void finalize() @blocking;
}

/**
	Interface for all classes implementing readable and writable streams.
*/
interface Stream : InputStream, OutputStream {
}


/**
	Interface for streams based on a connection.

	Connection streams are based on streaming socket connections, pipes and similar end-to-end
	streams.

	See_also: `vibe.core.net.TCPConnection`
*/
interface ConnectionStream : Stream {
	@safe:

	/** Determines the current connection status.

		If `connected` is `false`, writing to the connection will trigger an exception. Reading may
		still succeed as long as there is data left in the input buffer. Use `InputStream.empty`
		instead to determine when to stop reading.
	*/
	@property bool connected() const;

	/** Actively closes the connection and frees associated resources.

		Note that close must always be called, even if the remote has already closed the connection.
		Failure to do so will result in resource and memory leakage.

		Closing a connection implies a call to `finalize`, so that it doesn't need to be called
		explicitly (it will be a no-op in that case).
	*/
	void close() @blocking;

	/** Blocks until data becomes available for read.

		The maximum wait time can be customized with the `timeout` parameter. If there is already
		data available for read, or if the connection is closed, the function will return immediately
		without blocking.

		Params:
			timeout = Optional timeout, the default value of `Duration.max` waits without a timeout.

		Returns:
			The function will return `true` if data becomes available before the timeout is reached.
			If the connection gets closed, or the timeout gets reached, `false` is returned instead.
	*/
	bool waitForData(Duration timeout = Duration.max) @blocking;
}


/**
	Interface for all streams supporting random access.
*/
interface RandomAccessStream : Stream {
	@safe:

	/// Returns the total size of the file.
	@property ulong size() const nothrow;

	/// Determines if this stream is readable.
	@property bool readable() const nothrow;

	/// Determines if this stream is writable.
	@property bool writable() const nothrow;

	/// Seeks to a specific position in the file if supported by the stream.
	void seek(ulong offset) @blocking;

	/// Returns the current offset of the file pointer
	ulong tell() nothrow;
}


/** Extended form of a `RandomAccessStream` that supports truncation/extension.
*/
interface TruncatableStream : RandomAccessStream {
	@safe:

	// Note that truncate should be part of RandomAccessStream
	/// Truncates or extends the size of the stream
	void truncate(ulong size) @blocking;
}


/** Random access stream with support for explicit closing.
*/
interface ClosableRandomAccessStream : TruncatableStream {
	@safe:

	/// Determines if the file stream is still open and accessible
	@property bool isOpen() const nothrow;

	/** Actively closes the stream and frees associated resources.

		Closing a stream implies a call to `finalize`, so that it doesn't need
		to be called explicitly.
	*/
	void close() @blocking;
}


/**
	Stream implementation acting as a sink with no function.

	Any data written to the stream will be ignored and discarded. This stream type is useful if
	the output of a particular stream is not needed but the stream needs to be drained.
*/
final class NullOutputStream : OutputStream {
	size_t write(in ubyte[] bytes, IOMode) { return bytes.length; }
	alias write = OutputStream.write;
	void flush() {}
	void finalize() {}
}


/// Generic storage for types that implement the `InputStream` interface
alias InputStreamProxy = InterfaceProxy!InputStream;
/// Generic storage for types that implement the `OutputStream` interface
alias OutputStreamProxy = InterfaceProxy!OutputStream;
/// Generic storage for types that implement the `Stream` interface
alias StreamProxy = InterfaceProxy!Stream;
/// Generic storage for types that implement the `ConnectionStream` interface
alias ConnectionStreamProxy = InterfaceProxy!ConnectionStream;
/// Generic storage for types that implement the `RandomAccessStream` interface
alias RandomAccessStreamProxy = InterfaceProxy!RandomAccessStream;
/// Generic storage for types that implement the `TruncatableStream` interface
alias TruncatableStreamProxy = InterfaceProxy!TruncatableStream;
/// Generic storage for types that implement the `ClosableRandomAccessStream` interface
alias ClosableRandomAccessStreamProxy = InterfaceProxy!ClosableRandomAccessStream;


/** Tests if the given aggregate type is a valid input stream.

	See_also: `validateInputStream`
*/
enum isInputStream(T) = checkInterfaceConformance!(T, InputStream) is null;

/** Tests if the given aggregate type is a valid output stream.

	See_also: `validateOutputStream`
*/
enum isOutputStream(T) = checkInterfaceConformance!(T, OutputStream) is null;

/** Tests if the given aggregate type is a valid bidirectional stream.

	See_also: `validateStream`
*/
enum isStream(T) = checkInterfaceConformance!(T, Stream) is null;

/** Tests if the given aggregate type is a valid connection stream.

	See_also: `validateConnectionStream`
*/
enum isConnectionStream(T) = checkInterfaceConformance!(T, ConnectionStream) is null;

/** Tests if the given aggregate type is a valid random access stream.

	See_also: `validateRandomAccessStream`
*/
enum isRandomAccessStream(T) = checkInterfaceConformance!(T, RandomAccessStream) is null;

/** Tests if the given aggregate type is a valid truncatable random access stream.

	See_also: `validateTruncatableStream`
*/
enum isTruncatableStream(T) = checkInterfaceConformance!(T, TruncatableStream) is null;

/** Tests if the given aggregate type is a valid closable random access stream.

	See_also: `validateClosableRandomAccessStream`
*/
enum isClosableRandomAccessStream(T) = checkInterfaceConformance!(T, ClosableRandomAccessStream) is null;

/** Verifies that the given type is a valid input stream.

	A valid input stream type must implement all methods of the `InputStream` interface. Inheriting
	from `InputStream` is not strictly necessary, which also enables struct types to be considered
	as stream implementations.

	See_Also: `isInputStream`
*/
mixin template validateInputStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .InputStream); }

/** Verifies that the given type is a valid output stream.

	A valid output stream type must implement all methods of the `OutputStream` interface. Inheriting
	from `OutputStream` is not strictly necessary, which also enables struct types to be considered
	as stream implementations.

	See_Also: `isOutputStream`
*/
mixin template validateOutputStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .OutputStream); }

/** Verifies that the given type is a valid bidirectional stream.

	A valid stream type must implement all methods of the `Stream` interface. Inheriting
	from `Stream` is not strictly necessary, which also enables struct types to be considered
	as stream implementations.

	See_Also: `isStream`
*/
mixin template validateStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .Stream); }

/** Verifies that the given type is a valid connection stream.

	A valid connection stream type must implement all methods of the `ConnectionStream` interface.
	Inheriting from `ConnectionStream` is not strictly necessary, which also enables struct types
	to be considered as stream implementations.

	See_Also: `isConnectionStream`
*/
mixin template validateConnectionStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .ConnectionStream); }

/** Verifies that the given type is a valid random access stream.

	A valid random access stream type must implement all methods of the `RandomAccessStream`
	interface. Inheriting from `RandomAccessStream` is not strictly necessary, which also enables
	struct types to be considered as stream implementations.

	See_Also: `isRandomAccessStream`
*/
mixin template validateRandomAccessStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .RandomAccessStream); }

/** Verifies that the given type is a valid truncatable random access stream.

	A valid random access stream type must implement all methods of the `TruncatableStream`
	interface. Inheriting from `TruncatableStream` is not strictly necessary, which also enables
	struct types to be considered as stream implementations.

	See_Also: `isTruncatableStream`
*/
mixin template validateTruncatableStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .TruncatableStream); }

/** Verifies that the given type is a valid closable random access stream.

	A valid random access stream type must implement all methods of the `ClosableRandomAccessStream`
	interface. Inheriting from `ClosableRandomAccessStream` is not strictly necessary, which also enables
	struct types to be considered as stream implementations.

	See_Also: `isClosableRandomAccessStream`
*/
mixin template validateClosableRandomAccessStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .ClosableRandomAccessStream); }