1 /** 2 Functions and structures for dealing with subprocesses and pipes. 3 4 This module is modeled after std.process, but provides a fiber-aware 5 alternative to it. All blocking operations will yield the calling fiber 6 instead of blocking it. 7 */ 8 module vibe.core.process; 9 10 public import std.process : Pid, Redirect; 11 static import std.process; 12 13 import core.time; 14 import std.array; 15 import std.typecons; 16 import std.exception : enforce; 17 import std.algorithm; 18 import eventcore.core; 19 import vibe.core.path; 20 import vibe.core.log; 21 import vibe.core.stream; 22 import vibe.internal.async; 23 import vibe.internal.array : BatchBuffer; 24 import vibe.core.internal.release; 25 26 @safe: 27 28 /** 29 Register a process with vibe for fibre-aware handling. This process can be 30 started from anywhere including external libraries or std.process. 31 32 Params: 33 pid = A Pid or OS process id 34 */ 35 Process adoptProcessID(Pid pid) 36 @trusted { 37 return adoptProcessID(pid.processID); 38 } 39 40 /// ditto 41 Process adoptProcessID(int pid) 42 { 43 auto p = eventDriver.processes.adopt(pid); 44 if (p == ProcessID.invalid) 45 throw new Exception("Failed to adopt process ID"); 46 return Process(p); 47 } 48 49 /** 50 Path to the user's preferred command interpreter. 51 52 See_Also: `nativeShell` 53 */ 54 @property NativePath userShell() { return NativePath(std.process.userShell); } 55 56 /** 57 The platform specific native shell path. 58 59 See_Also: `userShell` 60 */ 61 const NativePath nativeShell = NativePath(std.process.nativeShell); 62 63 /** 64 Equivalent to `std.process.Config` except with less flag support 65 */ 66 enum Config { 67 none = ProcessConfig.none, 68 newEnv = ProcessConfig.newEnv, 69 suppressConsole = ProcessConfig.suppressConsole, 70 detached = ProcessConfig.detached, 71 } 72 73 /** 74 Equivalent to `std.process.spawnProcess`. 75 76 Returns: 77 A reference to the running process. 78 79 See_Also: `pipeProcess`, `execute` 80 */ 81 Process spawnProcess( 82 scope string[] args, 83 const string[string] env = null, 84 Config config = Config.none, 85 scope NativePath workDir = NativePath.init) 86 @trusted { 87 auto process = eventDriver.processes.spawn( 88 args, 89 ProcessStdinFile(ProcessRedirect.inherit), 90 ProcessStdoutFile(ProcessRedirect.inherit), 91 ProcessStderrFile(ProcessRedirect.inherit), 92 env, 93 config, 94 workDir.toNativeString()); 95 96 if (process.pid == ProcessID.invalid) 97 throw new Exception("Failed to spawn process"); 98 99 return Process(process.pid); 100 } 101 102 /// ditto 103 Process spawnProcess( 104 scope string program, 105 const string[string] env = null, 106 Config config = Config.none, 107 scope NativePath workDir = NativePath.init) 108 { 109 return spawnProcess( 110 [program], 111 env, 112 config, 113 workDir 114 ); 115 } 116 117 /// ditto 118 Process spawnShell( 119 scope string command, 120 const string[string] env = null, 121 Config config = Config.none, 122 scope NativePath workDir = NativePath.init, 123 scope NativePath shellPath = nativeShell) 124 { 125 return spawnProcess( 126 shellCommand(command, shellPath), 127 env, 128 config, 129 workDir); 130 } 131 132 private string[] shellCommand(string command, NativePath shellPath) 133 { 134 version (Windows) 135 { 136 // CMD does not parse its arguments like other programs. 137 // It does not use CommandLineToArgvW. 138 // Instead, it treats the first and last quote specially. 139 // See CMD.EXE /? for details. 140 return [ 141 std.process.escapeShellFileName(shellPath.toNativeString()) 142 ~ ` /C "` ~ command ~ `"` 143 ]; 144 } 145 else version (Posix) 146 { 147 return [ 148 shellPath.toNativeString(), 149 "-c", 150 command, 151 ]; 152 } 153 } 154 155 /** 156 Represents a running process. 157 */ 158 struct Process { 159 private static struct Context { 160 //Duration waitTimeout; 161 shared(NativeEventDriver) driver; 162 } 163 164 private { 165 ProcessID m_pid; 166 Context* m_context; 167 } 168 169 private this(ProcessID p) 170 nothrow { 171 assert(p != ProcessID.invalid); 172 m_pid = p; 173 m_context = () @trusted { return &eventDriver.processes.userData!Context(p); } (); 174 m_context.driver = () @trusted { return cast(shared)eventDriver; } (); 175 } 176 177 this(this) 178 nothrow { 179 if (m_pid != ProcessID.invalid) 180 eventDriver.processes.addRef(m_pid); 181 } 182 183 ~this() 184 nothrow { 185 if (m_pid != ProcessID.invalid) 186 releaseHandle!"processes"(m_pid, m_context.driver); 187 } 188 189 /** 190 Check whether this is a valid process handle. The process may have 191 exited already. 192 */ 193 bool opCast(T)() const nothrow if (is(T == bool)) { return m_pid != ProcessID.invalid; } 194 195 /// 196 unittest { 197 Process p; 198 199 assert(!p); 200 } 201 202 /** 203 An operating system handle to the process. 204 */ 205 @property int pid() const nothrow @nogc { return cast(int)m_pid; } 206 207 /** 208 Whether the process has exited. 209 */ 210 @property bool exited() const nothrow { return eventDriver.processes.hasExited(m_pid); } 211 212 /** 213 Wait for the process to exit, allowing other fibers to continue in the 214 meantime. 215 216 Params: 217 timeout = Optionally wait until a timeout is reached. 218 219 Returns: 220 The exit code of the process. If a timeout is given and reached, a 221 null value is returned. 222 */ 223 int wait() 224 @blocking { 225 return asyncAwaitUninterruptible!(ProcessWaitCallback, 226 cb => eventDriver.processes.wait(m_pid, cb) 227 )[1]; 228 } 229 230 /// Ditto 231 Nullable!int wait(Duration timeout) 232 @blocking { 233 size_t waitId; 234 bool cancelled = false; 235 236 int code = asyncAwaitUninterruptible!(ProcessWaitCallback, 237 (cb) nothrow @safe { 238 waitId = eventDriver.processes.wait(m_pid, cb); 239 }, 240 (cb) nothrow @safe { 241 eventDriver.processes.cancelWait(m_pid, waitId); 242 cancelled = true; 243 }, 244 )(timeout)[1]; 245 246 if (cancelled) { 247 return Nullable!int.init; 248 } else { 249 return code.nullable; 250 } 251 } 252 253 /** 254 Kill the process. 255 256 By default on Linux this sends SIGTERM to the process. 257 258 Params: 259 signal = Optional parameter for the signal to send to the process. 260 */ 261 void kill() 262 { 263 version (Posix) 264 { 265 import core.sys.posix.signal : SIGTERM; 266 eventDriver.processes.kill(m_pid, SIGTERM); 267 } 268 else 269 { 270 eventDriver.processes.kill(m_pid, 1); 271 } 272 } 273 274 /// ditto 275 void kill(int signal) 276 { 277 eventDriver.processes.kill(m_pid, signal); 278 } 279 280 /** 281 Terminate the process immediately. 282 283 On Linux this sends SIGKILL to the process. 284 */ 285 void forceKill() 286 { 287 version (Posix) 288 { 289 import core.sys.posix.signal : SIGKILL; 290 eventDriver.processes.kill(m_pid, SIGKILL); 291 } 292 else 293 { 294 eventDriver.processes.kill(m_pid, 1); 295 } 296 } 297 298 /** 299 Wait for the process to exit until a timeout is reached. If the process 300 doesn't exit before the timeout, force kill it. 301 302 Returns: 303 The process exit code. 304 */ 305 int waitOrForceKill(Duration timeout) 306 @blocking { 307 auto code = wait(timeout); 308 309 if (code.isNull) { 310 forceKill(); 311 return wait(); 312 } else { 313 return code.get; 314 } 315 } 316 } 317 318 /** 319 A stream for tBatchBufferhe write end of a pipe. 320 */ 321 struct PipeInputStream { 322 private static struct Context { 323 BatchBuffer!ubyte readBuffer; 324 shared(NativeEventDriver) driver; 325 } 326 327 private { 328 PipeFD m_pipe; 329 Context* m_context; 330 } 331 332 private this(PipeFD pipe) 333 nothrow { 334 m_pipe = pipe; 335 if (pipe != PipeFD.invalid) { 336 m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } (); 337 m_context.readBuffer.capacity = 4096; 338 m_context.driver = () @trusted { return cast(shared)eventDriver; } (); 339 } 340 } 341 342 this(this) 343 nothrow { 344 if (m_pipe != PipeFD.invalid) 345 eventDriver.pipes.addRef(m_pipe); 346 } 347 348 ~this() 349 nothrow { 350 if (m_pipe != PipeFD.invalid) 351 releaseHandle!"pipes"(m_pipe, m_context.driver); 352 } 353 354 bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipes != PipeFD.invalid; } 355 356 @property bool empty() @blocking { return leastSize == 0; } 357 @property ulong leastSize() 358 @blocking { 359 waitForData(); 360 return m_context ? m_context.readBuffer.length : 0; 361 } 362 @property bool dataAvailableForRead() { return waitForData(0.seconds); } 363 364 bool waitForData(Duration timeout = Duration.max) 365 @blocking { 366 if (!m_context) return false; 367 if (m_context.readBuffer.length > 0) return true; 368 auto mode = timeout <= 0.seconds ? IOMode.immediate : IOMode.once; 369 370 bool cancelled; 371 IOStatus status; 372 size_t nbytes; 373 374 alias waiter = Waitable!(PipeIOCallback, 375 cb => eventDriver.pipes.read(m_pipe, m_context.readBuffer.peekDst(), mode, cb), 376 (cb) { 377 cancelled = true; 378 eventDriver.pipes.cancelRead(m_pipe); 379 }, 380 (pipe, st, nb) { 381 // Handle closed pipes 382 if (m_pipe == PipeFD.invalid) { 383 cancelled = true; 384 return; 385 } 386 387 assert(pipe == m_pipe); 388 status = st; 389 nbytes = nb; 390 } 391 ); 392 393 asyncAwaitAny!(true, waiter)(timeout); 394 395 if (cancelled || !m_context) return false; 396 397 logTrace("Pipe %s, read %s bytes: %s", m_pipe, nbytes, status); 398 399 assert(m_context.readBuffer.length == 0); 400 m_context.readBuffer.putN(nbytes); 401 switch (status) { 402 case IOStatus.ok: break; 403 case IOStatus.disconnected: break; 404 case IOStatus.wouldBlock: 405 assert(mode == IOMode.immediate); 406 break; 407 default: 408 logDebug("Error status when waiting for data: %s", status); 409 break; 410 } 411 412 return m_context.readBuffer.length > 0; 413 } 414 415 const(ubyte)[] peek() { return m_context ? m_context.readBuffer.peek() : null; } 416 417 size_t read(scope ubyte[] dst, IOMode mode) 418 @blocking { 419 if (dst.empty) return 0; 420 421 if (m_context.readBuffer.length >= dst.length) { 422 m_context.readBuffer.read(dst); 423 return dst.length; 424 } 425 426 size_t nbytes = 0; 427 428 while (true) { 429 if (m_context.readBuffer.length == 0) { 430 if (mode == IOMode.immediate || mode == IOMode.once && nbytes > 0) 431 break; 432 433 enforce(waitForData(), "Reached end of stream while reading data."); 434 } 435 436 assert(m_context.readBuffer.length > 0); 437 auto l = min(dst.length, m_context.readBuffer.length); 438 m_context.readBuffer.read(dst[0 .. l]); 439 dst = dst[l .. $]; 440 nbytes += l; 441 if (dst.length == 0) 442 break; 443 } 444 445 return nbytes; 446 } 447 448 void read(scope ubyte[] dst) 449 @blocking { 450 auto r = read(dst, IOMode.all); 451 assert(r == dst.length); 452 } 453 454 /** 455 Close the read end of the pipe immediately. 456 457 Make sure that the pipe is not used after this is called and is released 458 as soon as possible. Due to implementation detail in eventcore this 459 reference could conflict with future pipes. 460 */ 461 void close() 462 nothrow { 463 if (m_pipe == PipeFD.invalid) return; 464 465 asyncAwaitUninterruptible!(PipeCloseCallback, 466 cb => eventDriver.pipes.close(m_pipe, cb) 467 ); 468 469 eventDriver.pipes.releaseRef(m_pipe); 470 m_pipe = PipeFD.invalid; 471 } 472 } 473 474 mixin validateInputStream!PipeInputStream; 475 476 /** 477 Stream for the read end of a pipe. 478 */ 479 struct PipeOutputStream { 480 private static struct Context { 481 shared(NativeEventDriver) driver; 482 } 483 484 private { 485 PipeFD m_pipe; 486 Context* m_context; 487 } 488 489 private this(PipeFD pipe) 490 nothrow { 491 m_pipe = pipe; 492 if (pipe != PipeFD.invalid) { 493 m_context = () @trusted { return &eventDriver.pipes.userData!Context(pipe); } (); 494 m_context.driver = () @trusted { return cast(shared)eventDriver; } (); 495 } 496 } 497 498 this(this) 499 nothrow { 500 if (m_pipe != PipeFD.invalid) 501 eventDriver.pipes.addRef(m_pipe); 502 } 503 504 ~this() 505 nothrow { 506 if (m_pipe != PipeFD.invalid) 507 releaseHandle!"pipes"(m_pipe, m_context.driver); 508 } 509 510 bool opCast(T)() const nothrow if (is(T == bool)) { return m_pipes != PipeFD.invalid; } 511 512 size_t write(in ubyte[] bytes, IOMode mode) 513 @blocking { 514 if (bytes.empty) return 0; 515 516 auto res = asyncAwait!(PipeIOCallback, 517 cb => eventDriver.pipes.write(m_pipe, bytes, mode, cb), 518 cb => eventDriver.pipes.cancelWrite(m_pipe)); 519 520 switch (res[1]) { 521 case IOStatus.ok: break; 522 case IOStatus.disconnected: break; 523 default: 524 throw new Exception("Error writing data to pipe."); 525 } 526 527 return res[2]; 528 } 529 530 void write(in ubyte[] bytes) @blocking { auto r = write(bytes, IOMode.all); assert(r == bytes.length); } 531 void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); } 532 533 void flush() {} 534 void finalize() {} 535 536 /** 537 Close the write end of the pipe immediately. 538 539 Make sure that the pipe is not used after this is called and is released 540 as soon as possible. Due to implementation detail in eventcore this 541 reference could conflict with future pipes. 542 */ 543 void close() 544 nothrow { 545 if (m_pipe == PipeFD.invalid) return; 546 547 asyncAwaitUninterruptible!(PipeCloseCallback, 548 cb => eventDriver.pipes.close(m_pipe, cb) 549 ); 550 551 eventDriver.pipes.releaseRef(m_pipe); 552 m_pipe = PipeFD.invalid; 553 } 554 } 555 556 mixin validateOutputStream!PipeOutputStream; 557 558 /** 559 A pipe created by `pipe`. 560 */ 561 struct Pipe { 562 /// Read end of the pipe 563 PipeInputStream readEnd; 564 /// Write end of the pipe 565 PipeOutputStream writeEnd; 566 567 /** 568 Close both ends of the pipe 569 */ 570 void close() 571 nothrow { 572 writeEnd.close(); 573 readEnd.close(); 574 } 575 } 576 577 /** 578 Create a pipe, async equivalent of `std.process.pipe`. 579 580 Returns: 581 A stream for each end of the pipe. 582 */ 583 Pipe pipe() 584 { 585 auto p = std.process.pipe(); 586 587 auto read = eventDriver.pipes.adopt(p.readEnd.fileno); 588 auto write = eventDriver.pipes.adopt(p.writeEnd.fileno); 589 590 return Pipe(PipeInputStream(read), PipeOutputStream(write)); 591 } 592 593 /** 594 Returned from `pipeProcess`. 595 596 See_Also: `pipeProcess`, `pipeShell` 597 */ 598 struct ProcessPipes { 599 Process process; 600 PipeOutputStream stdin; 601 PipeInputStream stdout; 602 PipeInputStream stderr; 603 } 604 605 /** 606 Equivalent to `std.process.pipeProcess`. 607 608 Returns: 609 A struct containing the process and created pipes. 610 611 See_Also: `spawnProcess`, `execute` 612 */ 613 ProcessPipes pipeProcess( 614 scope string[] args, 615 Redirect redirect = Redirect.all, 616 const string[string] env = null, 617 Config config = Config.none, 618 scope NativePath workDir = NativePath.init) 619 @trusted { 620 auto stdin = ProcessStdinFile(ProcessRedirect.inherit); 621 if (Redirect.stdin & redirect) { 622 stdin = ProcessStdinFile(ProcessRedirect.pipe); 623 } 624 625 auto stdout = ProcessStdoutFile(ProcessRedirect.inherit); 626 if (Redirect.stdoutToStderr & redirect) { 627 stdout = ProcessStdoutFile(ProcessStdoutRedirect.toStderr); 628 } else if (Redirect.stdout & redirect) { 629 stdout = ProcessStdoutFile(ProcessRedirect.pipe); 630 } 631 632 auto stderr = ProcessStderrFile(ProcessRedirect.inherit); 633 if (Redirect.stderrToStdout & redirect) { 634 stderr = ProcessStderrFile(ProcessStderrRedirect.toStdout); 635 } else if (Redirect.stderr & redirect) { 636 stderr = ProcessStderrFile(ProcessRedirect.pipe); 637 } 638 639 auto process = eventDriver.processes.spawn( 640 args, 641 stdin, 642 stdout, 643 stderr, 644 env, 645 config, 646 workDir.toNativeString()); 647 648 if (process.pid == ProcessID.invalid) 649 throw new Exception("Failed to spawn process"); 650 651 return ProcessPipes( 652 Process(process.pid), 653 PipeOutputStream(cast(PipeFD)process.stdin), 654 PipeInputStream(cast(PipeFD)process.stdout), 655 PipeInputStream(cast(PipeFD)process.stderr) 656 ); 657 } 658 659 /// ditto 660 ProcessPipes pipeProcess( 661 scope string program, 662 Redirect redirect = Redirect.all, 663 const string[string] env = null, 664 Config config = Config.none, 665 scope NativePath workDir = NativePath.init) 666 { 667 return pipeProcess( 668 [program], 669 redirect, 670 env, 671 config, 672 workDir 673 ); 674 } 675 676 /// ditto 677 ProcessPipes pipeShell( 678 scope string command, 679 Redirect redirect = Redirect.all, 680 const string[string] env = null, 681 Config config = Config.none, 682 scope NativePath workDir = NativePath.init, 683 scope NativePath shellPath = nativeShell) 684 { 685 return pipeProcess( 686 shellCommand(command, nativeShell), 687 redirect, 688 env, 689 config, 690 workDir); 691 } 692 693 /** 694 Equivalent to `std.process.execute`. 695 696 Returns: 697 Tuple containing the exit status and process output. 698 699 See_Also: `spawnProcess`, `pipeProcess` 700 */ 701 auto execute( 702 scope string[] args, 703 const string[string] env = null, 704 Config config = Config.none, 705 size_t maxOutput = size_t.max, 706 scope NativePath workDir = NativePath.init) 707 @blocking { 708 return executeImpl!pipeProcess(args, env, config, maxOutput, workDir); 709 } 710 711 /// ditto 712 auto execute( 713 scope string program, 714 const string[string] env = null, 715 Config config = Config.none, 716 size_t maxOutput = size_t.max, 717 scope NativePath workDir = NativePath.init) 718 @blocking @trusted { 719 return executeImpl!pipeProcess(program, env, config, maxOutput, workDir); 720 } 721 722 /// ditto 723 auto executeShell( 724 scope string command, 725 const string[string] env = null, 726 Config config = Config.none, 727 size_t maxOutput = size_t.max, 728 scope NativePath workDir = null, 729 NativePath shellPath = nativeShell) 730 @blocking { 731 return executeImpl!pipeShell(command, env, config, maxOutput, workDir, shellPath); 732 } 733 734 private auto executeImpl(alias spawn, Cmd, Args...)( 735 Cmd command, 736 const string[string] env, 737 Config config, 738 size_t maxOutput, 739 scope NativePath workDir, 740 Args args) 741 @blocking { 742 Redirect redirect = Redirect.stdout; 743 744 auto processPipes = spawn(command, redirect, env, config, workDir, args); 745 746 auto stringOutput = processPipes.stdout.collectOutput(maxOutput); 747 748 return Tuple!(int, "status", string, "output")(processPipes.process.wait(), stringOutput); 749 } 750 751 /* 752 Collect the string output of a stream in a blocking fashion. 753 754 Params: 755 stream = The input stream to collect from. 756 nbytes = The maximum number of bytes to collect. 757 758 Returns: 759 The collected data from the stream as a string. 760 */ 761 /// private 762 string collectOutput(InputStream)(InputStream stream, size_t nbytes = size_t.max) 763 @blocking @trusted if (isInputStream!InputStream) { 764 auto output = appender!string(); 765 if (nbytes != size_t.max) { 766 output.reserve(nbytes); 767 } 768 769 import vibe.internal.allocator : theAllocator, dispose; 770 771 scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024); 772 scope (exit) theAllocator.dispose(buffer); 773 774 while (!stream.empty && nbytes > 0) { 775 size_t chunk = min(nbytes, stream.leastSize, buffer.length); 776 assert(chunk > 0, "leastSize returned zero for non-empty stream."); 777 778 stream.read(buffer[0..chunk]); 779 output.put(buffer[0..chunk]); 780 } 781 782 return output.data; 783 }