1 /**
2 	Generic stream interface used by several stream-like classes.
3 
4 	This module defines the basic (buffered) stream primitives. For concrete stream types, take a
5 	look at the `vibe.stream` package. The `vibe.stream.operations` module contains additional
6 	high-level operations on streams, such as reading streams by line or as a whole.
7 
8 	Note that starting with vibe-core 1.0.0, streams can be of either `struct`  or `class` type.
9 	Any APIs that take streams as a parameter should use a template type parameter that is tested
10 	using the appropriate trait (e.g. `isInputStream`) instead of assuming the specific interface
11 	type (e.g. `InputStream`).
12 
13 	Copyright: © 2012-2016 Sönke Ludwig
14 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
15 	Authors: Sönke Ludwig
16 */
17 module vibe.core.stream;
18 
19 import vibe.internal.traits : checkInterfaceConformance, validateInterfaceConformance;
20 import vibe.internal.interfaceproxy;
21 import core.time;
22 import std.algorithm;
23 import std.conv;
24 
25 public import eventcore.driver : IOMode;
26 
27 
/** Pipes an `InputStream` directly into an `OutputStream`.

	The data is moved in chunks through a temporary 64 KiB buffer that is
	obtained from `theAllocator` and released again when the function exits,
	regardless of whether it returns normally or by throwing.

	The number of bytes written is either the whole input stream when
	`nbytes == ulong.max`, or exactly `nbytes` for `nbytes < ulong.max`. If the
	input stream contains less than `nbytes` of data, an exception is thrown.

	Params:
		source = The stream to read from
		sink = The stream that receives the data
		nbytes = Number of bytes to transfer, or `ulong.max` to transfer
			everything until `source.empty` becomes `true`

	Returns:
		The actual number of bytes written is returned. If `nbytes` is given
		and not equal to `ulong.max`, its value will be returned.

	Throws:
		Any exception raised by `source.read` or `sink.write` is propagated.
		An out-of-memory error is raised if the transfer buffer cannot be
		allocated.
*/
ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink, ulong nbytes)
	@blocking @trusted
	if (isOutputStream!OutputStream && isInputStream!InputStream)
{
	import core.exception : onOutOfMemoryError;
	import vibe.internal.allocator : theAllocator, dispose;

	scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024);
	// A failed allocation yields an empty slice. Without this check every
	// chunk below would have length zero and the bounded loop would never
	// make progress.
	if (buffer.length == 0) onOutOfMemoryError();
	scope (exit) theAllocator.dispose(buffer);

	//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
	ulong ret = 0;
	if (nbytes == ulong.max) {
		// Unbounded mode: drain the source until it reports end-of-stream.
		while (!source.empty) {
			size_t chunk = min(source.leastSize, buffer.length);
			assert(chunk > 0, "leastSize returned zero for non-empty stream.");
			//logTrace("read pipe chunk %d", chunk);
			source.read(buffer[0 .. chunk]);
			sink.write(buffer[0 .. chunk]);
			ret += chunk;
		}
	} else {
		// Bounded mode: transfer exactly `nbytes`; `source.read` is expected
		// to throw if the source ends prematurely.
		while (nbytes > 0) {
			size_t chunk = min(nbytes, buffer.length);
			//logTrace("read pipe chunk %d", chunk);
			source.read(buffer[0 .. chunk]);
			sink.write(buffer[0 .. chunk]);
			nbytes -= chunk;
			ret += chunk;
		}
	}
	return ret;
}
/// ditto
ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink)
	@blocking
	if (isOutputStream!OutputStream && isInputStream!InputStream)
{
	// `ulong.max` is the sentinel that makes the three-argument overload
	// transfer everything until the source is empty.
	enum ulong untilEmpty = ulong.max;
	return pipe(source, sink, untilEmpty);
}
77 
78 
/** Marks a function as blocking.

	"Blocking" here means that the function may contain an operation that has
	to wait for external events, such as I/O operations, and that other tasks
	in the same thread may get executed before it returns.

	Currently this attribute serves only as a documentation aid and is not
	enforced or used for deduction in any way.
*/
struct blocking {
}
89 
90 /**************************************************************************************************/
91 /* Public functions                                                                               */
92 /**************************************************************************************************/
93 
/**
	Returns a `NullOutputStream` instance.

	The instance is created lazily on the first request and is handed out
	again for all subsequent calls from the same thread.
*/
NullOutputStream nullSink() @safe nothrow
{
	// Cached instance, created on first use.
	static NullOutputStream instance;
	if (instance is null)
		instance = new NullOutputStream;
	return instance;
}
106 
107 /**************************************************************************************************/
108 /* Public types                                                                                   */
109 /**************************************************************************************************/
110 
/**
	Interface for all classes implementing readable streams.
*/
interface InputStream {
	@safe:

	/** Returns true $(I iff) the end of the input stream has been reached.

		For connection oriented streams, this function will block until either
		new data arrives or the connection got closed.
	*/
	@property bool empty() @blocking;

	/**	(Scheduled for deprecation) Returns the maximum number of bytes that
		are known to remain available for read.

		After `leastSize()` bytes have been read, the stream will either have
		reached EOS and `empty()` returns `true`, or `leastSize()` again
		returns a number greater than `0`.
	*/
	@property ulong leastSize() @blocking;

	/** (Scheduled for deprecation) Queries if there is data available for
		immediate, non-blocking read.
	*/
	@property bool dataAvailableForRead();

	/** Returns a temporary reference to the data that is currently buffered.

		The returned slice typically has the size `leastSize()`, or `0` if
		`dataAvailableForRead()` returns `false`. Streams that don't have an
		internal buffer will always return an empty slice.

		Note that any method invocation on the same stream potentially
		invalidates the contents of the returned buffer.
	*/
	const(ubyte)[] peek();

	/**	Fills the preallocated array `dst` with data from the stream.

		This function will continue reading from the stream until the buffer
		has been fully filled.

		Params:
			dst = The buffer into which to write the data that was read
			mode = Optional reading mode (defaults to `IOMode.all`).

		Returns:
			The number of bytes read. The `dst` buffer will be filled up to
			this index. The return value is guaranteed to be `dst.length` for
			`IOMode.all`.

		Throws: An exception if the operation reads past the end of the stream

		See_Also: `readOnce`, `tryRead`
	*/
	size_t read(scope ubyte[] dst, IOMode mode) @blocking;
	/// ditto
	final void read(scope ubyte[] dst)
	@blocking {
		// Delegates to the two-argument overload with `IOMode.all` and checks
		// that the buffer was filled completely.
		auto count = read(dst, IOMode.all);
		assert(count == dst.length);
	}
}
168 
169 
/**
	Interface for all classes implementing writeable streams.
*/
interface OutputStream {
	@safe:

	/** Writes an array of bytes to the stream.

		The overloads without a `mode` parameter write with `IOMode.all`; the
		`char[]` overload forwards the characters as raw bytes.
	*/
	size_t write(in ubyte[] bytes, IOMode mode) @blocking;
	/// ditto
	final void write(in ubyte[] bytes)
	@blocking {
		// Delegates to the two-argument overload and checks that everything
		// was written.
		auto written = write(bytes, IOMode.all);
		assert(written == bytes.length);
	}
	/// ditto
	final void write(in char[] bytes)
	@blocking {
		// Reinterpret the characters as bytes; no transcoding takes place.
		write(cast(const(ubyte)[]) bytes);
	}

	/** Flushes the stream and makes sure that all data is being written to
		the output device.
	*/
	void flush() @blocking;

	/** Flushes and finalizes the stream.

		Finalize has to be called on certain types of streams. No writes are
		possible after a call to `finalize()`.
	*/
	void finalize() @blocking;
}
195 
/**
	Interface for all classes implementing streams that are both readable and
	writable.

	It adds no members of its own and merely combines `InputStream` and
	`OutputStream`.
*/
interface Stream : InputStream, OutputStream {}
201 
202 
/**
	Interface for streams based on a connection.

	Connection streams are based on streaming socket connections, pipes and
	similar end-to-end streams.

	See_also: `vibe.core.net.TCPConnection`
*/
interface ConnectionStream : Stream {
	@safe:

	/** Determines the current connection status.

		If `connected` is `false`, writing to the connection will trigger an
		exception. Reading may still succeed as long as there is data left in
		the input buffer. Use `InputStream.empty` instead to determine when to
		stop reading.
	*/
	@property bool connected() const;

	/** Actively closes the connection and frees associated resources.

		Note that `close` must always be called, even if the remote has
		already closed the connection. Failure to do so will result in
		resource and memory leakage.

		Closing a connection implies a call to `finalize`, so that it doesn't
		need to be called explicitly (it will be a no-op in that case).
	*/
	void close() @blocking;

	/** Blocks until data becomes available for read.

		The maximum wait time can be customized with the `timeout` parameter.
		If there is already data available for read, or if the connection is
		closed, the function will return immediately without blocking.

		Params:
			timeout = Optional timeout, the default value of `Duration.max`
				waits without a timeout.

		Returns:
			`true` if data becomes available before the timeout is reached.
			If the connection gets closed, or the timeout gets reached,
			`false` is returned instead.
	*/
	bool waitForData(Duration timeout = Duration.max) @blocking;
}
247 
248 
/**
	Interface for all streams supporting random access.
*/
interface RandomAccessStream : Stream {
	@safe:

	/// Returns the total size of the file.
	@property ulong size() const nothrow;

	/// Determines if this stream is readable.
	@property bool readable() const nothrow;

	/// Determines if this stream is writable.
	@property bool writable() const nothrow;

	/** Seeks to a specific position in the file if supported by the stream.

		Params:
			offset = The position to move the file pointer to
	*/
	void seek(ulong offset) @blocking;

	/// Returns the current offset of the file pointer.
	ulong tell() nothrow;
}
270 
271 
/**
	Stream implementation acting as a sink with no function.

	Any data written to the stream will be ignored and discarded. This stream
	type is useful if the output of a particular stream is not needed but the
	stream needs to be drained.
*/
final class NullOutputStream : OutputStream {
	/// Discards `bytes` and reports all of them as written.
	size_t write(in ubyte[] bytes, IOMode)
	{
		return bytes.length;
	}
	// Pull the remaining `write` overloads of the interface into scope.
	alias write = OutputStream.write;

	/// Does nothing.
	void flush()
	{
	}

	/// Does nothing.
	void finalize()
	{
	}
}
284 
285 
/// Generic storage for types that implement the `InputStream` interface.
alias InputStreamProxy = InterfaceProxy!(InputStream);

/// Generic storage for types that implement the `OutputStream` interface.
alias OutputStreamProxy = InterfaceProxy!(OutputStream);

/// Generic storage for types that implement the `Stream` interface.
alias StreamProxy = InterfaceProxy!(Stream);

/// Generic storage for types that implement the `ConnectionStream` interface.
alias ConnectionStreamProxy = InterfaceProxy!(ConnectionStream);

/// Generic storage for types that implement the `RandomAccessStream` interface.
alias RandomAccessStreamProxy = InterfaceProxy!(RandomAccessStream);
296 
297 
/** Tests if the given aggregate type is a valid input stream.

	Evaluates to `true` when `checkInterfaceConformance` yields `null` for `T`
	checked against `InputStream`.

	See_also: `validateInputStream`
*/
enum bool isInputStream(T) = checkInterfaceConformance!(T, InputStream) is null;
303 
/** Tests if the given aggregate type is a valid output stream.

	Evaluates to `true` when `checkInterfaceConformance` yields `null` for `T`
	checked against `OutputStream`.

	See_also: `validateOutputStream`
*/
enum bool isOutputStream(T) = checkInterfaceConformance!(T, OutputStream) is null;
309 
/** Tests if the given aggregate type is a valid bidirectional stream.

	Evaluates to `true` when `checkInterfaceConformance` yields `null` for `T`
	checked against `Stream`.

	See_also: `validateStream`
*/
enum bool isStream(T) = checkInterfaceConformance!(T, Stream) is null;
315 
/** Tests if the given aggregate type is a valid connection stream.

	Evaluates to `true` when `checkInterfaceConformance` yields `null` for `T`
	checked against `ConnectionStream`.

	See_also: `validateConnectionStream`
*/
enum bool isConnectionStream(T) = checkInterfaceConformance!(T, ConnectionStream) is null;
321 
/** Tests if the given aggregate type is a valid random access stream.

	Evaluates to `true` when `checkInterfaceConformance` yields `null` for `T`
	checked against `RandomAccessStream`.

	See_also: `validateRandomAccessStream`
*/
enum bool isRandomAccessStream(T) = checkInterfaceConformance!(T, RandomAccessStream) is null;
327 
/** Verifies that the given type is a valid input stream.

	A valid input stream type must implement all methods of the `InputStream`
	interface. Inheriting from `InputStream` is not strictly necessary, which
	also enables struct types to be considered as stream implementations.

	See_Also: `isInputStream`
*/
mixin template validateInputStream(T)
{
	// Imported locally so the name resolves at the mixin site.
	import vibe.internal.traits : validateInterfaceConformance;
	mixin validateInterfaceConformance!(T, .InputStream);
}
337 
/** Verifies that the given type is a valid output stream.

	A valid output stream type must implement all methods of the
	`OutputStream` interface. Inheriting from `OutputStream` is not strictly
	necessary, which also enables struct types to be considered as stream
	implementations.

	See_Also: `isOutputStream`
*/
mixin template validateOutputStream(T)
{
	// Imported locally so the name resolves at the mixin site.
	import vibe.internal.traits : validateInterfaceConformance;
	mixin validateInterfaceConformance!(T, .OutputStream);
}
347 
/** Verifies that the given type is a valid bidirectional stream.

	A valid stream type must implement all methods of the `Stream` interface.
	Inheriting from `Stream` is not strictly necessary, which also enables
	struct types to be considered as stream implementations.

	See_Also: `isStream`
*/
mixin template validateStream(T)
{
	// Imported locally so the name resolves at the mixin site.
	import vibe.internal.traits : validateInterfaceConformance;
	mixin validateInterfaceConformance!(T, .Stream);
}
357 
/** Verifies that the given type is a valid connection stream.

	A valid connection stream type must implement all methods of the
	`ConnectionStream` interface. Inheriting from `ConnectionStream` is not
	strictly necessary, which also enables struct types to be considered as
	stream implementations.

	See_Also: `isConnectionStream`
*/
mixin template validateConnectionStream(T)
{
	// Imported locally so the name resolves at the mixin site.
	import vibe.internal.traits : validateInterfaceConformance;
	mixin validateInterfaceConformance!(T, .ConnectionStream);
}
367 
/** Verifies that the given type is a valid random access stream.

	A valid random access stream type must implement all methods of the
	`RandomAccessStream` interface. Inheriting from `RandomAccessStream` is
	not strictly necessary, which also enables struct types to be considered
	as stream implementations.

	See_Also: `isRandomAccessStream`
*/
mixin template validateRandomAccessStream(T)
{
	// Imported locally so the name resolves at the mixin site.
	import vibe.internal.traits : validateInterfaceConformance;
	mixin validateInterfaceConformance!(T, .RandomAccessStream);
}