1 /** 2 Functions and structures for dealing with subprocesses and pipes. 3 4 This module is modeled after std.process, but provides a fiber-aware 5 alternative to it. All blocking operations will yield the calling fiber 6 instead of blocking it. 7 */ 8 module vibe.core.process; 9 10 public import std.process : Pid, Redirect; 11 static import std.process; 12 13 import core.time; 14 import std.array; 15 import std.typecons; 16 import std.exception : enforce; 17 import std.algorithm; 18 import eventcore.core; 19 import vibe.core.path; 20 import vibe.core.log; 21 import vibe.core.stream; 22 import vibe.internal.async; 23 import vibe.internal.array : BatchBuffer; 24 import vibe.core.internal.release; 25 26 @safe: 27 28 /** 29 Register a process with vibe for fibre-aware handling. This process can be 30 started from anywhere including external libraries or std.process. 31 32 Params: 33 pid = A Pid or OS process id 34 */ 35 Process adoptProcessID(Pid pid) 36 @trusted { 37 return adoptProcessID(pid.processID); 38 } 39 40 /// ditto 41 Process adoptProcessID(int pid) 42 { 43 auto p = eventDriver.processes.adopt(pid); 44 if (p == ProcessID.invalid) 45 throw new Exception("Failed to adopt process ID"); 46 return Process(p); 47 } 48 49 /** 50 Path to the user's preferred command interpreter. 51 52 See_Also: `nativeShell` 53 */ 54 @property NativePath userShell() { return NativePath(std.process.userShell); } 55 56 /** 57 The platform specific native shell path. 58 59 See_Also: `userShell` 60 */ 61 const NativePath nativeShell = NativePath(std.process.nativeShell); 62 63 /** 64 Equivalent to `std.process.Config` except with less flag support 65 */ 66 enum Config { 67 none = ProcessConfig.none, 68 newEnv = ProcessConfig.newEnv, 69 suppressConsole = ProcessConfig.suppressConsole, 70 detached = ProcessConfig.detached, 71 } 72 73 /** 74 Equivalent to `std.process.spawnProcess`. 75 76 Returns: 77 A reference to the running process. 78 79 See_Also: `pipeProcess`, `execute` 80 */ 81 Process spawnProcess( 82 scope string[] args, 83 const string[string] env = null, 84 Config config = Config.none, 85 scope NativePath workDir = NativePath.init) 86 @trusted { 87 auto process = eventDriver.processes.spawn( 88 args, 89 ProcessStdinFile(ProcessRedirect.inherit), 90 ProcessStdoutFile(ProcessRedirect.inherit), 91 ProcessStderrFile(ProcessRedirect.inherit), 92 env, 93 config, 94 workDir.toNativeString()); 95 96 if (process.pid == ProcessID.invalid) 97 throw new Exception("Failed to spawn process"); 98 99 return Process(process.pid); 100 } 101 102 /// ditto 103 Process spawnProcess( 104 scope string program, 105 const string[string] env = null, 106 Config config = Config.none, 107 scope NativePath workDir = NativePath.init) 108 { 109 return spawnProcess( 110 [program], 111 env, 112 config, 113 workDir 114 ); 115 } 116 117 /// ditto 118 Process spawnShell( 119 scope string command, 120 const string[string] env = null, 121 Config config = Config.none, 122 scope NativePath workDir = NativePath.init, 123 scope NativePath shellPath = nativeShell) 124 { 125 return spawnProcess( 126 shellCommand(command, shellPath), 127 env, 128 config, 129 workDir); 130 } 131 132 private string[] shellCommand(string command, NativePath shellPath) 133 { 134 version (Windows) 135 { 136 // CMD does not parse its arguments like other programs. 137 // It does not use CommandLineToArgvW. 138 // Instead, it treats the first and last quote specially. 139 // See CMD.EXE /? for details. 140 return [ 141 std.process.escapeShellFileName(shellPath.toNativeString()) 142 ~ ` /C "` ~ command ~ `"` 143 ]; 144 } 145 else version (Posix) 146 { 147 return [ 148 shellPath.toNativeString(), 149 "-c", 150 command, 151 ]; 152 } 153 } 154 155 /** 156 Represents a running process. 157 */ 158 struct Process { 159 private static struct Context { 160 //Duration waitTimeout; 161 shared(NativeEventDriver) driver; 162 } 163 164 private { 165 ProcessID m_pid; 166 Context* m_context; 167 } 168 169 private this(ProcessID p) 170 nothrow { 171 assert(p != ProcessID.invalid); 172 m_pid = p; 173 m_context = () @trusted { return &eventDriver.processes.userData!Context(p); } (); 174 m_context.driver = () @trusted { return cast(shared)eventDriver; } (); 175 } 176 177 this(this) 178 nothrow { 179 if (m_pid != ProcessID.invalid) 180 eventDriver.processes.addRef(m_pid); 181 } 182 183 ~this() 184 nothrow { 185 if (m_pid != ProcessID.invalid) 186 releaseHandle!"processes"(m_pid, m_context.driver); 187 } 188 189 /** 190 Check whether this is a valid process handle. The process may have 191 exited already. 192 */ 193 bool opCast(T)() const nothrow if (is(T == bool)) { return m_pid != ProcessID.invalid; } 194 195 /// 196 unittest { 197 Process p; 198 199 assert(!p); 200 } 201 202 /** 203 An operating system handle to the process. 204 */ 205 @property int pid() const nothrow @nogc { return cast(int)m_pid; } 206 207 /** 208 Whether the process has exited. 209 */ 210 @property bool exited() const nothrow { return eventDriver.processes.hasExited(m_pid); } 211 212 /** 213 Wait for the process to exit, allowing other fibers to continue in the 214 meantime. 215 216 Params: 217 timeout = Optionally wait until a timeout is reached. 218 219 Returns: 220 The exit code of the process. If a timeout is given and reached, a 221 null value is returned. 222 */ 223 int wait() 224 @blocking { 225 return asyncAwaitUninterruptible!(ProcessWaitCallback, 226 cb => eventDriver.processes.wait(m_pid, cb) 227 )[1]; 228 } 229 230 /// Ditto 231 Nullable!int wait(Duration timeout) 232 @blocking { 233 size_t waitId; 234 bool cancelled = false; 235 236 int code = asyncAwaitUninterruptible!(ProcessWaitCallback, 237 (cb) nothrow @safe { 238 waitId = eventDriver.processes.wait(m_pid, cb); 239 }, 240 (cb) nothrow @safe { 241 eventDriver.processes.cancelWait(m_pid, waitId); 242 cancelled = true; 243 }, 244 )(timeout)[1]; 245 246 if (cancelled) { 247 return Nullable!int.init; 248 } else { 249 return code.nullable; 250 } 251 } 252 253 /** 254 Kill the process. 255 256 By default on Linux this sends SIGTERM to the process. 257 258 Params: 259 signal = Optional parameter for the signal to send to the process. 260 */ 261 void kill() 262 { 263 version (Posix) 264 { 265 import core.sys.posix.signal : SIGTERM; 266 eventDriver.processes.kill(m_pid, SIGTERM); 267 } 268 else 269 { 270 eventDriver.processes.kill(m_pid, 1); 271 } 272 } 273 274 /// ditto 275 void kill(int signal) 276 { 277 eventDriver.processes.kill(m_pid, signal); 278 } 279 280 /** 281 Terminate the process immediately. 282 283 On Linux this sends SIGKILL to the process. 284 */ 285 void forceKill() 286 { 287 version (Posix) 288 { 289 import core.sys.posix.signal : SIGKILL; 290 eventDriver.processes.kill(m_pid, SIGKILL); 291 } 292 else 293 { 294 eventDriver.processes.kill(m_pid, 1); 295 } 296 } 297 298 /** 299 Wait for the process to exit until a timeout is reached. If the process 300 doesn't exit before the timeout, force kill it. 301 302 Returns: 303 The process exit code. 304 */ 305 int waitOrForceKill(Duration timeout) 306 @blocking { 307 auto code = wait(timeout); 308 309 if (code.isNull) { 310 forceKill(); 311 return wait(); 312 } else { 313 return code.get; 314 } 315 } 316 } 317 318 /** 319 A stream for tBatchBufferhe write end of a pipe. 320 */ 321 struct PipeInputStream { 322 private static struct Context { 323 BatchBuffer!ubyte readBuffer; 324 shared(NativeEventDriver) driver; 325 } 326 327 private { 328 PipeFD m_pipe; 329 Context* m_context; 330 } 331 332 private this(PipeFD pipe) 333 nothrow { 334 m_pipe = pipe; 335 if (pipe != PipeFD.invalid) { 336 m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } (); 337 m_context.readBuffer.capacity = 4096; 338 m_context.driver = () @trusted { return cast(shared)eventDriver; } (); 339 } 340 } 341 342 this(this) 343 nothrow { 344 if (m_pipe != PipeFD.invalid) 345 eventDriver.pipes.addRef(m_pipe); 346 } 347 348 ~this() 349 nothrow { 350 if (m_pipe != PipeFD.invalid) 351 releaseHandle!"pipes"(m_pipe, m_context.driver); 352 } 353 354 bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipes != PipeFD.invalid; } 355 356 @property bool empty() @blocking { return leastSize == 0; } 357 @property ulong leastSize() 358 @blocking { 359 waitForData(); 360 return m_context ? m_context.readBuffer.length : 0; 361 } 362 @property bool dataAvailableForRead() { return waitForData(0.seconds); } 363 364 bool waitForData(Duration timeout = Duration.max) 365 @blocking { 366 if (!m_context) return false; 367 if (m_context.readBuffer.length > 0) return true; 368 auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once; 369 370 bool cancelled; 371 IOStatus status; 372 size_t nbytes; 373 374 alias waiter = Waitable!(PipeIOCallback, 375 cb => eventDriver.pipes.read(m_pipe, m_context.readBuffer.peekDst(), mode, cb), 376 (cb) { 377 cancelled = true; 378 eventDriver.pipes.cancelRead(m_pipe); 379 }, 380 (pipe, st, nb) { 381 // Handle closed pipes 382 if (m_pipe == PipeFD.invalid) { 383 cancelled = true; 384 return; 385 } 386 387 assert(pipe == m_pipe); 388 status = st; 389 nbytes = nb; 390 } 391 ); 392 393 asyncAwaitAny!(true, waiter)(timeout); 394 395 if (cancelled || !m_context) return false; 396 397 logTrace("Pipe %s, read %s bytes: %s", m_pipe, nbytes, status); 398 399 assert(m_context.readBuffer.length == 0); 400 m_context.readBuffer.putN(nbytes); 401 switch (status) { 402 case IOStatus.ok: break; 403 case IOStatus.disconnected: break; 404 case IOStatus.wouldBlock: 405 assert(mode == IOMode.immediate); 406 break; 407 default: 408 logDebug("Error status when waiting for data: %s", status); 409 break; 410 } 411 412 return m_context.readBuffer.length > 0; 413 } 414 415 const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; } 416 417 size_t read(scope ubyte[] dst, IOMode mode) 418 @blocking { 419 if (dst.empty) return 0; 420 421 if (m_context.readBuffer.length >= dst.length) { 422 m_context.readBuffer.read(dst); 423 return dst.length; 424 } 425 426 size_t nbytes = 0; 427 428 while (true) { 429 if (m_context.readBuffer.length == 0) { 430 if (mode == IOMode.immediate || mode == IOMode.once && nbytes > 0) 431 break; 432 433 enforce(waitForData(), "Reached end of stream while reading data."); 434 } 435 436 assert(m_context.readBuffer.length > 0); 437 auto l = min(dst.length, m_context.readBuffer.length); 438 m_context.readBuffer.read(dst[0 .. l]); 439 dst = dst[l .. $]; 440 nbytes += l; 441 if (dst.length == 0) 442 break; 443 } 444 445 return nbytes; 446 } 447 448 void read(scope ubyte[] dst) 449 @blocking { 450 auto r = read(dst, IOMode.all); 451 assert(r == dst.length); 452 } 453 454 /** 455 Close the read end of the pipe immediately. 456 457 Make sure that the pipe is not used after this is called and is released 458 as soon as possible. Due to implementation detail in eventcore this 459 reference could conflict with future pipes. 460 */ 461 void close() 462 nothrow { 463 if (m_pipe == PipeFD.invalid) return; 464 465 asyncAwaitUninterruptible!(PipeCloseCallback, 466 cb => eventDriver.pipes.close(m_pipe, cb) 467 ); 468 469 eventDriver.pipes.releaseRef(m_pipe); 470 m_pipe = PipeFD.invalid; 471 } 472 } 473 474 mixin validateInputStream!PipeInputStream; 475 476 /** 477 Stream for the read end of a pipe. 478 */ 479 struct PipeOutputStream { 480 private static struct Context { 481 shared(NativeEventDriver) driver; 482 } 483 484 private { 485 PipeFD m_pipe; 486 Context* m_context; 487 } 488 489 private this(PipeFD pipe) 490 nothrow { 491 m_pipe = pipe; 492 if (pipe != PipeFD.invalid) { 493 m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } (); 494 m_context.driver = () @trusted { return cast(shared)eventDriver; } (); 495 } 496 } 497 498 this(this) 499 nothrow { 500 if (m_pipe != PipeFD.invalid) 501 eventDriver.pipes.addRef(m_pipe); 502 } 503 504 ~this() 505 nothrow { 506 if (m_pipe != PipeFD.invalid) 507 releaseHandle!"pipes"(m_pipe, m_context.driver); 508 } 509 510 bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipes != PipeFD.invalid; } 511 512 size_t write(in ubyte[] bytes, IOMode mode) 513 @blocking { 514 if (bytes.empty) return 0; 515 516 auto res = asyncAwait!(PipeIOCallback, 517 cb => eventDriver.pipes.write(m_pipe, bytes, mode, cb), 518 cb => eventDriver.pipes.cancelWrite(m_pipe)); 519 520 switch (res[1]) { 521 case IOStatus.ok: break; 522 case IOStatus.disconnected: break; 523 default: 524 throw new Exception("Error writing data to pipe."); 525 } 526 527 return res[2]; 528 } 529 530 void write(in ubyte[] bytes) @blocking { auto r = write(bytes, IOMode.all); assert(r == bytes.length); } 531 void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); } 532 533 void flush() {} 534 void finalize() {} 535 536 /** 537 Close the write end of the pipe immediately. 538 539 Make sure that the pipe is not used after this is called and is released 540 as soon as possible. Due to implementation detail in eventcore this 541 reference could conflict with future pipes. 542 */ 543 void close() 544 nothrow { 545 if (m_pipe == PipeFD.invalid) return; 546 547 asyncAwaitUninterruptible!(PipeCloseCallback, 548 cb => eventDriver.pipes.close(m_pipe, cb) 549 ); 550 551 eventDriver.pipes.releaseRef(m_pipe); 552 m_pipe = PipeFD.invalid; 553 } 554 } 555 556 mixin validateOutputStream!PipeOutputStream; 557 558 /** 559 A pipe created by `pipe`. 560 */ 561 struct Pipe { 562 /// Read end of the pipe 563 PipeInputStream readEnd; 564 /// Write end of the pipe 565 PipeOutputStream writeEnd; 566 567 /** 568 Close both ends of the pipe 569 */ 570 void close() 571 nothrow { 572 writeEnd.close(); 573 readEnd.close(); 574 } 575 } 576 577 /** 578 Create a pipe, async equivalent of `std.process.pipe`. 579 580 Returns: 581 A stream for each end of the pipe. 582 */ 583 Pipe pipe() 584 { 585 import std.stdio : StdioException; 586 587 version (Posix) { 588 import core.sys.posix.unistd : pipe; 589 int[2] fds; 590 if (pipe(fds) != 0) 591 throw new StdioException("Unable to create pipe"); 592 int readFD = fds[0]; 593 int writeFD = fds[1]; 594 } 595 else version (Windows) { 596 import core.sys.windows.winbase : CreatePipe, GetLastError; 597 import core.sys.windows.windef; 598 import std.windows.syserror : sysErrorString; 599 HANDLE readHandle; 600 HANDLE writeHandle; 601 if (!() @trusted { return CreatePipe(&readHandle, &writeHandle, null, 0); }()) { 602 throw new StdioException( 603 "Error creating pipe (" ~ sysErrorString(GetLastError()) ~ ')', 604 0); 605 } 606 version (CRuntime_DigitalMars) { 607 import core.stdc.stdio : _handleToFD, FHND_DEVICE; 608 int readFD = _handleToFD(readHandle, FHND_DEVICE); 609 int writeFD = _handleToFD(writeHandle, FHND_DEVICE); 610 } else { 611 import core.stdc.stdint : intptr_t; 612 import core.stdc.stdio : _open_osfhandle, O_RDONLY, O_APPEND; 613 int readFD = () @trusted { return _open_osfhandle(cast(intptr_t)readHandle, O_RDONLY); }(); 614 int writeFD = () @trusted { return _open_osfhandle(cast(intptr_t)writeHandle, O_APPEND); }(); 615 } 616 } 617 618 auto read = eventDriver.pipes.adopt(readFD); 619 auto write = eventDriver.pipes.adopt(writeFD); 620 return Pipe(PipeInputStream(read), PipeOutputStream(write)); 621 } 622 623 /** 624 Returned from `pipeProcess`. 625 626 See_Also: `pipeProcess`, `pipeShell` 627 */ 628 struct ProcessPipes { 629 Process process; 630 PipeOutputStream stdin; 631 PipeInputStream stdout; 632 PipeInputStream stderr; 633 } 634 635 /** 636 Equivalent to `std.process.pipeProcess`. 637 638 Returns: 639 A struct containing the process and created pipes. 640 641 See_Also: `spawnProcess`, `execute` 642 */ 643 ProcessPipes pipeProcess( 644 scope string[] args, 645 Redirect redirect = Redirect.all, 646 const string[string] env = null, 647 Config config = Config.none, 648 scope NativePath workDir = NativePath.init) 649 @trusted { 650 auto stdin = ProcessStdinFile(ProcessRedirect.inherit); 651 if (Redirect.stdin & redirect) { 652 stdin = ProcessStdinFile(ProcessRedirect.pipe); 653 } 654 655 auto stdout = ProcessStdoutFile(ProcessRedirect.inherit); 656 if (Redirect.stdoutToStderr & redirect) { 657 stdout = ProcessStdoutFile(ProcessStdoutRedirect.toStderr); 658 } else if (Redirect.stdout & redirect) { 659 stdout = ProcessStdoutFile(ProcessRedirect.pipe); 660 } 661 662 auto stderr = ProcessStderrFile(ProcessRedirect.inherit); 663 if (Redirect.stderrToStdout & redirect) { 664 stderr = ProcessStderrFile(ProcessStderrRedirect.toStdout); 665 } else if (Redirect.stderr & redirect) { 666 stderr = ProcessStderrFile(ProcessRedirect.pipe); 667 } 668 669 auto process = eventDriver.processes.spawn( 670 args, 671 stdin, 672 stdout, 673 stderr, 674 env, 675 config, 676 workDir.toNativeString()); 677 678 if (process.pid == ProcessID.invalid) 679 throw new Exception("Failed to spawn process"); 680 681 return ProcessPipes( 682 Process(process.pid), 683 PipeOutputStream(cast(PipeFD)process.stdin), 684 PipeInputStream(cast(PipeFD)process.stdout), 685 PipeInputStream(cast(PipeFD)process.stderr) 686 ); 687 } 688 689 /// ditto 690 ProcessPipes pipeProcess( 691 scope string program, 692 Redirect redirect = Redirect.all, 693 const string[string] env = null, 694 Config config = Config.none, 695 scope NativePath workDir = NativePath.init) 696 { 697 return pipeProcess( 698 [program], 699 redirect, 700 env, 701 config, 702 workDir 703 ); 704 } 705 706 /// ditto 707 ProcessPipes pipeShell( 708 scope string command, 709 Redirect redirect = Redirect.all, 710 const string[string] env = null, 711 Config config = Config.none, 712 scope NativePath workDir = NativePath.init, 713 scope NativePath shellPath = nativeShell) 714 { 715 return pipeProcess( 716 shellCommand(command, nativeShell), 717 redirect, 718 env, 719 config, 720 workDir); 721 } 722 723 /** 724 Equivalent to `std.process.execute`. 725 726 Returns: 727 Tuple containing the exit status and process output. 728 729 See_Also: `spawnProcess`, `pipeProcess` 730 */ 731 auto execute( 732 scope string[] args, 733 const string[string] env = null, 734 Config config = Config.none, 735 size_t maxOutput = size_t.max, 736 scope NativePath workDir = NativePath.init) 737 @blocking { 738 return executeImpl!pipeProcess(args, env, config, maxOutput, workDir); 739 } 740 741 /// ditto 742 auto execute( 743 scope string program, 744 const string[string] env = null, 745 Config config = Config.none, 746 size_t maxOutput = size_t.max, 747 scope NativePath workDir = NativePath.init) 748 @blocking @trusted { 749 return executeImpl!pipeProcess(program, env, config, maxOutput, workDir); 750 } 751 752 /// ditto 753 auto executeShell( 754 scope string command, 755 const string[string] env = null, 756 Config config = Config.none, 757 size_t maxOutput = size_t.max, 758 scope NativePath workDir = null, 759 NativePath shellPath = nativeShell) 760 @blocking { 761 return executeImpl!pipeShell(command, env, config, maxOutput, workDir, shellPath); 762 } 763 764 private auto executeImpl(alias spawn, Cmd, Args...)( 765 Cmd command, 766 const string[string] env, 767 Config config, 768 size_t maxOutput, 769 scope NativePath workDir, 770 Args args) 771 @blocking { 772 Redirect redirect = Redirect.stdout; 773 774 auto processPipes = spawn(command, redirect, env, config, workDir, args); 775 776 auto stringOutput = processPipes.stdout.collectOutput(maxOutput); 777 778 return Tuple!(int, "status", string, "output")(processPipes.process.wait(), stringOutput); 779 } 780 781 /* 782 Collect the string output of a stream in a blocking fashion. 783 784 Params: 785 stream = The input stream to collect from. 786 nbytes = The maximum number of bytes to collect. 787 788 Returns: 789 The collected data from the stream as a string. 790 */ 791 /// private 792 string collectOutput(InputStream)(InputStream stream, size_t nbytes = size_t.max) 793 @blocking @trusted if (isInputStream!InputStream) { 794 auto output = appender!string(); 795 if (nbytes != size_t.max) { 796 output.reserve(nbytes); 797 } 798 799 import vibe.internal.allocator : theAllocator, dispose; 800 801 scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024); 802 scope (exit) theAllocator.dispose(buffer); 803 804 while (!stream.empty && nbytes > 0) { 805 size_t chunk = min(nbytes, stream.leastSize, buffer.length); 806 assert(chunk > 0, "leastSize returned zero for non-empty stream."); 807 808 stream.read(buffer[0..chunk]); 809 output.put(buffer[0..chunk]); 810 } 811 812 return output.data; 813 }