/**
	Generic stream interface used by several stream-like classes.

	This module defines the basic (buffered) stream primitives. For concrete stream types, take a
	look at the `vibe.stream` package. The `vibe.stream.operations` module contains additional
	high-level operations on streams, such as reading streams by line or as a whole.

	Note that starting with vibe-core 1.0.0, streams can be of either `struct` or `class` type.
	Any APIs that take streams as a parameter should use a template type parameter that is tested
	using the appropriate trait (e.g. `isInputStream`) instead of assuming the specific interface
	type (e.g. `InputStream`).

	Copyright: © 2012-2016 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.core.stream;

import vibe.internal.traits : checkInterfaceConformance, validateInterfaceConformance;
import vibe.internal.interfaceproxy;
import core.time;
import std.algorithm;
import std.conv;

public import eventcore.driver : IOMode;


/** Pipes an `InputStream` directly into an `OutputStream`.

	The number of bytes written is either the whole input stream when
	`nbytes == ulong.max`, or exactly `nbytes` for `nbytes < ulong.max`. If the
	input stream contains fewer than `nbytes` bytes, an exception is thrown.

	Returns:
		The actual number of bytes written is returned. If `nbytes` is given
		and not equal to `ulong.max`, its value will be returned.
*/
ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, ulong nbytes)
	@blocking @trusted
	if (isOutputStream!OutputStream && isInputStream!InputStream)
{
	import vibe.internal.allocator : theAllocator, makeArray, dispose;

	scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024);
	scope (exit) theAllocator.dispose(buffer);

	//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
	ulong ret = 0;
	if (nbytes == ulong.max) {
		while (!source.empty) {
			size_t chunk = min(source.leastSize, buffer.length);
			assert(chunk > 0, "leastSize returned zero for non-empty stream.");
			//logTrace("read pipe chunk %d", chunk);
			source.read(buffer[0 .. chunk]);
			sink.write(buffer[0 .. chunk]);
			ret += chunk;
		}
	} else {
		while (nbytes > 0) {
			size_t chunk = min(nbytes, buffer.length);
			//logTrace("read pipe chunk %d", chunk);
			source.read(buffer[0 .. chunk]);
			sink.write(buffer[0 .. chunk]);
			nbytes -= chunk;
			ret += chunk;
		}
	}
	return ret;
}
/// ditto
ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink)
	@blocking
	if (isOutputStream!OutputStream && isInputStream!InputStream)
{
	return pipe(source, sink, ulong.max);
}


/** Marks a function as blocking.

	Blocking in this case means that it may contain an operation that needs to wait for
	external events, such as I/O operations, and may result in other tasks in the same
	thread being executed before it returns.

	Currently this attribute serves only as a documentation aid and is not enforced
	or used for deduction in any way.
*/
struct blocking {}

/**************************************************************************************************/
/* Public functions                                                                               */
/**************************************************************************************************/

/**
	Returns a `NullOutputStream` instance.

	The instance will only be created on the first request and gets reused for
	all subsequent calls from the same thread.
*/
NullOutputStream nullSink() @safe nothrow
{
	static NullOutputStream ret;
	if (!ret) ret = new NullOutputStream;
	return ret;
}

/**************************************************************************************************/
/* Public types                                                                                   */
/**************************************************************************************************/

/**
	Interface for all classes implementing readable streams.
*/
interface InputStream {
	@safe:

	/** Returns true $(I iff) the end of the input stream has been reached.

		For connection oriented streams, this function will block until either
		new data arrives or the connection is closed.
	*/
	@property bool empty() @blocking;

	/** (Scheduled for deprecation) Returns the maximum number of bytes that are known to remain available for read.

		After `leastSize()` bytes have been read, the stream will either have reached EOS
		and `empty()` returns `true`, or `leastSize()` again returns a number greater than `0`.
	*/
	@property ulong leastSize() @blocking;

	/** (Scheduled for deprecation) Queries if there is data available for immediate, non-blocking read.
	*/
	@property bool dataAvailableForRead();

	/** Returns a temporary reference to the data that is currently buffered.

		The returned slice typically has the size `leastSize()` or `0` if `dataAvailableForRead()`
		returns `false`. Streams that don't have an internal buffer will always return an empty
		slice.

		Note that any method invocation on the same stream potentially invalidates the contents of
		the returned buffer.
	*/
	const(ubyte)[] peek();

	/** Fills the preallocated array `dst` with data from the stream.

		This function will continue reading from the stream until the buffer has
		been fully filled.

		Params:
			dst = The buffer into which to write the data that was read
			mode = Optional reading mode (defaults to `IOMode.all`).

		Returns:
			The number of bytes read. The `dst` buffer will be filled up
			to this index. The return value is guaranteed to be `dst.length` for
			`IOMode.all`.

		Throws: An exception if the operation reads past the end of the stream

		See_Also: `readOnce`, `tryRead`
	*/
	size_t read(scope ubyte[] dst, IOMode mode) @blocking;
	/// ditto
	final void read(scope ubyte[] dst) @blocking { auto n = read(dst, IOMode.all); assert(n == dst.length); }
}


/**
	Interface for all classes implementing writeable streams.
*/
interface OutputStream {
	@safe:

	/** Writes an array of bytes to the stream.
	*/
	size_t write(in ubyte[] bytes, IOMode mode) @blocking;
	/// ditto
	final void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
	/// ditto
	final void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }

	/** Flushes the stream and makes sure that all data is being written to the output device.
	*/
	void flush() @blocking;

	/** Flushes and finalizes the stream.

		Finalize has to be called on certain types of streams. No writes are possible after a
		call to finalize().
	*/
	void finalize() @blocking;
}

/**
	Interface for all classes implementing readable and writable streams.
*/
interface Stream : InputStream, OutputStream {
}


/**
	Interface for streams based on a connection.

	Connection streams are based on streaming socket connections, pipes and similar end-to-end
	streams.

	See_also: `vibe.core.net.TCPConnection`
*/
interface ConnectionStream : Stream {
	@safe:

	/** Determines the current connection status.

		If `connected` is `false`, writing to the connection will trigger an exception. Reading may
		still succeed as long as there is data left in the input buffer. Use `InputStream.empty`
		instead to determine when to stop reading.
	*/
	@property bool connected() const;

	/** Actively closes the connection and frees associated resources.

		Note that close must always be called, even if the remote has already closed the connection.
		Failure to do so will result in resource and memory leakage.

		Closing a connection implies a call to `finalize`, so that it doesn't need to be called
		explicitly (it will be a no-op in that case).
	*/
	void close() @blocking;

	/** Blocks until data becomes available for read.

		The maximum wait time can be customized with the `timeout` parameter. If there is already
		data available for read, or if the connection is closed, the function will return immediately
		without blocking.

		Params:
			timeout = Optional timeout, the default value of `Duration.max` waits without a timeout.

		Returns:
			The function will return `true` if data becomes available before the timeout is reached.
			If the connection gets closed, or the timeout gets reached, `false` is returned instead.
	*/
	bool waitForData(Duration timeout = Duration.max) @blocking;
}


/**
	Interface for all streams supporting random access.
*/
interface RandomAccessStream : Stream {
	@safe:

	/// Returns the total size of the file.
	@property ulong size() const nothrow;

	/// Determines if this stream is readable.
	@property bool readable() const nothrow;

	/// Determines if this stream is writable.
	@property bool writable() const nothrow;

	/// Seeks to a specific position in the file if supported by the stream.
	void seek(ulong offset) @blocking;

	/// Returns the current offset of the file pointer
	ulong tell() nothrow;
}


/**
	Stream implementation acting as a sink with no function.

	Any data written to the stream will be ignored and discarded. This stream type is useful if
	the output of a particular stream is not needed but the stream needs to be drained.
*/
final class NullOutputStream : OutputStream {
	size_t write(in ubyte[] bytes, IOMode) { return bytes.length; }
	alias write = OutputStream.write;
	void flush() {}
	void finalize() {}
}


/// Generic storage for types that implement the `InputStream` interface
alias InputStreamProxy = InterfaceProxy!InputStream;
/// Generic storage for types that implement the `OutputStream` interface
alias OutputStreamProxy = InterfaceProxy!OutputStream;
/// Generic storage for types that implement the `Stream` interface
alias StreamProxy = InterfaceProxy!Stream;
/// Generic storage for types that implement the `ConnectionStream` interface
alias ConnectionStreamProxy = InterfaceProxy!ConnectionStream;
/// Generic storage for types that implement the `RandomAccessStream` interface
alias RandomAccessStreamProxy = InterfaceProxy!RandomAccessStream;


/** Tests if the given aggregate type is a valid input stream.

	See_also: `validateInputStream`
*/
enum isInputStream(T) = checkInterfaceConformance!(T, InputStream) is null;

/** Tests if the given aggregate type is a valid output stream.

	See_also: `validateOutputStream`
*/
enum isOutputStream(T) = checkInterfaceConformance!(T, OutputStream) is null;

/** Tests if the given aggregate type is a valid bidirectional stream.

	See_also: `validateStream`
*/
enum isStream(T) = checkInterfaceConformance!(T, Stream) is null;

/** Tests if the given aggregate type is a valid connection stream.

	See_also: `validateConnectionStream`
*/
enum isConnectionStream(T) = checkInterfaceConformance!(T, ConnectionStream) is null;

/** Tests if the given aggregate type is a valid random access stream.

	See_also: `validateRandomAccessStream`
*/
enum isRandomAccessStream(T) = checkInterfaceConformance!(T, RandomAccessStream) is null;

/** Verifies that the given type is a valid input stream.

	A valid input stream type must implement all methods of the `InputStream` interface. Inheriting
	from `InputStream` is not strictly necessary, which also enables struct types to be considered
	as stream implementations.

	See_Also: `isInputStream`
*/
mixin template validateInputStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .InputStream); }

/** Verifies that the given type is a valid output stream.

	A valid output stream type must implement all methods of the `OutputStream` interface. Inheriting
	from `OutputStream` is not strictly necessary, which also enables struct types to be considered
	as stream implementations.

	See_Also: `isOutputStream`
*/
mixin template validateOutputStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .OutputStream); }

/** Verifies that the given type is a valid bidirectional stream.

	A valid stream type must implement all methods of the `Stream` interface. Inheriting
	from `Stream` is not strictly necessary, which also enables struct types to be considered
	as stream implementations.

	See_Also: `isStream`
*/
mixin template validateStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .Stream); }

/** Verifies that the given type is a valid connection stream.

	A valid connection stream type must implement all methods of the `ConnectionStream` interface.
	Inheriting from `ConnectionStream` is not strictly necessary, which also enables struct types
	to be considered as stream implementations.

	See_Also: `isConnectionStream`
*/
mixin template validateConnectionStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .ConnectionStream); }

/** Verifies that the given type is a valid random access stream.

	A valid random access stream type must implement all methods of the `RandomAccessStream`
	interface. Inheriting from `RandomAccessStream` is not strictly necessary, which also enables
	struct types to be considered as stream implementations.

	See_Also: `isRandomAccessStream`
*/
mixin template validateRandomAccessStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .RandomAccessStream); }
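

/*
	Usage sketch (illustrative only, not part of the original module).

	The test below exercises only declarations from this module: `nullSink`,
	`NullOutputStream`, the `OutputStream.write` overloads and the
	`isOutputStream` trait. It is meant as a minimal example of how a sink is
	used and tested for, under the assumption that the module is compiled with
	`-unittest`. A plain comment is used here on purpose, so that the block is
	not attached as a documented example to the preceding declaration.
*/
unittest {
	// nullSink() caches one instance per thread, so repeated calls return the same object.
	auto sink = nullSink();
	assert(sink is nullSink());

	// The two-argument overload reports every byte as written, although the data is discarded.
	ubyte[4] data = [1, 2, 3, 4];
	assert(sink.write(data[], IOMode.all) == data.length);

	// The convenience overloads inherited from `OutputStream` accept byte and character arrays.
	sink.write(data[]);
	sink.write("discarded");

	// flush() and finalize() are no-ops for this stream type.
	sink.flush();
	sink.finalize();

	// Generic code should test for the trait instead of assuming the interface type.
	static assert(isOutputStream!NullOutputStream);
}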