1 /**
2 	Functions and structures for dealing with subprocesses and pipes.
3 
4 	This module is modeled after std.process, but provides a fiber-aware
5 	alternative to it. All blocking operations will yield the calling fiber
6 	instead of blocking it.
7 */
8 module vibe.core.process;
9 
10 public import std.process : Pid, Redirect;
11 static import std.process;
12 
13 import core.time;
14 import std.array;
15 import std.typecons;
16 import std.exception : enforce;
17 import std.algorithm;
18 import eventcore.core;
19 import vibe.core.path;
20 import vibe.core.log;
21 import vibe.core.stream;
22 import vibe.internal.async;
23 import vibe.internal.array : BatchBuffer;
24 import vibe.core.internal.release;
25 
26 @safe:
27 
/**
	Registers an already running process with vibe.d for fiber-aware handling.

	The process may have been started from anywhere, including external
	libraries or `std.process`.

	Params:
		pid = A `Pid` or OS process id

	Returns:
		A `Process` handle referring to the adopted process.
*/
Process adoptProcessID(Pid pid)
@trusted {
	// Unwrap the OS level ID and defer to the integer overload.
	const int osPid = pid.processID;
	return adoptProcessID(osPid);
}
39 
/// ditto
Process adoptProcessID(int pid)
{
	auto handle = eventDriver.processes.adopt(pid);
	// The driver signals failure by handing back the invalid ID.
	enforce(handle != ProcessID.invalid, "Failed to adopt process ID");
	return Process(handle);
}
48 
/**
	The user's preferred command interpreter, as reported by
	`std.process.userShell`, wrapped in a `NativePath`.

	See_Also: `nativeShell`
*/
@property NativePath userShell()
{
	return NativePath(std.process.userShell);
}

/**
	The platform specific native shell, as reported by
	`std.process.nativeShell`, wrapped in a `NativePath`.

	See_Also: `userShell`
*/
const NativePath nativeShell = NativePath(std.process.nativeShell);
62 
/**
	Process creation flags, modeled after `std.process.Config` but supporting
	fewer flags.

	Each member is defined as the `ProcessConfig` value of the same name, so a
	`Config` can be passed wherever the event driver expects a `ProcessConfig`.
*/
enum Config {
	none = ProcessConfig.none,
	newEnv = ProcessConfig.newEnv,
	suppressConsole = ProcessConfig.suppressConsole,
	detached = ProcessConfig.detached,
}
72 
/**
	Equivalent to `std.process.spawnProcess`.

	The child inherits standard input, output and error from the calling
	process.

	Params:
		args = Program to run followed by its arguments
		env = Environment variables passed on to the event driver
		config = Process creation flags
		workDir = Working directory for the new process

	Returns:
		A reference to the running process.

	Throws:
		An `Exception` if the event driver reports an invalid process ID.

	See_Also: `pipeProcess`, `execute`
*/
Process spawnProcess(
	scope string[] args,
	const string[string] env = null,
	Config config = Config.none,
	scope NativePath workDir = NativePath.init)
@trusted {
	// No redirection: all three standard streams are shared with the parent.
	auto stdinFile = ProcessStdinFile(ProcessRedirect.inherit);
	auto stdoutFile = ProcessStdoutFile(ProcessRedirect.inherit);
	auto stderrFile = ProcessStderrFile(ProcessRedirect.inherit);

	auto spawned = eventDriver.processes.spawn(
		args, stdinFile, stdoutFile, stderrFile,
		env, config, workDir.toNativeString());

	enforce(spawned.pid != ProcessID.invalid, "Failed to spawn process");

	return Process(spawned.pid);
}
101 
/// ditto
Process spawnProcess(
	scope string program,
	const string[string] env = null,
	Config config = Config.none,
	scope NativePath workDir = NativePath.init)
{
	// A lone program name is simply an argument list with one entry.
	return spawnProcess([program], env, config, workDir);
}
116 
/// ditto
Process spawnShell(
	scope string command,
	const string[string] env = null,
	Config config = Config.none,
	scope NativePath workDir = NativePath.init,
	scope NativePath shellPath = nativeShell)
{
	// Turn the command into an argument list that invokes the chosen shell.
	auto shellArgs = shellCommand(command, shellPath);
	return spawnProcess(shellArgs, env, config, workDir);
}
131 
/*
	Builds the argument list used to run `command` through the shell located
	at `shellPath`.

	On Windows the result is a single element holding the complete command
	line; on Posix it is the shell path followed by `-c` and the command.
	Other platforms are rejected at compile time.
*/
private string[] shellCommand(string command, NativePath shellPath)
{
	version (Windows)
	{
		// CMD does not parse its arguments like other programs.
		// It does not use CommandLineToArgvW.
		// Instead, it treats the first and last quote specially.
		// See CMD.EXE /? for details.
		return [
			std.process.escapeShellFileName(shellPath.toNativeString())
				~ ` /C "` ~ command ~ `"`
		];
	}
	else version (Posix)
	{
		return [
			shellPath.toNativeString(),
			"-c",
			command,
		];
	}
	else
	{
		// Fail with a clear message instead of a "no return statement" error.
		static assert(false, "shellCommand is not implemented for this platform");
	}
}
154 
/**
	Handle to a process that is managed by the event driver.

	Copying the handle adds a reference to the underlying process and
	destroying a copy releases it again.
*/
struct Process {
	private static struct Context {
		//Duration waitTimeout;
		// Driver that was current when the handle was created; it is handed
		// to `releaseHandle` by the destructor.
		shared(NativeEventDriver) driver;
	}

	private {
		ProcessID m_pid;
		Context* m_context;
	}

	// Wraps a valid process ID and remembers the current event driver.
	private this(ProcessID p)
	nothrow {
		assert(p != ProcessID.invalid);
		m_pid = p;
		m_context = () @trusted { return &eventDriver.processes.userData!Context(p); } ();
		m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
	}

	this(this)
	nothrow {
		if (m_pid == ProcessID.invalid) return;
		eventDriver.processes.addRef(m_pid);
	}

	~this()
	nothrow {
		if (m_pid == ProcessID.invalid) return;
		releaseHandle!"processes"(m_pid, m_context.driver);
	}

	/**
		Check whether this is a valid process handle. The process may have
		exited already.
	*/
	bool opCast(T)() const nothrow if (is(T == bool))
	{
		return m_pid != ProcessID.invalid;
	}

	///
	unittest {
		Process p;

		assert(!p);
	}

	/**
		An operating system handle to the process.
	*/
	@property int pid() const nothrow @nogc
	{
		return cast(int)m_pid;
	}

	/**
		Whether the process has exited.
	*/
	@property bool exited() const nothrow
	{
		return eventDriver.processes.hasExited(m_pid);
	}

	/**
		Wait for the process to exit, allowing other fibers to continue in the
		meantime.

		Params:
			timeout = Maximum time to wait (overload with a timeout only).

		Returns:
			The exit code of the process. The overload taking a timeout
			returns a null value if the timeout was reached first.
	*/
	int wait()
	@blocking {
		auto result = asyncAwaitUninterruptible!(ProcessWaitCallback,
			cb => eventDriver.processes.wait(m_pid, cb)
		);
		// Index 1 of the callback arguments carries the exit code.
		return result[1];
	}

	/// Ditto
	Nullable!int wait(Duration timeout)
	@blocking {
		size_t waitId;
		bool timedOut = false;

		auto result = asyncAwaitUninterruptible!(ProcessWaitCallback,
			(cb) nothrow @safe {
				waitId = eventDriver.processes.wait(m_pid, cb);
			},
			(cb) nothrow @safe {
				// Only runs when the wait is cancelled.
				eventDriver.processes.cancelWait(m_pid, waitId);
				timedOut = true;
			},
		)(timeout);

		if (timedOut)
			return Nullable!int.init;

		int code = result[1];
		return code.nullable;
	}

	/**
		Kill the process.

		Without an argument this sends SIGTERM on Posix systems and passes the
		value 1 to the event driver everywhere else.

		Params:
			signal = Optional parameter for the signal to send to the process.
	*/
	void kill()
	{
		version (Posix)
		{
			import core.sys.posix.signal : SIGTERM;
			kill(SIGTERM);
		}
		else
		{
			kill(1);
		}
	}

	/// ditto
	void kill(int signal)
	{
		eventDriver.processes.kill(m_pid, signal);
	}

	/**
		Terminate the process immediately.

		On Posix systems this sends SIGKILL; everywhere else the value 1 is
		passed to the event driver.
	*/
	void forceKill()
	{
		version (Posix)
		{
			import core.sys.posix.signal : SIGKILL;
			kill(SIGKILL);
		}
		else
		{
			kill(1);
		}
	}

	/**
		Wait for the process to exit until a timeout is reached. If the process
		doesn't exit before the timeout, force kill it and wait for it to exit.

		Returns:
			The process exit code.
	*/
	int waitOrForceKill(Duration timeout)
	@blocking {
		auto code = wait(timeout);

		if (!code.isNull)
			return code.get;

		// Timed out: terminate forcefully, then collect the exit code.
		forceKill();
		return wait();
	}
}
317 
/**
	An input stream for the read end of a pipe.

	Incoming data is staged in an internal buffer with a capacity of 4096
	bytes. A default constructed stream, or one created for an invalid pipe,
	has no buffer and behaves like an empty stream.
*/
struct PipeInputStream {
	private static struct Context {
		BatchBuffer!ubyte readBuffer;
		// Driver that was current when the stream was created; it is handed
		// to `releaseHandle` by the destructor.
		shared(NativeEventDriver) driver;
	}

	private {
		PipeFD m_pipe;
		Context* m_context;
	}

	private this(PipeFD pipe)
	nothrow {
		m_pipe = pipe;
		if (pipe != PipeFD.invalid) {
			m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } ();
			m_context.readBuffer.capacity = 4096;
			m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
		}
	}

	this(this)
	nothrow {
		if (m_pipe != PipeFD.invalid)
			eventDriver.pipes.addRef(m_pipe);
	}

	~this()
	nothrow {
		if (m_pipe != PipeFD.invalid)
			releaseHandle!"pipes"(m_pipe, m_context.driver);
	}

	/// Whether this stream refers to a valid pipe.
	bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipe != PipeFD.invalid; }

	/// True if no more data can be read; may wait for data to arrive.
	@property bool empty() @blocking { return leastSize == 0; }

	/// Number of buffered bytes after waiting for data; 0 at the end of the stream.
	@property ulong leastSize()
	@blocking {
		waitForData();
		return m_context ? m_context.readBuffer.length : 0;
	}

	/// Checks for readable data using a zero timeout.
	@property bool dataAvailableForRead() { return waitForData(0.seconds); }

	/**
		Waits until data is available in the read buffer.

		Params:
			timeout = Maximum time to wait. A timeout of zero or less performs
				an immediate read attempt.

		Returns:
			`true` if the buffer holds data afterwards, `false` if the wait
			was cancelled, the pipe has been closed or nothing was read.
	*/
	bool waitForData(Duration timeout = Duration.max)
	@blocking {
		if (!m_context) return false;
		if (m_context.readBuffer.length > 0) return true;
		auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once;

		bool cancelled;
		IOStatus status;
		size_t nbytes;

		alias waiter = Waitable!(PipeIOCallback,
			cb => eventDriver.pipes.read(m_pipe, m_context.readBuffer.peekDst(), mode, cb),
			(cb) {
				cancelled = true;
				eventDriver.pipes.cancelRead(m_pipe);
			},
			(pipe, st, nb) {
				// Handle closed pipes
				if (m_pipe == PipeFD.invalid) {
					cancelled = true;
					return;
				}

				assert(pipe == m_pipe);
				status = st;
				nbytes = nb;
			}
		);

		asyncAwaitAny!(true, waiter)(timeout);

		if (cancelled || !m_context) return false;

		logTrace("Pipe %s, read %s bytes: %s", m_pipe, nbytes, status);

		// The driver wrote directly into the buffer's free space; commit it.
		assert(m_context.readBuffer.length == 0);
		m_context.readBuffer.putN(nbytes);
		switch (status) {
			case IOStatus.ok: break;
			case IOStatus.disconnected: break;
			case IOStatus.wouldBlock:
				assert(mode == IOMode.immediate);
				break;
			default:
				logDebug("Error status when waiting for data: %s", status);
				break;
		}

		return m_context.readBuffer.length > 0;
	}

	/// The currently buffered data, or `null` if the stream has no buffer.
	const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; }

	/**
		Reads data from the pipe into `dst`.

		Params:
			dst = Destination buffer
			mode = `IOMode.immediate` only returns already buffered data,
				`IOMode.once` returns as soon as some data was read and
				`IOMode.all` fills `dst` completely.

		Returns:
			The number of bytes written to `dst`.

		Throws:
			An `Exception` if the end of the stream is reached while more data
			is required.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	@blocking {
		if (dst.empty) return 0;

		// A stream without a backing pipe has no buffer. Treat it like an
		// exhausted stream instead of dereferencing a null context.
		if (!m_context) {
			enforce(mode == IOMode.immediate, "Reached end of stream while reading data.");
			return 0;
		}

		// Fast path: the request can be served from the buffer alone.
		if (m_context.readBuffer.length >= dst.length) {
			m_context.readBuffer.read(dst);
			return dst.length;
		}

		size_t nbytes = 0;

		while (true) {
			if (m_context.readBuffer.length == 0) {
				if (mode == IOMode.immediate || (mode == IOMode.once && nbytes > 0))
					break;

				enforce(waitForData(), "Reached end of stream while reading data.");
			}

			assert(m_context.readBuffer.length > 0);
			auto l = min(dst.length, m_context.readBuffer.length);
			m_context.readBuffer.read(dst[0 .. l]);
			dst = dst[l .. $];
			nbytes += l;
			if (dst.length == 0)
				break;
		}

		return nbytes;
	}

	/// Fills `dst` completely, see `read(dst, IOMode.all)`.
	void read(scope ubyte[] dst)
	@blocking {
		auto r = read(dst, IOMode.all);
		assert(r == dst.length);
	}

	/**
		Close the read end of the pipe immediately.

		Make sure that the pipe is not used after this is called and is released
		as soon as possible. Due to implementation detail in eventcore this
		reference could conflict with future pipes.
	*/
	void close()
	nothrow {
		if (m_pipe == PipeFD.invalid) return;

		asyncAwaitUninterruptible!(PipeCloseCallback,
			cb => eventDriver.pipes.close(m_pipe, cb)
		);

		eventDriver.pipes.releaseRef(m_pipe);
		m_pipe = PipeFD.invalid;
	}
}

mixin validateInputStream!PipeInputStream;
475 
/**
	An output stream for the write end of a pipe.
*/
struct PipeOutputStream {
	private static struct Context {
		// Driver that was current when the stream was created; it is handed
		// to `releaseHandle` by the destructor.
		shared(NativeEventDriver) driver;
	}

	private {
		PipeFD m_pipe;
		Context* m_context;
	}

	private this(PipeFD pipe)
	nothrow {
		m_pipe = pipe;
		if (pipe != PipeFD.invalid) {
			m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } ();
			m_context.driver = () @trusted { return cast(shared)eventDriver; } ();
		}
	}

	this(this)
	nothrow {
		if (m_pipe != PipeFD.invalid)
			eventDriver.pipes.addRef(m_pipe);
	}

	~this()
	nothrow {
		if (m_pipe != PipeFD.invalid)
			releaseHandle!"pipes"(m_pipe, m_context.driver);
	}

	/// Whether this stream refers to a valid pipe.
	bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipe != PipeFD.invalid; }

	/**
		Writes data to the pipe.

		Params:
			bytes = The data to write
			mode = I/O mode passed on to the event driver

		Returns:
			The number of bytes reported as written by the event driver.

		Throws:
			An `Exception` if the driver reports a status other than `ok` or
			`disconnected`.
	*/
	size_t write(in ubyte[] bytes, IOMode mode)
	@blocking {
		if (bytes.empty) return 0;

		auto res = asyncAwait!(PipeIOCallback,
			cb => eventDriver.pipes.write(m_pipe, bytes, mode, cb),
			cb => eventDriver.pipes.cancelWrite(m_pipe));

		// res[1] is the I/O status, res[2] the number of bytes written.
		switch (res[1]) {
			case IOStatus.ok: break;
			case IOStatus.disconnected: break;
			default:
				throw new Exception("Error writing data to pipe.");
		}

		return res[2];
	}

	/// Writes all of `bytes` using `IOMode.all`.
	void write(in ubyte[] bytes) @blocking { auto r = write(bytes, IOMode.all); assert(r == bytes.length); }
	/// ditto
	void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }

	/// No-op; this stream does no buffering of its own.
	void flush() {}
	/// No-op; use `close` to close the pipe.
	void finalize() {}

	/**
		Close the write end of the pipe immediately.

		Make sure that the pipe is not used after this is called and is released
		as soon as possible. Due to implementation detail in eventcore this
		reference could conflict with future pipes.
	*/
	void close()
	nothrow {
		if (m_pipe == PipeFD.invalid) return;

		asyncAwaitUninterruptible!(PipeCloseCallback,
			cb => eventDriver.pipes.close(m_pipe, cb)
		);

		eventDriver.pipes.releaseRef(m_pipe);
		m_pipe = PipeFD.invalid;
	}
}

mixin validateOutputStream!PipeOutputStream;
557 
/**
	A pipe created by `pipe`, consisting of a stream for each of its two ends.
*/
struct Pipe {
	/// Read end of the pipe
	PipeInputStream readEnd;
	/// Write end of the pipe
	PipeOutputStream writeEnd;

	/**
		Close both ends of the pipe, the write end first.
	*/
	void close()
	nothrow {
		writeEnd.close();
		readEnd.close();
	}
}
576 
/**
	Create a pipe, async equivalent of `std.process.pipe`.

	The pipe is created through `std.process.pipe` and both of its file
	descriptors are then adopted by the event driver.

	Returns:
		A stream for each end of the pipe.
*/
Pipe pipe()
{
	auto osPipe = std.process.pipe();

	// NOTE(review): `osPipe` still holds the `File` objects for both ends
	// when it goes out of scope - confirm that this does not close the
	// descriptors adopted below.
	auto readFD = eventDriver.pipes.adopt(osPipe.readEnd.fileno);
	auto writeFD = eventDriver.pipes.adopt(osPipe.writeEnd.fileno);

	return Pipe(PipeInputStream(readFD), PipeOutputStream(writeFD));
}
592 
/**
	Returned from `pipeProcess`: the spawned process together with the
	streams connected to its standard input, output and error.

	See_Also: `pipeProcess`, `pipeShell`
*/
struct ProcessPipes {
	/// Handle to the spawned process
	Process process;
	/// Stream for writing to the standard input of the process
	PipeOutputStream stdin;
	/// Stream for reading the standard output of the process
	PipeInputStream stdout;
	/// Stream for reading the standard error of the process
	PipeInputStream stderr;
}
604 
/**
	Equivalent to `std.process.pipeProcess`.

	Params:
		args = Program to run followed by its arguments
		redirect = Selects which standard streams get redirected. Streams
			that are not selected are inherited from the calling process.
		env = Environment variables passed on to the event driver
		config = Process creation flags
		workDir = Working directory for the new process

	Returns:
		A struct containing the process and created pipes.

	Throws:
		An `Exception` if the event driver reports an invalid process ID.

	See_Also: `spawnProcess`, `execute`
*/
ProcessPipes pipeProcess(
	scope string[] args,
	Redirect redirect = Redirect.all,
	const string[string] env = null,
	Config config = Config.none,
	scope NativePath workDir = NativePath.init)
@trusted {
	auto stdinFile = (redirect & Redirect.stdin) != 0
		? ProcessStdinFile(ProcessRedirect.pipe)
		: ProcessStdinFile(ProcessRedirect.inherit);

	// Forwarding stdout to stderr takes precedence over piping it.
	auto stdoutFile = (redirect & Redirect.stdoutToStderr) != 0
		? ProcessStdoutFile(ProcessStdoutRedirect.toStderr)
		: (redirect & Redirect.stdout) != 0
			? ProcessStdoutFile(ProcessRedirect.pipe)
			: ProcessStdoutFile(ProcessRedirect.inherit);

	// Likewise, forwarding stderr to stdout takes precedence over piping it.
	auto stderrFile = (redirect & Redirect.stderrToStdout) != 0
		? ProcessStderrFile(ProcessStderrRedirect.toStdout)
		: (redirect & Redirect.stderr) != 0
			? ProcessStderrFile(ProcessRedirect.pipe)
			: ProcessStderrFile(ProcessRedirect.inherit);

	auto spawned = eventDriver.processes.spawn(
		args, stdinFile, stdoutFile, stderrFile,
		env, config, workDir.toNativeString());

	enforce(spawned.pid != ProcessID.invalid, "Failed to spawn process");

	return ProcessPipes(
		Process(spawned.pid),
		PipeOutputStream(cast(PipeFD)spawned.stdin),
		PipeInputStream(cast(PipeFD)spawned.stdout),
		PipeInputStream(cast(PipeFD)spawned.stderr)
	);
}
658 
/// ditto
ProcessPipes pipeProcess(
	scope string program,
	Redirect redirect = Redirect.all,
	const string[string] env = null,
	Config config = Config.none,
	scope NativePath workDir = NativePath.init)
{
	// A lone program name is simply an argument list with one entry.
	return pipeProcess([program], redirect, env, config, workDir);
}
675 
/// ditto
ProcessPipes pipeShell(
	scope string command,
	Redirect redirect = Redirect.all,
	const string[string] env = null,
	Config config = Config.none,
	scope NativePath workDir = NativePath.init,
	scope NativePath shellPath = nativeShell)
{
	// Run the command through the shell the caller asked for; `shellPath`
	// defaults to `nativeShell` when it is not given.
	return pipeProcess(
		shellCommand(command, shellPath),
		redirect,
		env,
		config,
		workDir);
}
692 
/**
	Equivalent to `std.process.execute`.

	Spawns the process, collects what it writes to standard output and waits
	for it to exit.

	Params:
		args = Program to run followed by its arguments
		env = Environment variables passed on to the event driver
		config = Process creation flags
		maxOutput = Maximum number of output bytes to collect
		workDir = Working directory for the new process

	Returns:
		Tuple containing the exit status and process output.

	See_Also: `spawnProcess`, `pipeProcess`
*/
auto execute(
	scope string[] args,
	const string[string] env = null,
	Config config = Config.none,
	size_t maxOutput = size_t.max,
	scope NativePath workDir = NativePath.init)
@blocking {
	alias run = executeImpl!pipeProcess;
	return run(args, env, config, maxOutput, workDir);
}
710 
/// ditto
auto execute(
	scope string program,
	const string[string] env = null,
	Config config = Config.none,
	size_t maxOutput = size_t.max,
	scope NativePath workDir = NativePath.init)
@blocking @trusted {
	alias run = executeImpl!pipeProcess;
	return run(program, env, config, maxOutput, workDir);
}
721 
/// ditto
auto executeShell(
	scope string command,
	const string[string] env = null,
	Config config = Config.none,
	size_t maxOutput = size_t.max,
	// Same default as every other function in this module taking a `workDir`.
	scope NativePath workDir = NativePath.init,
	NativePath shellPath = nativeShell)
@blocking {
	return executeImpl!pipeShell(command, env, config, maxOutput, workDir, shellPath);
}
733 
/*
	Shared implementation of `execute` and `executeShell`.

	Spawns a process through `spawn` with only its standard output redirected
	to a pipe, collects up to `maxOutput` bytes of that output and then waits
	for the process to exit. Any additional `args` are forwarded to `spawn`.
*/
private auto executeImpl(alias spawn, Cmd, Args...)(
	Cmd command,
	const string[string] env,
	Config config,
	size_t maxOutput,
	scope NativePath workDir,
	Args args)
@blocking {
	alias Result = Tuple!(int, "status", string, "output");

	auto pipes = spawn(command, Redirect.stdout, env, config, workDir, args);

	// Collect the output before waiting for the exit code.
	string collected = pipes.stdout.collectOutput(maxOutput);
	int exitCode = pipes.process.wait();

	return Result(exitCode, collected);
}
750 
/*
	Collect the string output of a stream in a blocking fashion.

	Reading stops at the end of the stream or as soon as `nbytes` bytes have
	been collected, whichever comes first.

	Params:
		stream = The input stream to collect from.
		nbytes = The maximum number of bytes to collect.

	Returns:
		The collected data from the stream as a string.
*/
/// private
string collectOutput(InputStream)(InputStream stream, size_t nbytes = size_t.max)
@blocking @trusted if (isInputStream!InputStream) {
	auto output = appender!string();
	if (nbytes != size_t.max) {
		output.reserve(nbytes);
	}

	import vibe.internal.allocator : theAllocator, dispose;

	scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024);
	scope (exit) theAllocator.dispose(buffer);

	// Test the remaining budget first so that `stream.empty`, which may wait
	// for data, is not consulted once the limit has been reached.
	while (nbytes > 0 && !stream.empty) {
		size_t chunk = min(nbytes, stream.leastSize, buffer.length);
		assert(chunk > 0, "leastSize returned zero for non-empty stream.");

		stream.read(buffer[0..chunk]);
		output.put(buffer[0..chunk]);

		// Count the chunk against the limit; without this the limit would
		// only cap the size of each chunk, not the total output.
		nbytes -= chunk;
	}

	return output.data;
}