1 /**
2 	Generic stream interface used by several stream-like classes.
3 
4 	This module defines the basic (buffered) stream primitives. For concrete stream types, take a
5 	look at the `vibe.stream` package. The `vibe.stream.operations` module contains additional
6 	high-level operations on streams, such as reading streams by line or as a whole.
7 
8 	Note that starting with vibe-core 1.0.0, streams can be of either `struct`  or `class` type.
9 	Any APIs that take streams as a parameter should use a template type parameter that is tested
10 	using the appropriate trait (e.g. `isInputStream`) instead of assuming the specific interface
11 	type (e.g. `InputStream`).
12 
13 	Copyright: © 2012-2016 Sönke Ludwig
14 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
15 	Authors: Sönke Ludwig
16 */
17 module vibe.core.stream;
18 
19 import vibe.internal.traits : checkInterfaceConformance, validateInterfaceConformance;
20 import vibe.internal.interfaceproxy;
21 import core.time;
22 import std.algorithm;
23 import std.conv;
24 
25 public import eventcore.driver : IOMode;
26 
27 
28 /** Pipes an InputStream directly into this OutputStream.
29 
30 	The number of bytes written is either the whole input stream when
31 	`nbytes == ulong.max`, or exactly `nbytes` for `nbytes < ulong.max`. If the
32 	input stream contains less than `nbytes` of data, an exception is thrown.
33 
34 	Returns:
35 		The actual number of bytes written is returned. If `nbytes` is given
36 		and not equal to `ulong.max`, íts value will be returned.
37 */
38 ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
39 	ulong nbytes, PipeMode mode = PipeMode.sequential) @blocking @trusted
40 	if (isOutputStream!OutputStream && isInputStream!InputStream)
41 {
42 	import vibe.internal.allocator : theAllocator, makeArray, dispose;
43 	import vibe.core.core : runTask;
44 	import vibe.core.sync : LocalManualEvent, createManualEvent;
45 	import vibe.core.task : InterruptException;
46 
47 	final switch (mode) {
48 		case PipeMode.sequential:
49 			{
50 				scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024);
51 				scope (exit) theAllocator.dispose(buffer);
52 
53 				ulong ret = 0;
54 
55 				if (nbytes == ulong.max) {
56 					while (!source.empty) {
57 						size_t chunk = min(source.leastSize, buffer.length);
58 						assert(chunk > 0, "leastSize returned zero for non-empty stream.");
59 						//logTrace("read pipe chunk %d", chunk);
60 						source.read(buffer[0 .. chunk]);
61 						sink.write(buffer[0 .. chunk]);
62 						ret += chunk;
63 					}
64 				} else {
65 					while (nbytes > 0) {
66 						size_t chunk = min(nbytes, buffer.length);
67 						//logTrace("read pipe chunk %d", chunk);
68 						source.read(buffer[0 .. chunk]);
69 						sink.write(buffer[0 .. chunk]);
70 						nbytes -= chunk;
71 						ret += chunk;
72 					}
73 				}
74 
75 				return ret;
76 			}
77 		case PipeMode.concurrent:
78 			{
79 				enum bufcount = 4;
80 				enum bufsize = 4*1024*1024;
81 
82 				static struct ConcurrentPipeState {
83 					InputStream source;
84 					OutputStream sink;
85 					ulong nbytes;
86 					ubyte[][bufcount] buffers;
87 					size_t[bufcount] bufferFill;
88 					// buffer index that is being read/written
89 					size_t read_idx = 0, write_idx = 0;
90 					Exception readex;
91 					bool done = false;
92 					LocalManualEvent evt;
93 					size_t bytesWritten;
94 
95 					void readLoop()
96 					{
97 						// gradually increased depending on read speed
98 						size_t rbsize = 64*1024;
99 
100 						while (true) {
101 							ulong remaining = nbytes == ulong.max ? source.leastSize : nbytes;
102 							if (remaining == 0) break;
103 
104 							while (read_idx >= write_idx + buffers.length)
105 								evt.wait();
106 
107 							size_t chunk = min(remaining, rbsize);
108 							auto bi = read_idx % bufcount;
109 
110 							auto tm = MonoTime.currTime;
111 							source.read(buffers[bi][0 .. chunk]);
112 							if (rbsize < bufsize && MonoTime.currTime - tm < 100.msecs)
113 								rbsize *= 2;
114 							if (nbytes != ulong.max) nbytes -= chunk;
115 							bytesWritten += chunk;
116 							bufferFill[bi] = chunk;
117 							if (write_idx >= read_idx++)
118 								evt.emit();
119 						}
120 					}
121 
122 					void writeLoop()
123 					{
124 						while (read_idx > write_idx || !done) {
125 							while (read_idx <= write_idx) {
126 								if (done) return;
127 								evt.wait();
128 							}
129 
130 							auto bi = write_idx % bufcount;
131 							sink.write(buffers[bi][0 .. bufferFill[bi]]);
132 
133 							// notify reader that we just made a buffer available
134 							if (write_idx++ <= read_idx - buffers.length)
135 								evt.emit();
136 						}
137 					}
138 				}
139 
140 				scope buffer = cast(ubyte[]) theAllocator.allocate(bufcount * bufsize);
141 				scope (exit) theAllocator.dispose(buffer);
142 
143 				ConcurrentPipeState state;
144 				foreach (i; 0 .. bufcount)
145 					state.buffers[i] = buffer[i*($/bufcount) .. (i+1)*($/bufcount)];
146 				swap(state.source, source);
147 				swap(state.sink, sink);
148 				state.nbytes = nbytes;
149 				state.evt = createManualEvent();
150 
151 				auto reader = runTask(function(ConcurrentPipeState* state) nothrow {
152 						try state.readLoop();
153 						catch (InterruptException e) {}
154 						catch (Exception e) state.readex = e;
155 						state.done = true;
156 						state.evt.emit();
157 					}, &state);
158 
159 				scope (failure) {
160 					reader.interrupt();
161 					reader.joinUninterruptible();
162 				}
163 
164 				state.writeLoop();
165 
166 				reader.join();
167 
168 				if (state.readex) throw state.readex;
169 
170 				return state.bytesWritten;
171 			}
172 	}
173 }
174 /// ditto
175 ulong pipe(InputStream, OutputStream)(InputStream source, OutputStream sink,
176 	PipeMode mode = PipeMode.sequential) @blocking
177 	if (isOutputStream!OutputStream && isInputStream!InputStream)
178 {
179 	return pipe(source, sink, ulong.max, mode);
180 }
181 
182 enum PipeMode {
183 	/** Sequentially reads into a buffer and writes it out to the sink.
184 
185 		This mode reads and writes to the same buffer in a ping-pong fashion.
186 		The memory overhead is low, but if the source does not support
187 		read-ahead buffering, or the sink does not have an internal buffer that
188 		is drained asynchronously, the total throghput will be reduced.
189 	*/
190 	sequential,
191 
192 	/** Uses a task to concurrently read and write.
193 
194 		This mode maximizes throughput at the expense of setting up a task and
195 		associated sycnronization.
196 	*/
197 	concurrent
198 }
199 
200 
201 /** Marks a function as blocking.
202 
203 	Blocking in this case means that it may contain an operation that needs to wait for
204 	external events, such as I/O operations, and may result in other tasks in the same
205 	threa being executed before it returns.
206 
207 	Currently this attribute serves only as a documentation aid and is not enforced
208 	or used for deducation in any way.
209 */
210 struct blocking {}
211 
212 /**************************************************************************************************/
213 /* Public functions                                                                               */
214 /**************************************************************************************************/
215 
216 /**
217 	Returns a `NullOutputStream` instance.
218 
219 	The instance will only be created on the first request and gets reused for
220 	all subsequent calls from the same thread.
221 */
222 NullOutputStream nullSink() @safe nothrow
223 {
224 	static NullOutputStream ret;
225 	if (!ret) ret = new NullOutputStream;
226 	return ret;
227 }
228 
229 /**************************************************************************************************/
230 /* Public types                                                                                   */
231 /**************************************************************************************************/
232 
233 /**
234 	Interface for all classes implementing readable streams.
235 */
236 interface InputStream {
237 	@safe:
238 
239 	/** Returns true $(I iff) the end of the input stream has been reached.
240 
241 		For connection oriented streams, this function will block until either
242 		new data arrives or the connection got closed.
243 	*/
244 	@property bool empty() @blocking;
245 
246 	/**	(Scheduled for deprecation) Returns the maximum number of bytes that are known to remain available for read.
247 
248 		After `leastSize()` bytes have been read, the stream will either have reached EOS
249 		and `empty()` returns `true`, or `leastSize()` returns again a number greater than `0`.
250 	*/
251 	@property ulong leastSize() @blocking;
252 
253 	/** (Scheduled for deprecation) Queries if there is data available for immediate, non-blocking read.
254 	*/
255 	@property bool dataAvailableForRead();
256 
257 	/** Returns a temporary reference to the data that is currently buffered.
258 
259 		The returned slice typically has the size `leastSize()` or `0` if `dataAvailableForRead()`
260 		returns `false`. Streams that don't have an internal buffer will always return an empty
261 		slice.
262 
263 		Note that any method invocation on the same stream potentially invalidates the contents of
264 		the returned buffer.
265 	*/
266 	const(ubyte)[] peek();
267 
268 	/**	Fills the preallocated array 'bytes' with data from the stream.
269 
270 		This function will continue read from the stream until the buffer has
271 		been fully filled.
272 
273 		Params:
274 			dst = The buffer into which to write the data that was read
275 			mode = Optional reading mode (defaults to `IOMode.all`).
276 
277 		Return:
278 			Returns the number of bytes read. The `dst` buffer will be filled up
279 			to this index. The return value is guaranteed to be `dst.length` for
280 			`IOMode.all`.
281 
282 		Throws: An exception if the operation reads past the end of the stream
283 
284 		See_Also: `readOnce`, `tryRead`
285 	*/
286 	size_t read(scope ubyte[] dst, IOMode mode) @blocking;
287 	/// ditto
288 	final void read(scope ubyte[] dst) @blocking { auto n = read(dst, IOMode.all); assert(n == dst.length); }
289 }
290 
291 
292 /**
293 	Interface for all classes implementing writeable streams.
294 */
295 interface OutputStream {
296 	@safe:
297 
298 	/** Writes an array of bytes to the stream.
299 	*/
300 	size_t write(in ubyte[] bytes, IOMode mode) @blocking;
301 	/// ditto
302 	final void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
303 	/// ditto
304 	final void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }
305 
306 	/** Flushes the stream and makes sure that all data is being written to the output device.
307 	*/
308 	void flush() @blocking;
309 
310 	/** Flushes and finalizes the stream.
311 
312 		Finalize has to be called on certain types of streams. No writes are possible after a
313 		call to finalize().
314 	*/
315 	void finalize() @blocking;
316 }
317 
318 /**
319 	Interface for all classes implementing readable and writable streams.
320 */
321 interface Stream : InputStream, OutputStream {
322 }
323 
324 
325 /**
326 	Interface for streams based on a connection.
327 
328 	Connection streams are based on streaming socket connections, pipes and similar end-to-end
329 	streams.
330 
331 	See_also: `vibe.core.net.TCPConnection`
332 */
333 interface ConnectionStream : Stream {
334 	@safe:
335 
336 	/** Determines The current connection status.
337 
338 		If `connected` is `false`, writing to the connection will trigger an exception. Reading may
339 		still succeed as long as there is data left in the input buffer. Use `InputStream.empty`
340 		instead to determine when to stop reading.
341 	*/
342 	@property bool connected() const;
343 
344 	/** Actively closes the connection and frees associated resources.
345 
346 		Note that close must always be called, even if the remote has already closed the connection.
347 		Failure to do so will result in resource and memory leakage.
348 
349 		Closing a connection implies a call to `finalize`, so that it doesn't need to be called
350 		explicitly (it will be a no-op in that case).
351 	*/
352 	void close() @blocking;
353 
354 	/** Blocks until data becomes available for read.
355 
356 		The maximum wait time can be customized with the `timeout` parameter. If there is already
357 		data availabe for read, or if the connection is closed, the function will return immediately
358 		without blocking.
359 
360 		Params:
361 			timeout = Optional timeout, the default value of `Duration.max` waits without a timeout.
362 
363 		Returns:
364 			The function will return `true` if data becomes available before the timeout is reached.
365 			If the connection gets closed, or the timeout gets reached, `false` is returned instead.
366 	*/
367 	bool waitForData(Duration timeout = Duration.max) @blocking;
368 }
369 
370 
371 /**
372 	Interface for all streams supporting random access.
373 */
374 interface RandomAccessStream : Stream {
375 	@safe:
376 
377 	/// Returns the total size of the file.
378 	@property ulong size() const nothrow;
379 
380 	/// Determines if this stream is readable.
381 	@property bool readable() const nothrow;
382 
383 	/// Determines if this stream is writable.
384 	@property bool writable() const nothrow;
385 
386 	/// Seeks to a specific position in the file if supported by the stream.
387 	void seek(ulong offset) @blocking;
388 
389 	/// Returns the current offset of the file pointer
390 	ulong tell() nothrow;
391 }
392 
393 
394 /** Extended form of a `RandomAccessStream` that supports truncation/extension.
395 */
396 interface TruncatableStream : RandomAccessStream {
397 @safe:
398 
399 	// Note that truncate should be part of RandomAccessStream
400 	/// Truncates or extends the size of the stream
401 	void truncate(ulong size) @blocking;
402 }
403 
404 
405 /** Random access stream with support for explicit closing.
406 */
407 interface ClosableRandomAccessStream : TruncatableStream {
408 @safe:
409 
410 	/// Determines if the file stream is still open and accessible
411 	@property bool isOpen() const nothrow;
412 
413 	/** Actively closes the stream and frees associated resources.
414 
415 		Closing a stream implies a call to `finalize`, so that it doesn't need
416 		to be called explicitly.
417 	*/
418 	void close() @blocking;
419 }
420 
421 
422 /**
423 	Stream implementation acting as a sink with no function.
424 
425 	Any data written to the stream will be ignored and discarded. This stream type is useful if
426 	the output of a particular stream is not needed but the stream needs to be drained.
427 */
428 final class NullOutputStream : OutputStream {
429 	size_t write(in ubyte[] bytes, IOMode) { return bytes.length; }
430 	alias write = OutputStream.write;
431 	void flush() {}
432 	void finalize() {}
433 }
434 
435 
436 /// Generic storage for types that implement the `InputStream` interface
437 alias InputStreamProxy = InterfaceProxy!InputStream;
438 /// Generic storage for types that implement the `OutputStream` interface
439 alias OutputStreamProxy = InterfaceProxy!OutputStream;
440 /// Generic storage for types that implement the `Stream` interface
441 alias StreamProxy = InterfaceProxy!Stream;
442 /// Generic storage for types that implement the `ConnectionStream` interface
443 alias ConnectionStreamProxy = InterfaceProxy!ConnectionStream;
444 /// Generic storage for types that implement the `RandomAccessStream` interface
445 alias RandomAccessStreamProxy = InterfaceProxy!RandomAccessStream;
446 /// Generic storage for types that implement the `RandomAccessStream` interface
447 alias TruncatableStreamProxy = InterfaceProxy!TruncatableStream;
448 /// Generic storage for types that implement the `RandomAccessStream` interface
449 alias ClosableRandomAccessStreamProxy = InterfaceProxy!ClosableRandomAccessStream;
450 
451 
452 /** Tests if the given aggregate type is a valid input stream.
453 
454 	See_also: `validateInputStream`
455 */
456 enum isInputStream(T) = checkInterfaceConformance!(T, InputStream) is null;
457 
458 /** Tests if the given aggregate type is a valid output stream.
459 
460 	See_also: `validateOutputStream`
461 */
462 enum isOutputStream(T) = checkInterfaceConformance!(T, OutputStream) is null;
463 
464 /** Tests if the given aggregate type is a valid bidirectional stream.
465 
466 	See_also: `validateStream`
467 */
468 enum isStream(T) = checkInterfaceConformance!(T, Stream) is null;
469 
470 /** Tests if the given aggregate type is a valid connection stream.
471 
472 	See_also: `validateConnectionStream`
473 */
474 enum isConnectionStream(T) = checkInterfaceConformance!(T, ConnectionStream) is null;
475 
476 /** Tests if the given aggregate type is a valid random access stream.
477 
478 	See_also: `validateRandomAccessStream`
479 */
480 enum isRandomAccessStream(T) = checkInterfaceConformance!(T, RandomAccessStream) is null;
481 
482 /** Tests if the given aggregate type is a valid random access stream.
483 
484 	See_also: `validateRandomAccessStream`
485 */
486 enum isTruncatableStream(T) = checkInterfaceConformance!(T, TruncatableStream) is null;
487 
488 /** Tests if the given aggregate type is a valid random access stream.
489 
490 	See_also: `validateRandomAccessStream`
491 */
492 enum isClosableRandomAccessStream(T) = checkInterfaceConformance!(T, ClosableRandomAccessStream) is null;
493 
494 /** Verifies that the given type is a valid input stream.
495 
496 	A valid input stream type must implement all methods of the `InputStream` interface. Inheriting
497 	form `InputStream` is not strictly necessary, which also enables struct types to be considered
498 	as stream implementations.
499 
500 	See_Also: `isInputStream`
501 */
502 mixin template validateInputStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .InputStream); }
503 
504 /** Verifies that the given type is a valid output stream.
505 
506 	A valid output stream type must implement all methods of the `OutputStream` interface. Inheriting
507 	form `OutputStream` is not strictly necessary, which also enables struct types to be considered
508 	as stream implementations.
509 
510 	See_Also: `isOutputStream`
511 */
512 mixin template validateOutputStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .OutputStream); }
513 
514 /** Verifies that the given type is a valid bidirectional stream.
515 
516 	A valid stream type must implement all methods of the `Stream` interface. Inheriting
517 	form `Stream` is not strictly necessary, which also enables struct types to be considered
518 	as stream implementations.
519 
520 	See_Also: `isStream`
521 */
522 mixin template validateStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .Stream); }
523 
524 /** Verifies that the given type is a valid connection stream.
525 
526 	A valid connection stream type must implement all methods of the `ConnectionStream` interface.
527 	Inheriting form `ConnectionStream` is not strictly necessary, which also enables struct types
528 	to be considered as stream implementations.
529 
530 	See_Also: `isConnectionStream`
531 */
532 mixin template validateConnectionStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .ConnectionStream); }
533 
534 /** Verifies that the given type is a valid random access stream.
535 
536 	A valid random access stream type must implement all methods of the `RandomAccessStream`
537 	interface. Inheriting form `RandomAccessStream` is not strictly necessary, which also enables
538 	struct types to be considered as stream implementations.
539 
540 	See_Also: `isRandomAccessStream`
541 */
542 mixin template validateRandomAccessStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .RandomAccessStream); }
543 
544 /** Verifies that the given type is a valid truncatable random access stream.
545 
546 	A valid random access stream type must implement all methods of the `TruncatableStream`
547 	interface. Inheriting form `TruncatableStream` is not strictly necessary, which also enables
548 	struct types to be considered as stream implementations.
549 
550 	See_Also: `isTruncatableStream`
551 */
552 mixin template validateTruncatableStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .TruncatableStream); }
553 
554 /** Verifies that the given type is a valid closable random access stream.
555 
556 	A valid random access stream type must implement all methods of the `ClosableRandomAccessStream`
557 	interface. Inheriting form `ClosableRandomAccessStream` is not strictly necessary, which also enables
558 	struct types to be considered as stream implementations.
559 
560 	See_Also: `isClosableRandomAccessStream`
561 */
562 mixin template validateClosableRandomAccessStream(T) { import vibe.internal.traits : validateInterfaceConformance; mixin validateInterfaceConformance!(T, .ClosableRandomAccessStream); }