Refactor Process to make writing child processes simple

This commit is contained in:
Aaron Piotrowski 2017-12-10 15:37:33 -06:00
parent cb15de11ff
commit 4d4841f449
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
10 changed files with 235 additions and 153 deletions

View File

@ -1,83 +0,0 @@
#!/usr/bin/env php
<?php
use Amp\Parallel\Sync;
use Amp\Parallel\Worker;
const AMP_WORKER = "amp-worker";
// Doesn't exist in phpdbg...
if (function_exists("cli_set_process_title")) {
@cli_set_process_title(AMP_WORKER);
}
// Redirect all output written using echo, print, printf, etc. to STDERR.
ob_start(function ($data) {
fwrite(STDERR, $data);
return '';
}, 1, PHP_OUTPUT_HANDLER_CLEANABLE | PHP_OUTPUT_HANDLER_FLUSHABLE);
(function () {
$paths = [
dirname(__DIR__, 3) . "/autoload.php",
dirname(__DIR__) . "/vendor/autoload.php",
];
foreach ($paths as $path) {
if (file_exists($path)) {
$autoloadPath = $path;
break;
}
}
if (!isset($autoloadPath)) {
fwrite(STDERR, "Could not locate autoload.php");
exit(1);
}
require $autoloadPath;
})();
Amp\Loop::run(function () {
$channel = new Sync\ChannelledSocket(STDIN, STDOUT);
try {
$environment = (function (): Worker\Environment {
$options = getopt("e:");
if (!isset($options["e"])) {
throw new Error("No environment class name provided");
}
$className = $options["e"];
if (!class_exists($className)) {
throw new Error(sprintf("Invalid environment class name '%s'", $className));
}
if (!is_subclass_of($className, Worker\Environment::class)) {
throw new Error(sprintf(
"The class '%s' does not implement '%s'",
$className,
Worker\Environment::class
));
}
return new $className;
})();
$runner = new Worker\TaskRunner($channel, $environment);
$result = new Sync\ExitSuccess(yield $runner->run());
} catch (Sync\ChannelException $exception) {
exit(1); // Parent context died, simply exit.
} catch (Throwable $exception) {
$result = new Sync\ExitFailure($exception);
}
try {
yield $channel->send($result);
} catch (Throwable $exception) {
exit(1); // Parent context died, simply exit.
}
});

View File

@ -0,0 +1,21 @@
<?php
// The function returned by this script is run by process.php in a separate process.
// echo, print, printf, etc. in this script are written to STDERR of the parent.
// $argc and $argv are available in this process as any other cli PHP script.
use Amp\Parallel\Sync\Channel;
return function (Channel $channel): \Generator {
printf("Received the following from parent: %s\n", yield $channel->receive());
print "Sleeping for 3 seconds...\n";
sleep(3); // Blocking call in process.
yield $channel->send("Data sent from child.");
print "Sleeping for 2 seconds...\n";
sleep(2); // Blocking call in process.
return 42;
};

30
examples/process.php Executable file
View File

@ -0,0 +1,30 @@
#!/usr/bin/env php
<?php
require dirname(__DIR__).'/vendor/autoload.php';
use Amp\Delayed;
use Amp\Loop;
use Amp\Parallel\Context\Process;
Loop::run(function () {
$timer = Loop::repeat(1000, function () {
static $i;
$i = $i ? ++$i : 1;
print "Demonstrating how alive the parent is for the {$i}th time.\n";
});
try {
// Create a new child process that does some blocking stuff.
$context = Process::spawn(__DIR__ . "/blocking-process.php");
print "Waiting 2 seconds to send start data...\n";
yield new Delayed(2000);
yield $context->send("Start data"); // Data sent to child process, received on line 9 of blocking-process.php
printf("Received the following from child: %s\n", yield $context->receive()); // Sent on line 14 of blocking-process.php
printf("Process ended with value %d!\n", yield $context->join());
} finally {
Loop::cancel($timer);
}
});

View File

@ -0,0 +1,80 @@
<?php
namespace Amp\Parallel\Context\Internal;
use Amp\Loop;
use Amp\Parallel\Sync;
use function Amp\call;
// Doesn't exist in phpdbg...
if (\function_exists("cli_set_process_title")) {
@\cli_set_process_title("amp-process");
}
// Redirect all output written using echo, print, printf, etc. to STDERR.
\ob_start(function ($data) {
\fwrite(STDERR, $data);
return '';
}, 1, PHP_OUTPUT_HANDLER_CLEANABLE | PHP_OUTPUT_HANDLER_FLUSHABLE);
(function () {
$paths = [
\dirname(__DIR__, 5) . "/autoload.php",
\dirname(__DIR__, 3) . "/vendor/autoload.php",
];
foreach ($paths as $path) {
if (\file_exists($path)) {
$autoloadPath = $path;
break;
}
}
if (!isset($autoloadPath)) {
\fwrite(STDERR, "Could not locate autoload.php");
exit(1);
}
require $autoloadPath;
})();
Loop::run(function () use ($argc, $argv) {
$channel = new Sync\ChannelledSocket(STDIN, STDOUT);
// Remove this scripts path from process arguments.
--$argc;
\array_shift($argv);
try {
// Protect current scope by requiring script within another function.
$callable = (function () use ($argc, $argv): callable {
if (!isset($argv[0])) {
throw new \Error("No script path given");
}
if (!\is_file($argv[0])) {
throw new \Error(\sprintf("No script found at '%s' (be sure to provide the full path to the script)", $argv[0]));
}
$callable = require $argv[0];
if (!\is_callable($callable)) {
throw new \Error("Script did not return a callable function");
}
return $callable;
})();
$result = new Sync\ExitSuccess(yield call($callable, $channel));
} catch (Sync\ChannelException $exception) {
exit(1); // Parent context died, simply exit.
} catch (\Throwable $exception) {
$result = new Sync\ExitFailure($exception);
}
try {
yield $channel->send($result);
} catch (\Throwable $exception) {
exit(1); // Parent context died, simply exit.
}
});

View File

@ -13,6 +13,8 @@ use function Amp\asyncCall;
use function Amp\call;
class Process implements Context {
const SCRIPT_PATH = __DIR__ . "/Internal/process-runner.php";
/** @var string|null Cached path to located PHP binary. */
private static $binaryPath;
@ -23,11 +25,28 @@ class Process implements Context {
private $channel;
/**
* Creates and starts the process at the given path using the optional PHP binary path.
*
* @param string|array $script Path to PHP script or array with first element as path and following elements options
* to the PHP script (e.g.: ['bin/worker', '-eOptionValue', '-nOptionValue'].
* to the PHP script (e.g.: ['bin/worker', 'Option1Value', 'Option2Value'].
* @param string $binary Path to PHP binary. Null will attempt to automatically locate the binary.
*
* @return \Amp\Parallel\Context\Process
*/
public static function spawn($script, string $binary = null): self {
$process = new self($script, $binary);
$process->start();
return $process;
}
/**
* @param string|array $script Path to PHP script or array with first element as path and following elements options
* to the PHP script (e.g.: ['bin/worker', 'Option1Value', 'Option2Value'].
* @param string $binary Path to PHP binary. Null will attempt to automatically locate the binary.
* @param string $cwd Working directory.
* @param mixed[] $env Array of environment variables.
*
* @throws \Error If the PHP binary path given cannot be found or is not executable.
*/
public function __construct($script, string $binary = null, string $cwd = "", array $env = []) {
$options = [
@ -52,7 +71,12 @@ class Process implements Context {
throw new \Error(\sprintf("The PHP binary path '%s' was not found or is not executable", $binary));
}
$command = \escapeshellarg($binary) . " " . $this->formatOptions($options) . " " . $script;
$command = \implode(" ", [
\escapeshellarg($binary),
$this->formatOptions($options),
self::SCRIPT_PATH,
$script,
]);
$this->process = new BaseProcess($command, $cwd, $env);
}
@ -73,7 +97,7 @@ class Process implements Context {
$result = [];
foreach ($options as $option => $value) {
$result[] = \sprintf("-d %s=%s", $option, $value);
$result[] = \sprintf("-d%s=%s", $option, $value);
}
return \implode(" ", $result);

View File

@ -0,0 +1,37 @@
<?php
namespace Amp\Parallel\Worker\Internal;
use Amp\Parallel\Sync;
use Amp\Parallel\Worker;
use Amp\Promise;
return function (Sync\Channel $channel) use ($argv): Promise {
if (!\defined("AMP_WORKER")) {
\define("AMP_WORKER", "amp-worker");
}
if (!isset($argv[1])) {
throw new \Error("No environment class name provided");
}
$className = $argv[1];
if (!\class_exists($className)) {
throw new \Error(\sprintf("Invalid environment class name '%s'", $className));
}
if (!\is_subclass_of($className, Worker\Environment::class)) {
throw new \Error(\sprintf(
"The class '%s' does not implement '%s'",
$className,
Worker\Environment::class
));
}
$environment = new $className;
$runner = new Worker\TaskRunner($channel, $environment);
return $runner->run();
};

View File

@ -8,19 +8,22 @@ use Amp\Parallel\Context\Process;
* A worker thread that executes task objects.
*/
class WorkerProcess extends AbstractWorker {
const SCRIPT_PATH = __DIR__ . "/Internal/worker-process.php";
/**
* @param string|null $binary Path to PHP binary. Null will attempt to automatically locate the binary.
* @param string $envClassName Name of class implementing \Amp\Parallel\Worker\Environment to instigate.
* Defaults to \Amp\Parallel\Worker\BasicEnvironment.
* @param mixed[] $env Array of environment variables to pass to the worker. Empty array inherits from the current
* PHP process. See the $env parameter of \Amp\Process\Process::__construct().
*
* @throws \Error If the PHP binary path given cannot be found or is not executable.
*/
public function __construct(string $binary = null, string $envClassName = BasicEnvironment::class, array $env = []) {
$dir = \dirname(__DIR__, 2) . '/bin';
$script = [
$dir . "/worker",
"-e" . $envClassName,
self::SCRIPT_PATH,
$envClassName,
];
parent::__construct(new Process($script, $binary, $dir, $env));
parent::__construct(new Process($script, $binary, __DIR__, $env));
}
}

View File

@ -9,10 +9,30 @@ use Amp\Promise;
class ProcessTest extends TestCase {
public function testBasicProcess() {
$process = new Process([
dirname(__DIR__) . "/bin/process",
"-sTest"
__DIR__ . "/test-process.php",
"Test"
]);
$process->start();
$this->assertSame("Test", Promise\wait($process->join()));
}
/**
* @expectedException \Amp\Parallel\Sync\PanicError
* @expectedExceptionMessage No string provided
*/
public function testFailingProcess() {
$process = new Process(__DIR__ . "/test-process.php");
$process->start();
Promise\wait($process->join());
}
/**
* @expectedException \Amp\Parallel\Sync\PanicError
* @expectedExceptionMessage No script found at 'test-process.php'
*/
public function testInvalidScriptPath() {
$process = new Process("test-process.php");
$process->start();
Promise\wait($process->join());
}
}

View File

@ -0,0 +1,11 @@
<?php
use Amp\Parallel\Sync\Channel;
return function (Channel $channel) use ($argv): string {
if (!isset($argv[1])) {
throw new Error("No string provided");
}
return $argv[1];
};

View File

@ -1,61 +0,0 @@
#!/usr/bin/env php
<?php
use Amp\Parallel\Sync;
// Doesn't exist in phpdbg...
if (function_exists("cli_set_process_title")) {
@cli_set_process_title("process-test");
}
// Redirect all output written using echo, print, printf, etc. to STDERR.
ob_start(function ($data) {
fwrite(STDERR, $data);
return '';
}, 1, PHP_OUTPUT_HANDLER_CLEANABLE | PHP_OUTPUT_HANDLER_FLUSHABLE);
(function () {
$paths = [
dirname(__DIR__, 2) . "/vendor/autoload.php",
];
foreach ($paths as $path) {
if (file_exists($path)) {
$autoloadPath = $path;
break;
}
}
if (!isset($autoloadPath)) {
fwrite(STDERR, "Could not locate autoload.php");
exit(1);
}
require $autoloadPath;
})();
Amp\Loop::run(function () {
$channel = new Sync\ChannelledSocket(STDIN, STDOUT);
try {
$value = (function (): string {
$options = getopt("s:");
if (!isset($options["s"])) {
throw new Error("No string provided");
}
return $options["s"];
})();
$result = new Sync\ExitSuccess($value);
} catch (Throwable $exception) {
$result = new Sync\ExitFailure($exception);
}
try {
yield $channel->send($result);
} catch (Throwable $exception) {
exit(1); // Parent context died, simply exit.
}
});