mirror of
https://github.com/amphp/parallel.git
synced 2025-02-22 13:52:23 +01:00
Use amphp/process package
This commit is contained in:
parent
08f6eb1958
commit
dc1aea8654
21
bin/worker
21
bin/worker
@ -1,12 +1,10 @@
|
||||
#!/usr/bin/env php
|
||||
<?php declare(strict_types = 1);
|
||||
|
||||
use Amp\Parallel\{ ChannelException, SerializationException} ;
|
||||
use Amp\Parallel\Sync\{ ChannelledSocket, Internal\ExitFailure, Internal\ExitSuccess };
|
||||
use Amp\Parallel\Worker\{ BasicEnvironment, Internal\TaskRunner };
|
||||
|
||||
@cli_set_process_title("amp-worker");
|
||||
error_reporting(E_ALL);
|
||||
@cli_set_process_title('amp-worker');
|
||||
|
||||
// Redirect all output written using echo, print, printf, etc. to STDERR.
|
||||
ob_start(function ($data) {
|
||||
@ -16,8 +14,8 @@ ob_start(function ($data) {
|
||||
|
||||
(function () {
|
||||
$paths = [
|
||||
dirname(__DIR__, 3) . DIRECTORY_SEPARATOR . 'autoload.php',
|
||||
dirname(__DIR__) . DIRECTORY_SEPARATOR . 'vendor' . DIRECTORY_SEPARATOR . 'autoload.php',
|
||||
dirname(__DIR__, 3) . '/autoload.php',
|
||||
dirname(__DIR__) . '/vendor/autoload.php',
|
||||
];
|
||||
|
||||
$autoloadPath = null;
|
||||
@ -47,16 +45,5 @@ AsyncInterop\Loop::execute(Amp\wrap(function () {
|
||||
$result = new ExitFailure($exception);
|
||||
}
|
||||
|
||||
// Attempt to return the result.
|
||||
try {
|
||||
try {
|
||||
return yield $channel->send($result);
|
||||
} catch (SerializationException $exception) {
|
||||
// Serializing the result failed. Send the reason why.
|
||||
return yield $channel->send(new ExitFailure($exception));
|
||||
}
|
||||
} catch (ChannelException $exception) {
|
||||
// The result was not sendable! The parent context must have died or killed the context.
|
||||
return 0;
|
||||
}
|
||||
$channel->send($result); // Do not yield sending result on channel, process does not care if result arrives.
|
||||
}));
|
||||
|
@ -21,7 +21,8 @@
|
||||
}
|
||||
],
|
||||
"require": {
|
||||
"amphp/amp": "^2.0"
|
||||
"amphp/amp": "^2.0",
|
||||
"amphp/process": "dev-amp_v2 as 0.2"
|
||||
},
|
||||
"require-dev": {
|
||||
"amphp/loop": "dev-master",
|
||||
|
@ -2,24 +2,29 @@
|
||||
|
||||
namespace Amp\Parallel\Process;
|
||||
|
||||
use Amp\Parallel\{ Process as ProcessContext, StatusError, Strand, SynchronizationError };
|
||||
use Amp\Coroutine;
|
||||
use Amp\Parallel\{ ContextException, Process as ProcessContext, StatusError, Strand, SynchronizationError };
|
||||
use Amp\Parallel\Sync\{ ChannelledSocket, Internal\ExitStatus };
|
||||
use Amp\Process\Process;
|
||||
use AsyncInterop\Promise;
|
||||
|
||||
class ChannelledProcess implements ProcessContext, Strand {
|
||||
/** @var \Amp\Parallel\Process\Process */
|
||||
/** @var \Amp\Process\Process */
|
||||
private $process;
|
||||
|
||||
/** @var \Amp\Parallel\Sync\Channel */
|
||||
private $channel;
|
||||
|
||||
/** @var \AsyncInterop\Promise */
|
||||
private $promise;
|
||||
|
||||
/**
|
||||
* @param string $path Path to PHP script.
|
||||
* @param string $cwd Working directory.
|
||||
* @param mixed[] $env Array of environment variables.
|
||||
*/
|
||||
public function __construct(string $path, string $cwd = "", array $env = []) {
|
||||
$command = \PHP_BINARY . " " . $path;
|
||||
$command = \PHP_BINARY . " " . \escapeshellarg($path);
|
||||
$this->process = new Process($command, $cwd, $env);
|
||||
}
|
||||
|
||||
@ -35,7 +40,7 @@ class ChannelledProcess implements ProcessContext, Strand {
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function start() {
|
||||
$this->process->start();
|
||||
$this->promise = $this->process->execute();
|
||||
$this->channel = new ChannelledSocket($this->process->getStdOut(), $this->process->getStdIn(), false);
|
||||
}
|
||||
|
||||
@ -58,7 +63,7 @@ class ChannelledProcess implements ProcessContext, Strand {
|
||||
if ($data instanceof ExitStatus) {
|
||||
$data = $data->getResult();
|
||||
throw new SynchronizationError(\sprintf(
|
||||
"Thread unexpectedly exited with result of type: %s",
|
||||
"Process unexpectedly exited with result of type: %s",
|
||||
\is_object($data) ? \get_class($data) : \gettype($data)
|
||||
));
|
||||
}
|
||||
@ -86,7 +91,30 @@ class ChannelledProcess implements ProcessContext, Strand {
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function join(): Promise {
|
||||
return $this->process->join();
|
||||
if ($this->channel === null) {
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
|
||||
return new Coroutine($this->doJoin());
|
||||
}
|
||||
|
||||
private function doJoin(): \Generator {
|
||||
try {
|
||||
$data = yield $this->channel->receive();
|
||||
if (!$data instanceof ExitStatus) {
|
||||
throw new SynchronizationError("Did not receive an exit status from process");
|
||||
}
|
||||
} catch (\Throwable $exception) {
|
||||
$this->kill();
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
$code = yield $this->promise;
|
||||
if ($code !== 0) {
|
||||
throw new ContextException(\sprintf("Process exited with code %d", $code));
|
||||
}
|
||||
|
||||
return $data->getResult();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,347 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel\Process;
|
||||
|
||||
use Amp\Deferred;
|
||||
use Amp\Parallel\{ ContextException, Process as ProcessContext, StatusError };
|
||||
use AsyncInterop\{ Loop, Promise };
|
||||
|
||||
class Process implements ProcessContext {
|
||||
/** @var resource|null */
|
||||
private $process;
|
||||
|
||||
/** @var string */
|
||||
private $command;
|
||||
|
||||
/** @var string */
|
||||
private $cwd = "";
|
||||
|
||||
/** @var array */
|
||||
private $env = [];
|
||||
|
||||
/** @var array */
|
||||
private $options;
|
||||
|
||||
/** @var resource|null */
|
||||
private $stdin;
|
||||
|
||||
/** @var resource|null */
|
||||
private $stdout;
|
||||
|
||||
/** @var resource|null */
|
||||
private $stderr;
|
||||
|
||||
/** @var int */
|
||||
private $pid = 0;
|
||||
|
||||
/** @var int */
|
||||
private $oid = 0;
|
||||
|
||||
/** @var \Amp\Deferred|null */
|
||||
private $deferred;
|
||||
|
||||
/** @var string */
|
||||
private $watcher;
|
||||
|
||||
/**
|
||||
* @param string $command Command to run.
|
||||
* @param string|null $cwd Working directory or use an empty string to use the working directory of the current
|
||||
* PHP process.
|
||||
* @param mixed[] $env Environment variables or use an empty array to inherit from the current PHP process.
|
||||
* @param mixed[] $options Options for proc_open().
|
||||
*/
|
||||
public function __construct(string $command, string $cwd = null, array $env = [], array $options = []) {
|
||||
$this->command = $command;
|
||||
|
||||
if ($cwd !== null) {
|
||||
$this->cwd = $cwd;
|
||||
}
|
||||
|
||||
foreach ($env as $key => $value) {
|
||||
if (!\is_array($value)) { // $env cannot accept array values.
|
||||
$this->env[(string) $key] = (string) $value;
|
||||
}
|
||||
}
|
||||
|
||||
$this->options = $options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the process if it is still running.
|
||||
*/
|
||||
public function __destruct() {
|
||||
if (\getmypid() === $this->oid) {
|
||||
$this->kill(); // Will only terminate if the process is still running.
|
||||
}
|
||||
|
||||
Loop::cancel($this->watcher);
|
||||
|
||||
if (\is_resource($this->stdin)) {
|
||||
\fclose($this->stdin);
|
||||
}
|
||||
|
||||
if (\is_resource($this->stdout)) {
|
||||
\fclose($this->stdout);
|
||||
}
|
||||
|
||||
if (\is_resource($this->stderr)) {
|
||||
\fclose($this->stderr);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets process values.
|
||||
*/
|
||||
public function __clone() {
|
||||
$this->process = null;
|
||||
$this->deferred = null;
|
||||
$this->watcher = null;
|
||||
$this->pid = 0;
|
||||
$this->oid = 0;
|
||||
$this->stdin = null;
|
||||
$this->stdout = null;
|
||||
$this->stderr = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws \Amp\Parallel\ContextException If starting the process fails.
|
||||
* @throws \Amp\Parallel\StatusError If the process is already running.
|
||||
*/
|
||||
public function start() {
|
||||
if ($this->deferred !== null) {
|
||||
throw new StatusError("The process has already been started");
|
||||
}
|
||||
|
||||
$this->deferred = $deferred = new Deferred;
|
||||
|
||||
$fd = [
|
||||
["pipe", "r"], // stdin
|
||||
["pipe", "w"], // stdout
|
||||
["pipe", "w"], // stderr
|
||||
["pipe", "w"], // exit code pipe
|
||||
];
|
||||
|
||||
$nd = \strncasecmp(\PHP_OS, "WIN", 3) === 0 ? "NUL" : "/dev/null";
|
||||
|
||||
$command = \sprintf('(%s) 3>%s; code=$?; echo $code >&3; exit $code', $this->command, $nd);
|
||||
|
||||
$this->process = @\proc_open($command, $fd, $pipes, $this->cwd ?: null, $this->env ?: null, $this->options);
|
||||
|
||||
if (!\is_resource($this->process)) {
|
||||
$message = "Could not start process";
|
||||
if ($error = \error_get_last()) {
|
||||
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
|
||||
}
|
||||
throw new ContextException($message);
|
||||
}
|
||||
|
||||
$this->oid = \getmypid();
|
||||
$status = \proc_get_status($this->process);
|
||||
|
||||
if (!$status) {
|
||||
\proc_close($this->process);
|
||||
$this->process = null;
|
||||
throw new ContextException("Could not get process status");
|
||||
}
|
||||
|
||||
$this->pid = $status["pid"];
|
||||
|
||||
foreach ($pipes as $pipe) {
|
||||
\stream_set_blocking($pipe, false);
|
||||
}
|
||||
|
||||
$this->stdin = $stdin = $pipes[0];
|
||||
$this->stdout = $pipes[1];
|
||||
$this->stderr = $pipes[2];
|
||||
$stream = $pipes[3];
|
||||
|
||||
$process = &$this->process;
|
||||
|
||||
$this->watcher = Loop::onReadable($stream, static function ($watcher, $resource) use (
|
||||
&$process, $deferred, $stdin
|
||||
) {
|
||||
Loop::cancel($watcher);
|
||||
|
||||
try {
|
||||
try {
|
||||
if (!\is_resource($resource) || \feof($resource)) {
|
||||
throw new ContextException("Process ended unexpectedly");
|
||||
}
|
||||
$code = @\fread($resource, 1);
|
||||
if (!\strlen($code) || !\is_numeric($code)) {
|
||||
throw new ContextException("Process ended without providing a status code");
|
||||
}
|
||||
} finally {
|
||||
if (\is_resource($resource)) {
|
||||
\fclose($resource);
|
||||
}
|
||||
if (\is_resource($process)) {
|
||||
\proc_close($process);
|
||||
$process = null;
|
||||
}
|
||||
if (\is_resource($stdin)) {
|
||||
\fclose($stdin);
|
||||
}
|
||||
}
|
||||
} catch (\Throwable $exception) {
|
||||
$deferred->fail($exception);
|
||||
return;
|
||||
}
|
||||
|
||||
$deferred->resolve((int) $code);
|
||||
});
|
||||
|
||||
Loop::disable($this->watcher);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \AsyncInterop\Promise<int> Resolves with exit status.
|
||||
*
|
||||
* @throws \Amp\Parallel\StatusError If the process has not been started.
|
||||
*/
|
||||
public function join(): Promise {
|
||||
if ($this->deferred === null) {
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
|
||||
Loop::enable($this->watcher);
|
||||
|
||||
return $this->deferred->promise();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function kill() {
|
||||
if (\is_resource($this->process)) {
|
||||
// Forcefully kill the process using SIGKILL.
|
||||
\proc_terminate($this->process, 9);
|
||||
|
||||
// "Detach" from the process and let it die asynchronously.
|
||||
$this->process = null;
|
||||
|
||||
Loop::cancel($this->watcher);
|
||||
|
||||
$this->deferred->fail(new ContextException("The process was killed"));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends the given signal to the process.
|
||||
*
|
||||
* @param int $signo Signal number to send to process.
|
||||
*
|
||||
* @throws \Amp\Parallel\StatusError If the process is not running.
|
||||
*/
|
||||
public function signal(int $signo) {
|
||||
if (!$this->isRunning()) {
|
||||
throw new StatusError("The process is not running");
|
||||
}
|
||||
|
||||
\proc_terminate($this->process, (int) $signo);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the PID of the child process. Value is only meaningful if the process has been started and PHP was not
|
||||
* compiled with --enable-sigchild.
|
||||
*
|
||||
* @return int
|
||||
*/
|
||||
public function getPid(): int {
|
||||
return $this->pid;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the command to execute.
|
||||
*
|
||||
* @return string The command to execute.
|
||||
*/
|
||||
public function getCommand(): string {
|
||||
return $this->command;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the current working directory.
|
||||
*
|
||||
* @return string The current working directory or null if inherited from the current PHP process.
|
||||
*/
|
||||
public function getWorkingDirectory(): string {
|
||||
if ($this->cwd === "") {
|
||||
return \getcwd() ?: "";
|
||||
}
|
||||
|
||||
return $this->cwd;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the environment variables array.
|
||||
*
|
||||
* @return mixed[] Array of environment variables.
|
||||
*/
|
||||
public function getEnv(): array {
|
||||
return $this->env;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the options to pass to proc_open().
|
||||
*
|
||||
* @return mixed[] Array of options.
|
||||
*/
|
||||
public function getOptions(): array {
|
||||
return $this->options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines if the process is still running.
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isRunning(): bool {
|
||||
return \is_resource($this->process);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the process input stream (STDIN).
|
||||
*
|
||||
* @return resource
|
||||
*
|
||||
* @throws \Amp\Parallel\StatusError If the process is not running.
|
||||
*/
|
||||
public function getStdIn() {
|
||||
if ($this->stdin === null) {
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
|
||||
return $this->stdin;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the process output stream (STDOUT).
|
||||
*
|
||||
* @return resource
|
||||
*
|
||||
* @throws \Amp\Parallel\StatusError If the process is not running.
|
||||
*/
|
||||
public function getStdOut() {
|
||||
if ($this->stdout === null) {
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
|
||||
return $this->stdout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the process error stream (STDERR).
|
||||
*
|
||||
* @return resource
|
||||
*
|
||||
* @throws \Amp\Parallel\StatusError If the process is not running.
|
||||
*/
|
||||
public function getStdErr() {
|
||||
if ($this->stderr === null) {
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
|
||||
return $this->stderr;
|
||||
}
|
||||
}
|
@ -1,239 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel\Process;
|
||||
|
||||
use Amp\{ Coroutine, Deferred, Emitter, Failure, Stream, Success };
|
||||
use Amp\Parallel\{ ContextException, Process as ProcessContext, StatusError };
|
||||
use AsyncInterop\{ Loop, Promise };
|
||||
|
||||
class StreamedProcess implements ProcessContext {
|
||||
const CHUNK_SIZE = 8192;
|
||||
|
||||
/** @var \Amp\Parallel\Process\Process */
|
||||
private $process;
|
||||
|
||||
/** @var \Amp\Emitter Emits bytes read from STDOUT. */
|
||||
private $stdoutEmitter;
|
||||
|
||||
/** @var \Amp\Emitter Emits bytes read from STDERR. */
|
||||
private $stderrEmitter;
|
||||
|
||||
/** @var string|null */
|
||||
private $stdinWatcher;
|
||||
|
||||
/** @var string|null */
|
||||
private $stdoutWatcher;
|
||||
|
||||
/** @var string|null */
|
||||
private $stderrWatcher;
|
||||
|
||||
/** @var \SplQueue Queue of data to write to STDIN. */
|
||||
private $writeQueue;
|
||||
|
||||
/** @var \AsyncInterop\Promise Promise resolved when process ends. */
|
||||
private $promise;
|
||||
|
||||
/**
|
||||
* @param string $command Command to run.
|
||||
* @param string|null $cwd Working directory or use an empty string to use the working directory of the current
|
||||
* PHP process.
|
||||
* @param mixed[] $env Environment variables or use an empty array to inherit from the current PHP process.
|
||||
* @param mixed[] $options Options for proc_open().
|
||||
*/
|
||||
public function __construct(string $command, string $cwd = null, array $env = [], array $options = []) {
|
||||
$this->process = new Process($command, $cwd, $env, $options);
|
||||
$this->stdoutEmitter = new Emitter;
|
||||
$this->stderrEmitter = new Emitter;
|
||||
$this->writeQueue = new \SplQueue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets process values.
|
||||
*/
|
||||
public function __clone() {
|
||||
$this->process = clone $this->process;
|
||||
$this->stdinWatcher = null;
|
||||
$this->stdoutWatcher = null;
|
||||
$this->stderrWatcher = null;
|
||||
$this->stdoutEmitter = new Emitter;
|
||||
$this->stderrEmitter = new Emitter;
|
||||
$this->writeQueue = new \SplQueue;
|
||||
$this->promise = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function start() {
|
||||
$this->process->start();
|
||||
|
||||
$writes = $this->writeQueue;
|
||||
$this->stdinWatcher = Loop::onWritable($this->process->getStdIn(), static function ($watcher, $resource) use ($writes) {
|
||||
while (!$writes->isEmpty()) {
|
||||
/** @var \Amp\Deferred $deferred */
|
||||
list($data, $previous, $deferred) = $writes->shift();
|
||||
$length = \strlen($data);
|
||||
|
||||
if ($length === 0) {
|
||||
$deferred->resolve(0);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
|
||||
$written = @\fwrite($resource, $data);
|
||||
|
||||
if ($written === false || $written === 0) {
|
||||
$message = "Failed to write to STDIN";
|
||||
if ($error = \error_get_last()) {
|
||||
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
|
||||
}
|
||||
$deferred->fail(new ContextException($message));
|
||||
return;
|
||||
}
|
||||
|
||||
if ($length <= $written) {
|
||||
$deferred->resolve($written + $previous);
|
||||
continue;
|
||||
}
|
||||
|
||||
$data = \substr($data, $written);
|
||||
$writes->unshift([$data, $written + $previous, $deferred]);
|
||||
return;
|
||||
}
|
||||
});
|
||||
Loop::disable($this->stdinWatcher);
|
||||
|
||||
$callback = static function ($watcher, $resource, Emitter $emitter) {
|
||||
// Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes.
|
||||
if (@\feof($resource) || ($data = @\fread($resource, self::CHUNK_SIZE)) === false) {
|
||||
Loop::disable($watcher);
|
||||
return;
|
||||
}
|
||||
|
||||
if ($data !== "") {
|
||||
$emitter->emit($data);
|
||||
}
|
||||
};
|
||||
|
||||
$this->stdoutWatcher = Loop::onReadable($this->process->getStdOut(), $callback, $this->stdoutEmitter);
|
||||
$this->stderrWatcher = Loop::onReadable($this->process->getStdErr(), $callback, $this->stderrEmitter);
|
||||
|
||||
$this->promise = $this->process->join();
|
||||
$this->promise->when(function (\Throwable $exception = null, int $code = null) {
|
||||
Loop::cancel($this->stdinWatcher);
|
||||
Loop::cancel($this->stdoutWatcher);
|
||||
Loop::cancel($this->stderrWatcher);
|
||||
|
||||
if ($exception) {
|
||||
$this->stdoutEmitter->fail($exception);
|
||||
$this->stderrEmitter->fail($exception);
|
||||
return;
|
||||
}
|
||||
|
||||
$this->stdoutEmitter->resolve($code);
|
||||
$this->stderrEmitter->resolve($code);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function isRunning(): bool {
|
||||
return $this->process->isRunning();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $data
|
||||
*
|
||||
* @return \AsyncInterop\Promise
|
||||
*/
|
||||
public function write(string $data): Promise {
|
||||
$length = \strlen($data);
|
||||
$written = 0;
|
||||
|
||||
if ($this->writeQueue->isEmpty()) {
|
||||
if ($length === 0) {
|
||||
return new Success(0);
|
||||
}
|
||||
|
||||
// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
|
||||
$written = @\fwrite($this->process->getStdIn(), $data, self::CHUNK_SIZE);
|
||||
|
||||
if ($written === false) {
|
||||
$message = "Failed to write to stream";
|
||||
if ($error = \error_get_last()) {
|
||||
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
|
||||
}
|
||||
return new Failure(new ContextException($message));
|
||||
}
|
||||
|
||||
if ($length <= $written) {
|
||||
return new Success($written);
|
||||
}
|
||||
|
||||
$data = \substr($data, $written);
|
||||
}
|
||||
|
||||
return new Coroutine($this->doWrite($data, $written));
|
||||
}
|
||||
|
||||
private function doWrite(string $data, int $written): \Generator {
|
||||
$deferred = new Deferred;
|
||||
$this->writeQueue->push([$data, $written, $deferred]);
|
||||
|
||||
Loop::enable($this->stdinWatcher);
|
||||
|
||||
try {
|
||||
$written = yield $deferred->promise();
|
||||
} catch (\Throwable $exception) {
|
||||
$this->kill();
|
||||
throw $exception;
|
||||
} finally {
|
||||
if ($this->writeQueue->isEmpty()) {
|
||||
Loop::disable($this->stdinWatcher);
|
||||
}
|
||||
}
|
||||
|
||||
return $written;
|
||||
}
|
||||
|
||||
public function getStdOut(): Stream {
|
||||
return $this->stdoutEmitter->stream();
|
||||
}
|
||||
|
||||
public function getStdErr(): Stream {
|
||||
return $this->stderrEmitter->stream();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function join(): Promise {
|
||||
if ($this->promise === null) {
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
|
||||
return $this->promise;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function kill() {
|
||||
$this->process->kill();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getPid(): int {
|
||||
return $this->process->getPid();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function signal(int $signo) {
|
||||
$this->process->signal($signo);
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user