Fix code style

This commit is contained in:
Aaron Piotrowski 2018-10-07 09:50:45 -05:00
parent 394eeb6ac0
commit b654463339
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
82 changed files with 730 additions and 415 deletions

View File

@ -1,40 +1,13 @@
<?php
return PhpCsFixer\Config::create()
->setRiskyAllowed(true)
->setRules([
"@PSR1" => true,
"@PSR2" => true,
"braces" => [
"allow_single_line_closure" => true,
"position_after_functions_and_oop_constructs" => "same",
],
"array_syntax" => ["syntax" => "short"],
"cast_spaces" => true,
"combine_consecutive_unsets" => true,
"function_to_constant" => true,
"no_multiline_whitespace_before_semicolons" => true,
"no_unused_imports" => true,
"no_useless_else" => true,
"no_useless_return" => true,
"no_whitespace_before_comma_in_array" => true,
"no_whitespace_in_blank_line" => true,
"non_printable_character" => true,
"normalize_index_brace" => true,
"ordered_imports" => true,
"php_unit_construct" => true,
"php_unit_dedicate_assert" => true,
"php_unit_fqcn_annotation" => true,
"phpdoc_summary" => true,
"phpdoc_types" => true,
"psr4" => true,
"return_type_declaration" => ["space_before" => "none"],
"short_scalar_cast" => true,
"single_blank_line_before_namespace" => true,
])
->setFinder(
PhpCsFixer\Finder::create()
->in(__DIR__ . "/examples")
->in(__DIR__ . "/lib")
->in(__DIR__ . "/test")
);
$config = new Amp\CodeStyle\Config();
$config->getFinder()
->in(__DIR__ . '/examples')
->in(__DIR__ . '/lib')
->in(__DIR__ . '/test');
$cacheDir = getenv('TRAVIS') ? getenv('HOME') . '/.php-cs-fixer' : __DIR__;
$config->setCacheFile($cacheDir . '/.php_cs.cache');
return $config;

View File

@ -39,3 +39,9 @@ script:
after_script:
- ./coveralls.phar -v
cache:
directories:
- $HOME/.composer/cache
- $HOME/.php-cs-fixer
- $HOME/.local

View File

@ -30,7 +30,7 @@
"require-dev": {
"phpunit/phpunit": "^6",
"amphp/phpunit-util": "^1",
"friendsofphp/php-cs-fixer": "^2.3"
"amphp/php-cs-fixer-config": "dev-master"
},
"suggest": {
"ext-pthreads": "Required for thread contexts"
@ -53,5 +53,13 @@
"platform": {
"php": "7.0.0"
}
},
"scripts": {
"check": [
"@cs",
"@test"
],
"cs": "php-cs-fixer fix -v --diff --dry-run",
"cs-fix": "php-cs-fixer fix -v --diff"
}
}

View File

@ -5,7 +5,8 @@ namespace Amp\Parallel\Example;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
class BlockingTask implements Task {
class BlockingTask implements Task
{
/**
* @var callable
*/
@ -20,7 +21,8 @@ class BlockingTask implements Task {
* @param callable $function Do not use a closure or non-serializable object.
* @param mixed ...$args Arguments to pass to the function. Must be serializable.
*/
public function __construct(callable $function, ...$args) {
public function __construct(callable $function, ...$args)
{
$this->function = $function;
$this->args = $args;
}
@ -28,11 +30,13 @@ class BlockingTask implements Task {
/**
* {@inheritdoc}
*/
public function run(Environment $environment) {
public function run(Environment $environment)
{
return ($this->function)(...$this->args);
}
public function getArgs() {
public function getArgs()
{
return $this->args;
}
}

View File

@ -7,15 +7,15 @@
use Amp\Parallel\Sync\Channel;
return function (Channel $channel): \Generator {
printf("Received the following from parent: %s\n", yield $channel->receive());
\printf("Received the following from parent: %s\n", yield $channel->receive());
print "Sleeping for 3 seconds...\n";
sleep(3); // Blocking call in process.
\sleep(3); // Blocking call in process.
yield $channel->send("Data sent from child.");
print "Sleeping for 2 seconds...\n";
sleep(2); // Blocking call in process.
\sleep(2); // Blocking call in process.
return 42;
};

View File

@ -15,7 +15,7 @@ return function (Channel $channel) use ($argv): \Generator {
$id = $argv[1];
printf("Child process using parcel ID %s\n", $id);
\printf("Child process using parcel ID %s\n", $id);
$parcel = SharedMemoryParcel::use($id);
@ -23,12 +23,12 @@ return function (Channel $channel) use ($argv): \Generator {
return $value + 1;
});
printf("Value after modifying in child thread: %s\n", $value);
\printf("Value after modifying in child thread: %s\n", $value);
yield new Delayed(500); // Parent process should access parcel during this time.
// Unwrapping the parcel now should give value from parent process.
printf("Value in child thread after being modified in main thread: %s\n", yield $parcel->unwrap());
\printf("Value in child thread after being modified in main thread: %s\n", yield $parcel->unwrap());
yield $parcel->synchronized(function (int $value) {
return $value + 1;

View File

@ -1,6 +1,6 @@
#!/usr/bin/env php
<?php
require dirname(__DIR__).'/vendor/autoload.php';
require \dirname(__DIR__).'/vendor/autoload.php';
use Amp\Delayed;
use Amp\Loop;
@ -24,8 +24,8 @@ Loop::run(function () {
yield $context->send("Start data"); // Data sent to child process, received on line 9 of blocking-process.php
printf("Received the following from child: %s\n", yield $context->receive()); // Sent on line 14 of blocking-process.php
printf("Process ended with value %d!\n", yield $context->join());
\printf("Received the following from child: %s\n", yield $context->receive()); // Sent on line 14 of blocking-process.php
\printf("Process ended with value %d!\n", yield $context->join());
} finally {
Loop::cancel($timer);
}

View File

@ -1,6 +1,6 @@
#!/usr/bin/env php
<?php
require dirname(__DIR__).'/vendor/autoload.php';
require \dirname(__DIR__).'/vendor/autoload.php';
use Amp\Delayed;
use Amp\Loop;
@ -9,7 +9,7 @@ use Amp\Parallel\Sync\SharedMemoryParcel;
Loop::run(function () {
// Create a parcel that then can be accessed in any number of child processes.
$parcel = SharedMemoryParcel::create($id = bin2hex(random_bytes(10)), 1);
$parcel = SharedMemoryParcel::create($id = \bin2hex(\random_bytes(10)), 1);
$context = Process::run([
__DIR__ . "/parcel-process.php",
@ -24,5 +24,5 @@ Loop::run(function () {
yield $context->join(); // Wait for child process to finish.
printf("Final value of parcel: %d\n", yield $parcel->unwrap());
\printf("Final value of parcel: %d\n", yield $parcel->unwrap());
});

View File

@ -1,6 +1,6 @@
#!/usr/bin/env php
<?php
require dirname(__DIR__).'/vendor/autoload.php';
require \dirname(__DIR__).'/vendor/autoload.php';
use Amp\Delayed;
use Amp\Loop;
@ -17,15 +17,15 @@ Loop::run(function () {
try {
// Create a new child thread that does some blocking stuff.
$context = Thread::run(function (Channel $channel): \Generator {
printf("Received the following from parent: %s\n", yield $channel->receive());
\printf("Received the following from parent: %s\n", yield $channel->receive());
print "Sleeping for 3 seconds...\n";
sleep(3); // Blocking call in thread.
\sleep(3); // Blocking call in thread.
yield $channel->send("Data sent from child.");
print "Sleeping for 2 seconds...\n";
sleep(2); // Blocking call in thread.
\sleep(2); // Blocking call in thread.
return 42;
});
@ -35,8 +35,8 @@ Loop::run(function () {
yield $context->send("Start data");
printf("Received the following from child: %s\n", yield $context->receive());
printf("Thread ended with value %d!\n", yield $context->join());
\printf("Received the following from child: %s\n", yield $context->receive());
\printf("Thread ended with value %d!\n", yield $context->join());
} finally {
Loop::cancel($timer);
}

View File

@ -1,6 +1,6 @@
#!/usr/bin/env php
<?php
require dirname(__DIR__).'/vendor/autoload.php';
require \dirname(__DIR__).'/vendor/autoload.php';
use Amp\Delayed;
use Amp\Loop;
@ -17,12 +17,12 @@ Loop::run(function () {
return $value + 1;
});
printf("Value after modifying in child thread: %s\n", $value);
\printf("Value after modifying in child thread: %s\n", $value);
yield new Delayed(500); // Main thread should access parcel during this time.
// Unwrapping the parcel now should give value from main thread.
printf("Value in child thread after being modified in main thread: %s\n", yield $parcel->unwrap());
\printf("Value in child thread after being modified in main thread: %s\n", yield $parcel->unwrap());
yield $parcel->synchronized(function (int $value) {
return $value + 1;
@ -37,5 +37,5 @@ Loop::run(function () {
yield $context->join(); // Wait for child thread to finish.
printf("Final value of parcel: %d\n", yield $parcel->unwrap());
\printf("Final value of parcel: %d\n", yield $parcel->unwrap());
});

View File

@ -1,6 +1,6 @@
#!/usr/bin/env php
<?php
require dirname(__DIR__).'/vendor/autoload.php';
require \dirname(__DIR__).'/vendor/autoload.php';
use Amp\Coroutine;
use Amp\Loop;
@ -20,7 +20,7 @@ $tasks = [
// Event loop for parallel tasks
Loop::run(function () use (&$results, &$tasks) {
$timer = Loop::repeat(200, function () {
printf(".");
\printf(".");
});
Loop::unreference($timer);
@ -32,12 +32,12 @@ Loop::run(function () use (&$results, &$tasks) {
$coroutines[] = function () use ($pool, $task, &$results) {
$result = yield $pool->enqueue($task);
$url = $task->getArgs()[0];
printf("\nRead from %s: %d bytes\n", $url, strlen($result));
\printf("\nRead from %s: %d bytes\n", $url, \strlen($result));
$results[$url] = $result;
};
}
$coroutines = array_map(function (callable $coroutine): Coroutine {
$coroutines = \array_map(function (callable $coroutine): Coroutine {
return new Coroutine($coroutine());
}, $coroutines);
@ -47,4 +47,4 @@ Loop::run(function () use (&$results, &$tasks) {
});
echo "\nResult array keys:\n";
echo var_export(array_keys($results), true);
echo \var_export(\array_keys($results), true);

View File

@ -1,6 +1,6 @@
#!/usr/bin/env php
<?php
require dirname(__DIR__).'/vendor/autoload.php';
require \dirname(__DIR__).'/vendor/autoload.php';
use Amp\Parallel\Example\BlockingTask;
use Amp\Parallel\Worker\DefaultWorkerFactory;
@ -11,8 +11,8 @@ Amp\Loop::run(function () {
$worker = $factory->create();
$result = yield $worker->enqueue(new BlockingTask('file_get_contents', 'https://google.com'));
printf("Read %d bytes\n", strlen($result));
\printf("Read %d bytes\n", \strlen($result));
$code = yield $worker->shutdown();
printf("Code: %d\n", $code);
\printf("Code: %d\n", $code);
});

View File

@ -5,7 +5,8 @@ namespace Amp\Parallel\Context;
use Amp\Parallel\Sync\Channel;
use Amp\Promise;
interface Context extends Channel {
interface Context extends Channel
{
/**
* @return bool
*/

View File

@ -2,5 +2,6 @@
namespace Amp\Parallel\Context;
class ContextException extends \Exception {
class ContextException extends \Exception
{
}

View File

@ -8,7 +8,8 @@ use Amp\Parallel\Sync\ChannelledSocket;
use Amp\Promise;
use function Amp\call;
class ProcessHub {
class ProcessHub
{
/** @var resource|null */
private $server;
@ -21,7 +22,8 @@ class ProcessHub {
/** @var Deferred|null */
private $acceptor;
public function __construct() {
public function __construct()
{
$this->uri = "unix://" . \tempnam(\sys_get_temp_dir(), "amp-cluster-ipc-") . ".sock";
$this->server = \stream_socket_server($this->uri, $errno, $errstr, \STREAM_SERVER_BIND | \STREAM_SERVER_LISTEN);
@ -46,16 +48,19 @@ class ProcessHub {
Loop::disable($this->watcher);
}
public function __destruct() {
public function __destruct()
{
Loop::cancel($this->watcher);
\fclose($this->server);
}
public function getUri(): string {
public function getUri(): string
{
return $this->uri;
}
public function accept(): Promise {
public function accept(): Promise
{
return call(function () {
while ($this->acceptor) {
yield $this->acceptor->promise();

View File

@ -16,7 +16,8 @@ use function Amp\call;
*
* @internal
*/
class Thread extends \Thread {
class Thread extends \Thread
{
const KILL_CHECK_FREQUENCY = 250;
/** @var callable The function to execute in the thread. */
@ -38,7 +39,8 @@ class Thread extends \Thread {
* @param callable $function The function to execute in the thread.
* @param mixed[] $args Arguments to pass to the function.
*/
public function __construct($socket, callable $function, array $args = []) {
public function __construct($socket, callable $function, array $args = [])
{
$this->function = $function;
$this->args = $args;
$this->socket = $socket;
@ -49,7 +51,8 @@ class Thread extends \Thread {
*
* @codeCoverageIgnore Only executed in thread.
*/
public function run() {
public function run()
{
/* First thing we need to do is re-initialize the class autoloader. If
* we don't do this first, any object of a class that was loaded after
* the thread started will just be garbage data and unserializable
@ -106,7 +109,8 @@ class Thread extends \Thread {
/**
* Sets a local variable to true so the running event loop can check for a kill signal.
*/
public function kill() {
public function kill()
{
return $this->killed = true;
}
@ -117,7 +121,8 @@ class Thread extends \Thread {
*
* @codeCoverageIgnore Only executed in thread.
*/
private function execute(Channel $channel): \Generator {
private function execute(Channel $channel): \Generator
{
try {
$result = new ExitSuccess(yield call($this->function, $channel, ...$this->args));
} catch (\Throwable $exception) {

View File

@ -12,7 +12,8 @@ use Amp\Promise;
use function Amp\asyncCall;
use function Amp\call;
class Process implements Context {
class Process implements Context
{
const SCRIPT_PATH = __DIR__ . "/Internal/process-runner.php";
/** @var ByteStream\ResourceOutputStream */
@ -47,7 +48,8 @@ class Process implements Context {
*
* @return Promise<Process>
*/
public static function run($script, string $cwd = null, array $env = [], string $binary = null): Promise {
public static function run($script, string $cwd = null, array $env = [], string $binary = null): Promise
{
$process = new self($script, $cwd, $env, $binary);
return call(function () use ($process) {
yield $process->start();
@ -64,7 +66,8 @@ class Process implements Context {
*
* @throws \Error If the PHP binary path given cannot be found or is not executable.
*/
public function __construct($script, string $cwd = null, array $env = [], string $binary = null) {
public function __construct($script, string $cwd = null, array $env = [], string $binary = null)
{
$this->hub = Loop::getState(self::class);
if (!$this->hub instanceof Internal\ProcessHub) {
$this->hub = new Internal\ProcessHub;
@ -141,7 +144,8 @@ class Process implements Context {
$this->process = new BaseProcess($command, $cwd, $env);
}
private static function locateBinary(): string {
private static function locateBinary(): string
{
$executable = \strncasecmp(\PHP_OS, "WIN", 3) === 0 ? "php.exe" : "php";
$paths = \array_filter(\explode(\PATH_SEPARATOR, \getenv("PATH")));
@ -158,7 +162,8 @@ class Process implements Context {
throw new \Error("Could not locate PHP executable binary");
}
private function formatOptions(array $options) {
private function formatOptions(array $options)
{
$result = [];
foreach ($options as $option => $value) {
@ -171,13 +176,15 @@ class Process implements Context {
/**
* Private method to prevent cloning.
*/
private function __clone() {
private function __clone()
{
}
/**
* {@inheritdoc}
*/
public function start(): Promise {
public function start(): Promise
{
return call(function () {
$this->process->start();
@ -197,14 +204,16 @@ class Process implements Context {
/**
* {@inheritdoc}
*/
public function isRunning(): bool {
public function isRunning(): bool
{
return $this->process->isRunning();
}
/**
* {@inheritdoc}
*/
public function receive(): Promise {
public function receive(): Promise
{
if ($this->channel === null) {
throw new StatusError("The process has not been started");
}
@ -231,7 +240,8 @@ class Process implements Context {
/**
* {@inheritdoc}
*/
public function send($data): Promise {
public function send($data): Promise
{
if ($this->channel === null) {
throw new StatusError("The process has not been started");
}
@ -246,7 +256,8 @@ class Process implements Context {
/**
* {@inheritdoc}
*/
public function join(): Promise {
public function join(): Promise
{
if ($this->channel === null) {
throw new StatusError("The process has not been started");
}
@ -281,7 +292,8 @@ class Process implements Context {
* @throws \Amp\Process\ProcessException
* @throws \Amp\Process\StatusError
*/
public function signal(int $signo) {
public function signal(int $signo)
{
$this->process->signal($signo);
}
@ -293,14 +305,16 @@ class Process implements Context {
* @return \Amp\Promise
* @throws \Amp\Process\StatusError
*/
public function getPid(): Promise {
public function getPid(): Promise
{
return $this->process->getPid();
}
/**
* {@inheritdoc}
*/
public function kill() {
public function kill()
{
$this->process->kill();
}
}

View File

@ -2,5 +2,6 @@
namespace Amp\Parallel\Context;
class StatusError extends \Error {
class StatusError extends \Error
{
}

View File

@ -19,7 +19,8 @@ use function Amp\call;
* maintained both in the context that creates the thread and in the thread
* itself.
*/
class Thread implements Context {
class Thread implements Context
{
const EXIT_CHECK_FREQUENCY = 250;
/** @var Internal\Thread An internal thread instance. */
@ -48,7 +49,8 @@ class Thread implements Context {
*
* @return bool True if threading is enabled, otherwise false.
*/
public static function supported(): bool {
public static function supported(): bool
{
return \extension_loaded('pthreads');
}
@ -61,7 +63,8 @@ class Thread implements Context {
*
* @return Promise<Thread> The thread object that was spawned.
*/
public static function run(callable $function, ...$args): Promise {
public static function run(callable $function, ...$args): Promise
{
$thread = new self($function, ...$args);
return call(function () use ($thread) {
yield $thread->start();
@ -78,7 +81,8 @@ class Thread implements Context {
*
* @throws \Error Thrown if the pthreads extension is not available.
*/
public function __construct(callable $function, ...$args) {
public function __construct(callable $function, ...$args)
{
if (!self::supported()) {
throw new \Error("The pthreads extension is required to create threads.");
}
@ -91,7 +95,8 @@ class Thread implements Context {
* Returns the thread to the condition before starting. The new thread can be started and run independently of the
* first thread.
*/
public function __clone() {
public function __clone()
{
$this->thread = null;
$this->socket = null;
$this->channel = null;
@ -103,7 +108,8 @@ class Thread implements Context {
*
* @throws \Amp\Parallel\Context\ContextException
*/
public function __destruct() {
public function __destruct()
{
if (\getmypid() === $this->oid) {
$this->kill();
}
@ -114,7 +120,8 @@ class Thread implements Context {
*
* @return bool True if the context is running, otherwise false.
*/
public function isRunning(): bool {
public function isRunning(): bool
{
return $this->channel !== null;
}
@ -124,7 +131,8 @@ class Thread implements Context {
* @throws \Amp\Parallel\Context\StatusError If the thread has already been started.
* @throws \Amp\Parallel\Context\ContextException If starting the thread was unsuccessful.
*/
public function start(): Promise {
public function start(): Promise
{
if ($this->oid !== 0) {
throw new StatusError('The thread has already been started.');
}
@ -173,7 +181,8 @@ class Thread implements Context {
*
* @throws ContextException If killing the thread was unsuccessful.
*/
public function kill() {
public function kill()
{
if ($this->thread !== null) {
try {
if ($this->thread->isRunning() && !$this->thread->kill()) {
@ -188,7 +197,8 @@ class Thread implements Context {
/**
* Closes channel and socket if still open.
*/
private function close() {
private function close()
{
if ($this->channel !== null) {
$this->channel->close();
}
@ -207,7 +217,8 @@ class Thread implements Context {
* @throws SynchronizationError Thrown if an exit status object is not received.
* @throws ContextException If the context stops responding.
*/
public function join(): Promise {
public function join(): Promise
{
if ($this->channel == null || $this->thread === null) {
throw new StatusError('The thread has not been started or has already finished.');
}
@ -243,7 +254,8 @@ class Thread implements Context {
/**
* {@inheritdoc}
*/
public function receive(): Promise {
public function receive(): Promise
{
if ($this->channel === null) {
throw new StatusError('The process has not been started.');
}
@ -272,7 +284,8 @@ class Thread implements Context {
/**
* {@inheritdoc}
*/
public function send($data): Promise {
public function send($data): Promise
{
if ($this->channel === null) {
throw new StatusError('The thread has not been started or has already finished.');
}

View File

@ -7,7 +7,8 @@ use Amp\Promise;
/**
* Interface for sending messages between execution contexts.
*/
interface Channel {
interface Channel
{
/**
* @return \Amp\Promise<mixed>
*

View File

@ -2,8 +2,10 @@
namespace Amp\Parallel\Sync;
class ChannelException extends \Exception {
public function __construct(string $message, \Throwable $previous = null) {
class ChannelException extends \Exception
{
public function __construct(string $message, \Throwable $previous = null)
{
parent::__construct($message, 0, $previous);
}
}

View File

@ -5,7 +5,8 @@ namespace Amp\Parallel\Sync;
use Amp\CallableMaker;
use Amp\Parser\Parser;
class ChannelParser extends Parser {
class ChannelParser extends Parser
{
use CallableMaker;
const HEADER_LENGTH = 5;
@ -13,7 +14,8 @@ class ChannelParser extends Parser {
/**
* @param callable(mixed $data) Callback invoked when data is parsed.
*/
public function __construct(callable $callback) {
public function __construct(callable $callback)
{
parent::__construct(self::parser($callback, self::callableFromStaticMethod("errorHandler")));
}
@ -24,7 +26,8 @@ class ChannelParser extends Parser {
*
* @throws \Amp\Parallel\Sync\SerializationException
*/
public function encode($data): string {
public function encode($data): string
{
try {
$data = \serialize($data);
} catch (\Throwable $exception) {
@ -46,7 +49,8 @@ class ChannelParser extends Parser {
* @throws \Amp\Parallel\Sync\ChannelException
* @throws \Amp\Parallel\Sync\SerializationException
*/
private static function parser(callable $push, callable $errorHandler): \Generator {
private static function parser(callable $push, callable $errorHandler): \Generator
{
while (true) {
$header = yield self::HEADER_LENGTH;
$data = \unpack("Cprefix/Llength", $header);
@ -78,7 +82,8 @@ class ChannelParser extends Parser {
}
}
private static function errorHandler($errno, $errstr, $errfile, $errline) {
private static function errorHandler($errno, $errstr, $errfile, $errline)
{
if ($errno & \error_reporting()) {
throw new ChannelException(\sprintf(
'Received corrupted data. Errno: %d; %s in file %s on line %d',

View File

@ -5,7 +5,8 @@ namespace Amp\Parallel\Sync;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
class ChannelledSocket extends ChannelledStream {
class ChannelledSocket extends ChannelledStream
{
/** @var \Amp\ByteStream\ResourceInputStream */
private $read;
@ -18,7 +19,8 @@ class ChannelledSocket extends ChannelledStream {
*
* @throws \Error If a stream resource is not given for $resource.
*/
public function __construct($read, $write) {
public function __construct($read, $write)
{
parent::__construct(
$this->read = new ResourceInputStream($read),
$this->write = new ResourceOutputStream($write)
@ -28,7 +30,8 @@ class ChannelledSocket extends ChannelledStream {
/**
* Closes the read and write resource streams.
*/
public function close() {
public function close()
{
$this->read->close();
$this->write->close();
}

View File

@ -13,7 +13,8 @@ use function Amp\call;
*
* Supports full duplex read and write.
*/
class ChannelledStream implements Channel {
class ChannelledStream implements Channel
{
/** @var \Amp\ByteStream\InputStream */
private $read;
@ -32,7 +33,8 @@ class ChannelledStream implements Channel {
* @param \Amp\ByteStream\InputStream $read
* @param \Amp\ByteStream\OutputStream $write
*/
public function __construct(InputStream $read, OutputStream $write) {
public function __construct(InputStream $read, OutputStream $write)
{
$this->read = $read;
$this->write = $write;
$this->received = new \SplQueue;
@ -42,7 +44,8 @@ class ChannelledStream implements Channel {
/**
* {@inheritdoc}
*/
public function send($data): Promise {
public function send($data): Promise
{
return call(function () use ($data) {
try {
return yield $this->write->write($this->parser->encode($data));
@ -55,7 +58,8 @@ class ChannelledStream implements Channel {
/**
* {@inheritdoc}
*/
public function receive(): Promise {
public function receive(): Promise
{
return call(function () {
while ($this->received->isEmpty()) {
try {

View File

@ -2,7 +2,8 @@
namespace Amp\Parallel\Sync;
class ExitFailure implements ExitResult {
class ExitFailure implements ExitResult
{
/** @var string */
private $type;
@ -15,7 +16,8 @@ class ExitFailure implements ExitResult {
/** @var array */
private $trace;
public function __construct(\Throwable $exception) {
public function __construct(\Throwable $exception)
{
$this->type = \get_class($exception);
$this->message = $exception->getMessage();
$this->code = $exception->getCode();
@ -25,7 +27,8 @@ class ExitFailure implements ExitResult {
/**
* {@inheritdoc}
*/
public function getResult() {
public function getResult()
{
throw new PanicError(
$this->type,
\sprintf(

View File

@ -2,7 +2,8 @@
namespace Amp\Parallel\Sync;
interface ExitResult {
interface ExitResult
{
/**
* @return mixed Return value of the callable given to the execution context.
*

View File

@ -2,18 +2,21 @@
namespace Amp\Parallel\Sync;
class ExitSuccess implements ExitResult {
class ExitSuccess implements ExitResult
{
/** @var mixed */
private $result;
public function __construct($result) {
public function __construct($result)
{
$this->result = $result;
}
/**
* {@inheritdoc}
*/
public function getResult() {
public function getResult()
{
return $this->result;
}
}

View File

@ -2,28 +2,32 @@
namespace Amp\Parallel\Sync\Internal;
class ParcelStorage extends \Threaded {
class ParcelStorage extends \Threaded
{
/** @var mixed */
private $value;
/**
* @param mixed $value
*/
public function __construct($value) {
public function __construct($value)
{
$this->value = $value;
}
/**
* @return mixed
*/
public function get() {
public function get()
{
return $this->value;
}
/**
* @param mixed $value
*/
public function set($value) {
public function set($value)
{
$this->value = $value;
}
}

View File

@ -2,7 +2,8 @@
namespace Amp\Parallel\Sync;
class PanicError extends \Error {
class PanicError extends \Error
{
/** @var string Class name of uncaught exception. */
private $name;
@ -16,7 +17,8 @@ class PanicError extends \Error {
* @param string $message The panic message.
* @param string $trace The panic stack trace.
*/
public function __construct(string $name, string $message = '', string $trace = '') {
public function __construct(string $name, string $message = '', string $trace = '')
{
parent::__construct($message);
$this->name = $name;
@ -28,7 +30,8 @@ class PanicError extends \Error {
*
* @return string
*/
public function getName(): string {
public function getName(): string
{
return $this->name;
}
@ -37,7 +40,8 @@ class PanicError extends \Error {
*
* @return string
*/
public function getPanicTrace(): string {
public function getPanicTrace(): string
{
return $this->trace;
}
}

View File

@ -17,7 +17,8 @@ use Amp\Promise;
* methods to acquire a lock for exclusive access to the parcel first before
* accessing the contained value.
*/
interface Parcel {
interface Parcel
{
/**
* Asynchronously invokes a callback while maintaining an exclusive lock on the parcel. The current value of the
* parcel is provided as the first argument to the callback function.

View File

@ -2,5 +2,6 @@
namespace Amp\Parallel\Sync;
class SerializationException extends ChannelException {
class SerializationException extends ChannelException
{
}

View File

@ -2,5 +2,6 @@
namespace Amp\Parallel\Sync;
class SharedMemoryException extends \Exception {
class SharedMemoryException extends \Exception
{
}

View File

@ -30,7 +30,8 @@ use function Amp\call;
* @see http://man7.org/linux/man-pages/man2/shmctl.2.html How shared memory works on Linux.
* @see https://msdn.microsoft.com/en-us/library/ms810613.aspx How shared memory works on Windows.
*/
class SharedMemoryParcel implements Parcel {
class SharedMemoryParcel implements Parcel
{
/** @var int The byte offset to the start of the object data in memory. */
const MEM_DATA_OFFSET = 7;
@ -64,7 +65,8 @@ class SharedMemoryParcel implements Parcel {
*
* @return \Amp\Parallel\Sync\SharedMemoryParcel
*/
public static function create(string $id, $value, int $size = 8192, int $permissions = 0600): self {
public static function create(string $id, $value, int $size = 8192, int $permissions = 0600): self
{
$parcel = new self($id);
$parcel->init($value, $size, $permissions);
return $parcel;
@ -75,7 +77,8 @@ class SharedMemoryParcel implements Parcel {
*
* @return \Amp\Parallel\Sync\SharedMemoryParcel
*/
public static function use(string $id): self {
public static function use(string $id): self
{
$parcel = new self($id);
$parcel->open();
return $parcel;
@ -93,7 +96,8 @@ class SharedMemoryParcel implements Parcel {
* @param int $permissions The access permissions to set for the object.
* If not specified defaults to 0600.
*/
private function __construct(string $id) {
private function __construct(string $id)
{
if (!\extension_loaded("shmop")) {
throw new \Error(__CLASS__ . " requires the shmop extension.");
}
@ -107,7 +111,8 @@ class SharedMemoryParcel implements Parcel {
* @param int $size
* @param int $permissions
*/
private function init($value, int $size = 8192, int $permissions = 0600) {
private function init($value, int $size = 8192, int $permissions = 0600)
{
$this->semaphore = PosixSemaphore::create($this->id, 1);
$this->initializer = \getmypid();
@ -116,7 +121,8 @@ class SharedMemoryParcel implements Parcel {
$this->wrap($value);
}
private function open() {
private function open()
{
$this->semaphore = PosixSemaphore::use($this->id);
$this->memOpen($this->key, 'w', 0, 0);
}
@ -129,7 +135,8 @@ class SharedMemoryParcel implements Parcel {
*
* @return bool True if the object is freed, otherwise false.
*/
private function isFreed(): bool {
private function isFreed(): bool
{
// If we are no longer connected to the memory segment, check if it has
// been invalidated.
if ($this->handle !== null) {
@ -144,7 +151,8 @@ class SharedMemoryParcel implements Parcel {
/**
* {@inheritdoc}
*/
public function unwrap(): Promise {
public function unwrap(): Promise
{
if ($this->isFreed()) {
return new Failure(new SharedMemoryException('The object has already been freed.'));
}
@ -169,7 +177,8 @@ class SharedMemoryParcel implements Parcel {
* memory segment on the next read attempt. Once all running processes and
* threads disconnect from the old segment, it will be freed by the OS.
*/
protected function wrap($value) {
protected function wrap($value)
{
if ($this->isFreed()) {
throw new SharedMemoryException('The object has already been freed.');
}
@ -203,7 +212,8 @@ class SharedMemoryParcel implements Parcel {
/**
* {@inheritdoc}
*/
public function synchronized(callable $callback): Promise {
public function synchronized(callable $callback): Promise
{
return call(function () use ($callback) {
/** @var \Amp\Sync\Lock $lock */
$lock = yield $this->semaphore->acquire();
@ -230,7 +240,8 @@ class SharedMemoryParcel implements Parcel {
* process disconnect from the object, the shared memory block will be
* destroyed by the OS.
*/
public function __destruct() {
public function __destruct()
{
if ($this->initializer === 0 || $this->initializer !== \getmypid()) {
return;
}
@ -253,20 +264,23 @@ class SharedMemoryParcel implements Parcel {
/**
* Private method to prevent cloning.
*/
private function __clone() {
private function __clone()
{
}
/**
* Private method to prevent serialization.
*/
private function __sleep() {
private function __sleep()
{
}
/**
* Updates the current memory segment handle, handling any moves made on the
* data.
*/
private function handleMovedMemory() {
private function handleMovedMemory()
{
// Read from the memory block and handle moved blocks until we find the
// correct block.
while (true) {
@ -289,7 +303,8 @@ class SharedMemoryParcel implements Parcel {
*
* @return array An associative array of header data.
*/
private function getHeader(): array {
private function getHeader(): array
{
$data = $this->memGet(0, self::MEM_DATA_OFFSET);
return \unpack('Cstate/Lsize/Spermissions', $data);
}
@ -301,7 +316,8 @@ class SharedMemoryParcel implements Parcel {
* @param int $size The size of the stored data, or other value.
* @param int $permissions The permissions mask on the memory segment.
*/
private function setHeader(int $state, int $size, int $permissions) {
private function setHeader(int $state, int $size, int $permissions)
{
$header = \pack('CLS', $state, $size, $permissions);
$this->memSet(0, $header);
}
@ -314,7 +330,8 @@ class SharedMemoryParcel implements Parcel {
* @param int $permissions Process permissions on the shared memory.
* @param int $size The size to crate the shared memory in bytes.
*/
private function memOpen(int $key, string $mode, int $permissions, int $size) {
private function memOpen(int $key, string $mode, int $permissions, int $size)
{
$this->handle = @\shmop_open($key, $mode, $permissions, $size);
if ($this->handle === false) {
throw new SharedMemoryException('Failed to create shared memory block.');
@ -329,7 +346,8 @@ class SharedMemoryParcel implements Parcel {
*
* @return string The binary data at the given offset.
*/
private function memGet(int $offset, int $size): string {
private function memGet(int $offset, int $size): string
{
$data = \shmop_read($this->handle, $offset, $size);
if ($data === false) {
throw new SharedMemoryException('Failed to read from shared memory block.');
@ -343,7 +361,8 @@ class SharedMemoryParcel implements Parcel {
* @param int $offset The offset to write to.
* @param string $data The binary data to write.
*/
private function memSet(int $offset, string $data) {
private function memSet(int $offset, string $data)
{
if (!\shmop_write($this->handle, $data, $offset)) {
throw new SharedMemoryException('Failed to write to shared memory block.');
}
@ -352,13 +371,15 @@ class SharedMemoryParcel implements Parcel {
/**
* Requests the shared memory segment to be deleted.
*/
private function memDelete() {
private function memDelete()
{
if (!\shmop_delete($this->handle)) {
throw new SharedMemoryException('Failed to discard shared memory block.');
}
}
private static function makeKey(string $id): int {
private static function makeKey(string $id): int
{
return \abs(\unpack("l", \md5($id, true))[1]);
}
}

View File

@ -2,5 +2,6 @@
namespace Amp\Parallel\Sync;
class SynchronizationError extends \Error {
class SynchronizationError extends \Error
{
}

View File

@ -10,7 +10,8 @@ use function Amp\call;
/**
* A thread-safe container that shares a value between multiple threads.
*/
class ThreadedParcel implements Parcel {
class ThreadedParcel implements Parcel
{
/** @var \Amp\Sync\ThreadedMutex */
private $mutex;
@ -22,7 +23,8 @@ class ThreadedParcel implements Parcel {
*
* @param mixed $value The value to store in the container.
*/
public function __construct($value) {
public function __construct($value)
{
$this->mutex = new ThreadedMutex;
$this->storage = new Internal\ParcelStorage($value);
}
@ -30,14 +32,16 @@ class ThreadedParcel implements Parcel {
/**
* {@inheritdoc}
*/
public function unwrap(): Promise {
public function unwrap(): Promise
{
return new Success($this->storage->get());
}
/**
* @return \Amp\Promise
*/
public function synchronized(callable $callback): Promise {
public function synchronized(callable $callback): Promise
{
return call(function () use ($callback) {
/** @var \Amp\Sync\Lock $lock */
$lock = yield $this->mutex->acquire();

View File

@ -12,7 +12,8 @@ use function Amp\call;
/**
* Base class for most common types of task workers.
*/
abstract class AbstractWorker implements Worker {
abstract class AbstractWorker implements Worker
{
/** @var \Amp\Parallel\Context\Context */
private $context;
@ -25,7 +26,8 @@ abstract class AbstractWorker implements Worker {
/**
* @param \Amp\Parallel\Context\Context $context
*/
public function __construct(Context $context) {
public function __construct(Context $context)
{
if ($context->isRunning()) {
throw new \Error("The context was already running");
}
@ -36,21 +38,24 @@ abstract class AbstractWorker implements Worker {
/**
* {@inheritdoc}
*/
public function isRunning(): bool {
public function isRunning(): bool
{
return $this->context->isRunning();
}
/**
* {@inheritdoc}
*/
public function isIdle(): bool {
public function isIdle(): bool
{
return $this->pending === null;
}
/**
* {@inheritdoc}
*/
public function enqueue(Task $task): Promise {
public function enqueue(Task $task): Promise
{
if ($this->shutdown) {
throw new StatusError("The worker has been shut down");
}
@ -100,7 +105,8 @@ abstract class AbstractWorker implements Worker {
/**
* {@inheritdoc}
*/
public function shutdown(): Promise {
public function shutdown(): Promise
{
if ($this->shutdown) {
throw new StatusError("The worker is not running");
}
@ -125,7 +131,8 @@ abstract class AbstractWorker implements Worker {
/**
* {@inheritdoc}
*/
public function kill() {
public function kill()
{
$this->cancel();
}
@ -136,7 +143,8 @@ abstract class AbstractWorker implements Worker {
*
* @param \Throwable|null $exception Optional exception to be used as the previous exception.
*/
protected function cancel(\Throwable $exception = null) {
protected function cancel(\Throwable $exception = null)
{
if ($this->context->isRunning()) {
$this->context->kill();
}

View File

@ -5,7 +5,8 @@ namespace Amp\Parallel\Worker;
use Amp\Loop;
use Amp\Struct;
class BasicEnvironment implements Environment {
class BasicEnvironment implements Environment
{
/** @var array */
private $data = [];
@ -15,7 +16,8 @@ class BasicEnvironment implements Environment {
/** @var string */
private $timer;
public function __construct() {
public function __construct()
{
$this->queue = $queue = new \SplPriorityQueue;
$data = &$this->data;
@ -68,7 +70,8 @@ class BasicEnvironment implements Environment {
*
* @return bool
*/
public function exists(string $key): bool {
public function exists(string $key): bool
{
return isset($this->data[$key]);
}
@ -77,7 +80,8 @@ class BasicEnvironment implements Environment {
*
* @return mixed|null Returns null if the key does not exist.
*/
public function get(string $key) {
public function get(string $key)
{
if (!isset($this->data[$key])) {
return null;
}
@ -102,7 +106,8 @@ class BasicEnvironment implements Environment {
*
* @throws \Error If the time-to-live is not a positive integer.
*/
public function set(string $key, $value, int $ttl = null) {
public function set(string $key, $value, int $ttl = null)
{
if ($value === null) {
$this->delete($key);
return;
@ -135,7 +140,8 @@ class BasicEnvironment implements Environment {
/**
* @param string $key
*/
public function delete(string $key) {
public function delete(string $key)
{
unset($this->data[$key]);
}
@ -146,7 +152,8 @@ class BasicEnvironment implements Environment {
*
* @return bool
*/
public function offsetExists($key) {
public function offsetExists($key)
{
return $this->exists($key);
}
@ -157,7 +164,8 @@ class BasicEnvironment implements Environment {
*
* @return mixed
*/
public function offsetGet($key) {
public function offsetGet($key)
{
return $this->get($key);
}
@ -167,7 +175,8 @@ class BasicEnvironment implements Environment {
* @param string $key
* @param mixed $value
*/
public function offsetSet($key, $value) {
public function offsetSet($key, $value)
{
$this->set($key, $value);
}
@ -176,14 +185,16 @@ class BasicEnvironment implements Environment {
*
* @param string $key
*/
public function offsetUnset($key) {
public function offsetUnset($key)
{
$this->delete($key);
}
/**
* Removes all values.
*/
public function clear() {
public function clear()
{
$this->data = [];
Loop::disable($this->timer);

View File

@ -13,7 +13,8 @@ use Amp\Promise;
* tasks simultaneously. The load on each worker is balanced such that tasks
* are completed as soon as possible and workers are used efficiently.
*/
class DefaultPool implements Pool {
class DefaultPool implements Pool
{
use CallableMaker;
/** @var bool Indicates if the pool is currently running. */
@ -47,7 +48,8 @@ class DefaultPool implements Pool {
*
* @throws \Error
*/
public function __construct(int $maxSize = self::DEFAULT_MAX_SIZE, WorkerFactory $factory = null) {
public function __construct(int $maxSize = self::DEFAULT_MAX_SIZE, WorkerFactory $factory = null)
{
if ($maxSize < 0) {
throw new \Error("Maximum size must be a non-negative integer");
}
@ -87,7 +89,8 @@ class DefaultPool implements Pool {
*
* @return bool True if the pool is running, otherwise false.
*/
public function isRunning(): bool {
public function isRunning(): bool
{
return $this->running;
}
@ -96,28 +99,32 @@ class DefaultPool implements Pool {
*
* @return bool True if the pool has at least one idle worker, otherwise false.
*/
public function isIdle(): bool {
public function isIdle(): bool
{
return $this->idleWorkers->count() > 0 || $this->workers->count() === 0;
}
/**
* {@inheritdoc}
*/
public function getMaxSize(): int {
public function getMaxSize(): int
{
return $this->maxSize;
}
/**
* {@inheritdoc}
*/
public function getWorkerCount(): int {
public function getWorkerCount(): int
{
return $this->workers->count();
}
/**
* {@inheritdoc}
*/
public function getIdleWorkerCount(): int {
public function getIdleWorkerCount(): int
{
return $this->idleWorkers->count();
}
@ -131,7 +138,8 @@ class DefaultPool implements Pool {
* @throws \Amp\Parallel\Context\StatusError If the pool has been shutdown.
* @throws \Amp\Parallel\Worker\TaskException If the task throws an exception.
*/
public function enqueue(Task $task): Promise {
public function enqueue(Task $task): Promise
{
$worker = $this->pull();
$promise = $worker->enqueue($task);
@ -148,7 +156,8 @@ class DefaultPool implements Pool {
*
* @throws \Amp\Parallel\Context\StatusError If the pool has not been started.
*/
public function shutdown(): Promise {
public function shutdown(): Promise
{
if (!$this->isRunning()) {
throw new StatusError("The pool was shutdown");
}
@ -168,7 +177,8 @@ class DefaultPool implements Pool {
/**
* Kills all workers in the pool and halts the worker pool.
*/
public function kill() {
public function kill()
{
$this->running = false;
foreach ($this->workers as $worker) {
@ -181,7 +191,8 @@ class DefaultPool implements Pool {
*
* @return Worker The worker created.
*/
private function createWorker() {
private function createWorker()
{
$worker = $this->factory->create();
$this->workers->attach($worker, 0);
return $worker;
@ -190,7 +201,8 @@ class DefaultPool implements Pool {
/**
* {@inheritdoc}
*/
public function get(): Worker {
public function get(): Worker
{
return new Internal\PooledWorker($this->pull(), $this->push);
}
@ -200,7 +212,8 @@ class DefaultPool implements Pool {
* @return \Amp\Parallel\Worker\Worker
* @throws \Amp\Parallel\Context\StatusError
*/
protected function pull(): Worker {
protected function pull(): Worker
{
if (!$this->isRunning()) {
throw new StatusError("The pool was shutdown");
}
@ -240,7 +253,8 @@ class DefaultPool implements Pool {
*
* @throws \Error If the worker was not part of this queue.
*/
protected function push(Worker $worker) {
protected function push(Worker $worker)
{
($this->push)($worker); // Kept for BC
}
}

View File

@ -7,7 +7,8 @@ use Amp\Parallel\Context\Thread;
/**
* The built-in worker factory type.
*/
class DefaultWorkerFactory implements WorkerFactory {
class DefaultWorkerFactory implements WorkerFactory
{
/** @var string */
private $className;
@ -17,7 +18,8 @@ class DefaultWorkerFactory implements WorkerFactory {
*
* @throws \Error If the given class name does not exist or does not implement \Amp\Parallel\Worker\Environment.
*/
public function __construct(string $envClassName = BasicEnvironment::class) {
public function __construct(string $envClassName = BasicEnvironment::class)
{
if (!\class_exists($envClassName)) {
throw new \Error(\sprintf("Invalid environment class name '%s'", $envClassName));
}
@ -39,7 +41,8 @@ class DefaultWorkerFactory implements WorkerFactory {
* The type of worker created depends on the extensions available. If multi-threading is enabled, a WorkerThread
* will be created. If threads are not available a WorkerProcess will be created.
*/
public function create(): Worker {
public function create(): Worker
{
if (Thread::supported()) {
return new WorkerThread($this->className);
}

View File

@ -2,7 +2,8 @@
namespace Amp\Parallel\Worker;
interface Environment extends \ArrayAccess {
interface Environment extends \ArrayAccess
{
/**
* @param string $key
*

View File

@ -5,25 +5,29 @@ namespace Amp\Parallel\Worker\Internal;
use Amp\Parallel\Worker\Task;
/** @internal */
class Job {
class Job
{
/** @var string */
private $id;
/** @var \Amp\Parallel\Worker\Task */
private $task;
public function __construct(Task $task) {
public function __construct(Task $task)
{
static $id = 'a';
$this->task = $task;
$this->id = $id++;
}
public function getId(): string {
public function getId(): string
{
return $this->id;
}
public function getTask(): Task {
public function getTask(): Task
{
// Classes that cannot be autoloaded will be unserialized as an instance of __PHP_Incomplete_Class.
if ($this->task instanceof \__PHP_Incomplete_Class) {
throw new \Error(\sprintf("Classes implementing %s must be autoloadable by the Composer autoloader", Task::class));

View File

@ -7,7 +7,8 @@ use Amp\Parallel\Worker\Worker;
use Amp\Promise;
/** @internal */
class PooledWorker implements Worker {
class PooledWorker implements Worker
{
/** @var callable */
private $push;
@ -18,7 +19,8 @@ class PooledWorker implements Worker {
* @param \Amp\Parallel\Worker\Worker $worker
* @param callable $push Callable to push the worker back into the queue.
*/
public function __construct(Worker $worker, callable $push) {
public function __construct(Worker $worker, callable $push)
{
$this->worker = $worker;
$this->push = $push;
}
@ -26,42 +28,48 @@ class PooledWorker implements Worker {
/**
* Automatically pushes the worker back into the queue.
*/
public function __destruct() {
public function __destruct()
{
($this->push)($this->worker);
}
/**
* {@inheritdoc}
*/
public function isRunning(): bool {
public function isRunning(): bool
{
return $this->worker->isRunning();
}
/**
* {@inheritdoc}
*/
public function isIdle(): bool {
public function isIdle(): bool
{
return $this->worker->isIdle();
}
/**
* {@inheritdoc}
*/
public function enqueue(Task $task): Promise {
public function enqueue(Task $task): Promise
{
return $this->worker->enqueue($task);
}
/**
* {@inheritdoc}
*/
public function shutdown(): Promise {
public function shutdown(): Promise
{
return $this->worker->shutdown();
}
/**
* {@inheritdoc}
*/
public function kill() {
public function kill()
{
$this->worker->kill();
}
}

View File

@ -8,7 +8,8 @@ use Amp\Parallel\Worker\TaskException;
use Amp\Promise;
/** @internal */
class TaskFailure extends TaskResult {
class TaskFailure extends TaskResult
{
const PARENT_EXCEPTION = 0;
const PARENT_ERROR = 1;
@ -27,7 +28,8 @@ class TaskFailure extends TaskResult {
/** @var array */
private $trace;
public function __construct(string $id, \Throwable $exception) {
public function __construct(string $id, \Throwable $exception)
{
parent::__construct($id);
$this->type = \get_class($exception);
$this->parent = $exception instanceof \Error ? self::PARENT_ERROR : self::PARENT_EXCEPTION;
@ -36,12 +38,13 @@ class TaskFailure extends TaskResult {
$this->trace = $exception->getTraceAsString();
}
public function promise(): Promise {
public function promise(): Promise
{
switch ($this->parent) {
case self::PARENT_ERROR:
$exception = new TaskError(
$this->type,
sprintf(
\sprintf(
'Uncaught %s in worker with message "%s" and code "%s"',
$this->type,
$this->message,
@ -54,7 +57,7 @@ class TaskFailure extends TaskResult {
default:
$exception = new TaskException(
$this->type,
sprintf(
\sprintf(
'Uncaught %s in worker with message "%s" and code "%s"',
$this->type,
$this->message,

View File

@ -5,21 +5,24 @@ namespace Amp\Parallel\Worker\Internal;
use Amp\Promise;
/** @internal */
abstract class TaskResult {
abstract class TaskResult
{
/** @var string Task identifier. */
private $id;
/**
* @param string $id Task identifier.
*/
public function __construct(string $id) {
public function __construct(string $id)
{
$this->id = $id;
}
/**
* @return string Task identifier.
*/
public function getId(): string {
public function getId(): string
{
return $this->id;
}

View File

@ -6,16 +6,19 @@ use Amp\Promise;
use Amp\Success;
/** @internal */
class TaskSuccess extends TaskResult {
class TaskSuccess extends TaskResult
{
/** @var mixed Result of task. */
private $result;
public function __construct(string $id, $result) {
public function __construct(string $id, $result)
{
parent::__construct($id);
$this->result = $result;
}
public function promise(): Promise {
public function promise(): Promise
{
return new Success($this->result);
}
}

View File

@ -5,7 +5,8 @@ namespace Amp\Parallel\Worker;
/**
* An interface for worker pools.
*/
interface Pool extends Worker {
interface Pool extends Worker
{
/** @var int The default maximum pool size. */
const DEFAULT_MAX_SIZE = 32;

View File

@ -5,7 +5,8 @@ namespace Amp\Parallel\Worker;
/**
* A runnable unit of execution.
*/
interface Task {
interface Task
{
/**
* Runs the task inside the caller's context.
*

View File

@ -2,7 +2,8 @@
namespace Amp\Parallel\Worker;
class TaskError extends \Error {
class TaskError extends \Error
{
/** @var string Class name of error thrown from task. */
private $name;
@ -14,7 +15,8 @@ class TaskError extends \Error {
* @param string $message The panic message.
* @param string $trace The panic stack trace.
*/
public function __construct(string $name, string $message = '', string $trace = '') {
public function __construct(string $name, string $message = '', string $trace = '')
{
parent::__construct($message);
$this->name = $name;
@ -26,7 +28,8 @@ class TaskError extends \Error {
*
* @return string
*/
public function getName(): string {
public function getName(): string
{
return $this->name;
}
@ -35,7 +38,8 @@ class TaskError extends \Error {
*
* @return string
*/
public function getWorkerTrace(): string {
public function getWorkerTrace(): string
{
return $this->trace;
}
}

View File

@ -2,7 +2,8 @@
namespace Amp\Parallel\Worker;
class TaskException extends \Exception {
class TaskException extends \Exception
{
/** @var string Class name of exception thrown from task. */
private $name;
@ -14,7 +15,8 @@ class TaskException extends \Exception {
* @param string $message The panic message.
* @param string $trace The panic stack trace.
*/
public function __construct(string $name, string $message = '', string $trace = '') {
public function __construct(string $name, string $message = '', string $trace = '')
{
parent::__construct($message);
$this->name = $name;
@ -26,7 +28,8 @@ class TaskException extends \Exception {
*
* @return string
*/
public function getName(): string {
public function getName(): string
{
return $this->name;
}
@ -35,7 +38,8 @@ class TaskException extends \Exception {
*
* @return string
*/
public function getWorkerTrace(): string {
public function getWorkerTrace(): string
{
return $this->trace;
}
}

View File

@ -7,14 +7,16 @@ use Amp\Parallel\Sync\Channel;
use Amp\Promise;
use function Amp\call;
class TaskRunner {
class TaskRunner
{
/** @var \Amp\Parallel\Sync\Channel */
private $channel;
/** @var \Amp\Parallel\Worker\Environment */
private $environment;
public function __construct(Channel $channel, Environment $environment) {
public function __construct(Channel $channel, Environment $environment)
{
$this->channel = $channel;
$this->environment = $environment;
}
@ -24,7 +26,8 @@ class TaskRunner {
*
* @return \Amp\Promise
*/
public function run(): Promise {
public function run(): Promise
{
return new Coroutine($this->execute());
}
@ -33,7 +36,8 @@ class TaskRunner {
*
* @return \Generator
*/
private function execute(): \Generator {
private function execute(): \Generator
{
$job = yield $this->channel->receive();
while ($job instanceof Internal\Job) {

View File

@ -7,7 +7,8 @@ use Amp\Promise;
/**
* An interface for a parallel worker thread that runs a queue of tasks.
*/
interface Worker {
interface Worker
{
/**
* Checks if the worker is running.
*

View File

@ -2,12 +2,14 @@
namespace Amp\Parallel\Worker;
class WorkerException extends \Exception {
class WorkerException extends \Exception
{
/**
* @param string $message
* @param \Throwable|null $previous
*/
public function __construct(string $message, \Throwable $previous = null) {
public function __construct(string $message, \Throwable $previous = null)
{
parent::__construct($message, 0, $previous);
}
}

View File

@ -5,7 +5,8 @@ namespace Amp\Parallel\Worker;
/**
* Interface for factories used to create new workers.
*/
interface WorkerFactory {
interface WorkerFactory
{
/**
* Creates a new worker instance.
*

View File

@ -7,7 +7,8 @@ use Amp\Parallel\Context\Process;
/**
* A worker process that executes task objects.
*/
class WorkerProcess extends AbstractWorker {
class WorkerProcess extends AbstractWorker
{
const SCRIPT_PATH = __DIR__ . "/Internal/worker-process.php";
/**
@ -19,7 +20,8 @@ class WorkerProcess extends AbstractWorker {
*
* @throws \Error If the PHP binary path given cannot be found or is not executable.
*/
public function __construct(string $envClassName = BasicEnvironment::class, array $env = [], string $binary = null) {
public function __construct(string $envClassName = BasicEnvironment::class, array $env = [], string $binary = null)
{
$script = [
self::SCRIPT_PATH,
$envClassName,

View File

@ -9,12 +9,14 @@ use Amp\Promise;
/**
* A worker thread that executes task objects.
*/
class WorkerThread extends AbstractWorker {
class WorkerThread extends AbstractWorker
{
/**
* @param string $envClassName Name of class implementing \Amp\Parallel\Worker\Environment to instigate.
* Defaults to \Amp\Parallel\Worker\BasicEnvironment.
*/
public function __construct(string $envClassName = BasicEnvironment::class) {
public function __construct(string $envClassName = BasicEnvironment::class)
{
parent::__construct(new Thread(function (Channel $channel, string $className): Promise {
if (!\class_exists($className)) {
throw new \Error(\sprintf("Invalid environment class name '%s'", $className));

View File

@ -15,7 +15,8 @@ const LOOP_FACTORY_IDENTIFIER = WorkerFactory::class;
*
* @return \Amp\Parallel\Worker\Pool The global worker pool instance.
*/
function pool(Pool $pool = null): Pool {
function pool(Pool $pool = null): Pool
{
if ($pool === null) {
$pool = Loop::getState(LOOP_POOL_IDENTIFIER);
if ($pool) {
@ -36,7 +37,8 @@ function pool(Pool $pool = null): Pool {
*
* @return \Amp\Promise<mixed>
*/
function enqueue(Task $task): Promise {
function enqueue(Task $task): Promise
{
return pool()->enqueue($task);
}
@ -45,7 +47,8 @@ function enqueue(Task $task): Promise {
*
* @return \Amp\Parallel\Worker\Worker
*/
function get(): Worker {
function get(): Worker
{
return pool()->get();
}
@ -54,7 +57,8 @@ function get(): Worker {
*
* @return \Amp\Parallel\Worker\Worker
*/
function create(): Worker {
function create(): Worker
{
return factory()->create();
}
@ -65,7 +69,8 @@ function create(): Worker {
*
* @return \Amp\Parallel\Worker\WorkerFactory
*/
function factory(WorkerFactory $factory = null): WorkerFactory {
function factory(WorkerFactory $factory = null): WorkerFactory
{
if ($factory === null) {
$factory = Loop::getState(LOOP_FACTORY_IDENTIFIER);
if ($factory) {

View File

@ -7,7 +7,8 @@ use Amp\Parallel\Sync\Channel;
use Amp\Parallel\Sync\ExitSuccess;
use Amp\PHPUnit\TestCase;
abstract class AbstractContextTest extends TestCase {
abstract class AbstractContextTest extends TestCase
{
/**
* @param callable $function
*
@ -15,10 +16,11 @@ abstract class AbstractContextTest extends TestCase {
*/
abstract public function createContext(callable $function);
public function testIsRunning() {
public function testIsRunning()
{
Loop::run(function () {
$context = $this->createContext(function () {
usleep(100);
\usleep(100);
});
$this->assertFalse($context->isRunning());
@ -33,10 +35,11 @@ abstract class AbstractContextTest extends TestCase {
});
}
public function testKill() {
public function testKill()
{
Loop::run(function () {
$context = $this->createContext(function () {
usleep(1e6);
\usleep(1e6);
});
yield $context->start();
@ -50,10 +53,11 @@ abstract class AbstractContextTest extends TestCase {
/**
* @expectedException \Amp\Parallel\Context\StatusError
*/
public function testStartWhileRunningThrowsError() {
public function testStartWhileRunningThrowsError()
{
Loop::run(function () {
$context = $this->createContext(function () {
usleep(100);
\usleep(100);
});
yield $context->start();
@ -64,11 +68,12 @@ abstract class AbstractContextTest extends TestCase {
/**
* @expectedException \Amp\Parallel\Context\StatusError
*/
public function testStartMultipleTimesThrowsError() {
public function testStartMultipleTimesThrowsError()
{
$this->assertRunTimeGreaterThan(function () {
Loop::run(function () {
$context = $this->createContext(function () {
sleep(1);
\sleep(1);
});
yield $context->start();
@ -83,7 +88,8 @@ abstract class AbstractContextTest extends TestCase {
/**
* @expectedException \Amp\Parallel\Sync\PanicError
*/
public function testExceptionInContextPanics() {
public function testExceptionInContextPanics()
{
Loop::run(function () {
$context = $this->createContext(function () {
throw new \Exception('Exception in fork.');
@ -97,7 +103,8 @@ abstract class AbstractContextTest extends TestCase {
/**
* @expectedException \Amp\Parallel\Sync\PanicError
*/
public function testReturnUnserializableDataPanics() {
public function testReturnUnserializableDataPanics()
{
Loop::run(function () {
$context = $this->createContext(function () {
return yield function () {};
@ -108,11 +115,12 @@ abstract class AbstractContextTest extends TestCase {
});
}
public function testJoinWaitsForChild() {
public function testJoinWaitsForChild()
{
$this->assertRunTimeGreaterThan(function () {
Loop::run(function () {
$context = $this->createContext(function () {
sleep(1);
\sleep(1);
});
yield $context->start();
@ -124,17 +132,19 @@ abstract class AbstractContextTest extends TestCase {
/**
* @expectedException \Amp\Parallel\Context\StatusError
*/
public function testJoinWithoutStartThrowsError() {
public function testJoinWithoutStartThrowsError()
{
Loop::run(function () {
$context = $this->createContext(function () {
usleep(100);
\usleep(100);
});
yield $context->join();
});
}
public function testJoinResolvesWithContextReturn() {
public function testJoinResolvesWithContextReturn()
{
Loop::run(function () {
$context = $this->createContext(function () {
return 42;
@ -145,7 +155,8 @@ abstract class AbstractContextTest extends TestCase {
});
}
public function testSendAndReceive() {
public function testSendAndReceive()
{
Loop::run(function () {
$context = $this->createContext(function (Channel $channel) {
yield $channel->send(1);
@ -166,7 +177,8 @@ abstract class AbstractContextTest extends TestCase {
* @depends testSendAndReceive
* @expectedException \Amp\Parallel\Sync\SynchronizationError
*/
public function testJoinWhenContextSendingData() {
public function testJoinWhenContextSendingData()
{
Loop::run(function () {
$context = $this->createContext(function (Channel $channel) {
yield $channel->send(0);
@ -182,7 +194,8 @@ abstract class AbstractContextTest extends TestCase {
* @depends testSendAndReceive
* @expectedException \Amp\Parallel\Context\StatusError
*/
public function testReceiveBeforeContextHasStarted() {
public function testReceiveBeforeContextHasStarted()
{
Loop::run(function () {
$context = $this->createContext(function (Channel $channel) {
yield $channel->send(0);
@ -197,7 +210,8 @@ abstract class AbstractContextTest extends TestCase {
* @depends testSendAndReceive
* @expectedException \Amp\Parallel\Context\StatusError
*/
public function testSendBeforeContextHasStarted() {
public function testSendBeforeContextHasStarted()
{
Loop::run(function () {
$context = $this->createContext(function (Channel $channel) {
yield $channel->send(0);
@ -212,7 +226,8 @@ abstract class AbstractContextTest extends TestCase {
* @depends testSendAndReceive
* @expectedException \Amp\Parallel\Sync\SynchronizationError
*/
public function testReceiveWhenContextHasReturned() {
public function testReceiveWhenContextHasReturned()
{
Loop::run(function () {
$context = $this->createContext(function (Channel $channel) {
yield $channel->send(0);
@ -230,7 +245,8 @@ abstract class AbstractContextTest extends TestCase {
* @depends testSendAndReceive
* @expectedException \Error
*/
public function testSendExitResult() {
public function testSendExitResult()
{
Loop::run(function () {
$context = $this->createContext(function (Channel $channel) {
$value = yield $channel->receive();
@ -247,7 +263,8 @@ abstract class AbstractContextTest extends TestCase {
* @expectedException \Amp\Parallel\Context\ContextException
* @expectedExceptionMessage The context stopped responding
*/
public function testExitingContextOnJoin() {
public function testExitingContextOnJoin()
{
Loop::run(function () {
$context = $this->createContext(function () {
exit;
@ -262,7 +279,8 @@ abstract class AbstractContextTest extends TestCase {
* @expectedException \Amp\Parallel\Sync\ChannelException
* @expectedExceptionMessage The channel closed unexpectedly
*/
public function testExitingContextOnReceive() {
public function testExitingContextOnReceive()
{
Loop::run(function () {
$context = $this->createContext(function () {
exit;
@ -277,7 +295,8 @@ abstract class AbstractContextTest extends TestCase {
* @expectedException \Amp\Parallel\Sync\ChannelException
* @expectedExceptionMessage Sending on the channel failed
*/
public function testExitingContextOnSend() {
public function testExitingContextOnSend()
{
Loop::run(function () {
$context = $this->createContext(function () {
exit;

View File

@ -6,8 +6,10 @@ use Amp\Loop;
use Amp\Parallel\Context\Process;
use Amp\PHPUnit\TestCase;
class ProcessTest extends TestCase {
public function testBasicProcess() {
class ProcessTest extends TestCase
{
public function testBasicProcess()
{
Loop::run(function () {
$process = new Process([
__DIR__ . "/test-process.php",
@ -22,7 +24,8 @@ class ProcessTest extends TestCase {
* @expectedException \Amp\Parallel\Sync\PanicError
* @expectedExceptionMessage No string provided
*/
public function testFailingProcess() {
public function testFailingProcess()
{
Loop::run(function () {
$process = new Process(__DIR__ . "/test-process.php");
yield $process->start();
@ -34,7 +37,8 @@ class ProcessTest extends TestCase {
* @expectedException \Amp\Parallel\Sync\PanicError
* @expectedExceptionMessage No script found at 'test-process.php'
*/
public function testInvalidScriptPath() {
public function testInvalidScriptPath()
{
Loop::run(function () {
$process = new Process("test-process.php");
yield $process->start();
@ -46,11 +50,12 @@ class ProcessTest extends TestCase {
* @expectedException \Amp\Parallel\Sync\PanicError
* @expectedExceptionMessage The given data cannot be sent because it is not serializable
*/
public function testInvalidResult() {
public function testInvalidResult()
{
Loop::run(function () {
$process = new Process(__DIR__ . "/invalid-result-process.php");
yield $process->start();
var_dump(yield $process->join());
\var_dump(yield $process->join());
});
}
@ -58,11 +63,12 @@ class ProcessTest extends TestCase {
* @expectedException \Amp\Parallel\Sync\PanicError
* @expectedExceptionMessage did not return a callable function
*/
public function testNoCallbackReturned() {
public function testNoCallbackReturned()
{
Loop::run(function () {
$process = new Process(__DIR__ . "/no-callback-process.php");
yield $process->start();
var_dump(yield $process->join());
\var_dump(yield $process->join());
});
}
@ -70,11 +76,12 @@ class ProcessTest extends TestCase {
* @expectedException \Amp\Parallel\Sync\PanicError
* @expectedExceptionMessage Uncaught ParseError in execution context
*/
public function testParseError() {
public function testParseError()
{
Loop::run(function () {
$process = new Process(__DIR__ . "/parse-error-process.inc");
yield $process->start();
var_dump(yield $process->join());
\var_dump(yield $process->join());
});
}
}

View File

@ -9,15 +9,18 @@ use Amp\Parallel\Context\Thread;
* @group threading
* @requires extension pthreads
*/
class ThreadTest extends AbstractContextTest {
public function createContext(callable $function) {
class ThreadTest extends AbstractContextTest
{
public function createContext(callable $function)
{
return new Thread($function);
}
public function testSpawnStartsThread() {
public function testSpawnStartsThread()
{
Loop::run(function () {
$thread = yield Thread::run(function () {
usleep(100);
\usleep(100);
});
$this->assertTrue($thread->isRunning());

View File

@ -4,7 +4,8 @@ use Amp\Parallel\Sync\Channel;
return function (Channel $channel) {
return new class {
private function __sleep() {
private function __sleep()
{
}
};
};

View File

@ -5,18 +5,21 @@ namespace Amp\Parallel\Test\Sync;
use Amp\PHPUnit\TestCase;
use Amp\Promise;
abstract class AbstractParcelTest extends TestCase {
abstract class AbstractParcelTest extends TestCase
{
/**
* @return \Amp\Parallel\Sync\Parcel
*/
abstract protected function createParcel($value);
public function testUnwrapIsOfCorrectType() {
public function testUnwrapIsOfCorrectType()
{
$object = $this->createParcel(new \stdClass);
$this->assertInstanceOf('stdClass', Promise\wait($object->unwrap()));
}
public function testUnwrapIsEqual() {
public function testUnwrapIsEqual()
{
$object = new \stdClass;
$shared = $this->createParcel($object);
$this->assertEquals($object, Promise\wait($shared->unwrap()));
@ -25,12 +28,13 @@ abstract class AbstractParcelTest extends TestCase {
/**
* @depends testUnwrapIsEqual
*/
public function testSynchronized() {
public function testSynchronized()
{
$parcel = $this->createParcel(0);
$awaitable = $parcel->synchronized(function ($value) {
$this->assertSame(0, $value);
usleep(10000);
\usleep(10000);
return 1;
});
@ -42,7 +46,7 @@ abstract class AbstractParcelTest extends TestCase {
$awaitable = $parcel->synchronized(function ($value) {
$this->assertSame(1, $value);
usleep(10000);
\usleep(10000);
return 2;
});

View File

@ -5,12 +5,14 @@ namespace Amp\Parallel\Test\Sync;
use Amp\Parallel\Sync\ChannelParser;
use Amp\PHPUnit\TestCase;
class ChannelParserTest extends TestCase {
class ChannelParserTest extends TestCase
{
/**
* @expectedException \Amp\Parallel\Sync\SerializationException
* @expectedExceptionMessage Exception thrown when unserializing data
*/
public function testCorruptedData() {
public function testCorruptedData()
{
$data = "Invalid serialized data";
$data = \pack("CL", 0, \strlen($data)) . $data;
$parser = new ChannelParser($this->createCallback(0));
@ -21,7 +23,8 @@ class ChannelParserTest extends TestCase {
* @expectedException \Amp\Parallel\Sync\ChannelException
* @expectedExceptionMessage Invalid packet received: Invalid packet
*/
public function testInvalidHeaderData() {
public function testInvalidHeaderData()
{
$data = "Invalid packet";
$parser = new ChannelParser($this->createCallback(0));
$parser->push($data);
@ -31,7 +34,8 @@ class ChannelParserTest extends TestCase {
* @expectedException \Amp\Parallel\Sync\ChannelException
* @expectedExceptionMessage Invalid packet received: B \xf3\xf2\x0\x1
*/
public function testInvalidHeaderBinaryData() {
public function testInvalidHeaderBinaryData()
{
$data = "\x42\x20\xf3\xf2\x00\x01";
$parser = new ChannelParser($this->createCallback(0));
$parser->push($data);

View File

@ -6,11 +6,13 @@ use Amp\Loop;
use Amp\Parallel\Sync\ChannelledSocket;
use Amp\PHPUnit\TestCase;
class ChannelledSocketTest extends TestCase {
class ChannelledSocketTest extends TestCase
{
/**
* @return resource[]
*/
protected function createSockets() {
protected function createSockets()
{
if (($sockets = @\stream_socket_pair(\stripos(PHP_OS, "win") === 0 ? STREAM_PF_INET : STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP)) === false) {
$message = "Failed to create socket pair";
if ($error = \error_get_last()) {
@ -21,7 +23,8 @@ class ChannelledSocketTest extends TestCase {
return $sockets;
}
public function testSendReceive() {
public function testSendReceive()
{
Loop::run(function () {
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
@ -38,7 +41,8 @@ class ChannelledSocketTest extends TestCase {
/**
* @depends testSendReceive
*/
public function testSendReceiveLongData() {
public function testSendReceiveLongData()
{
Loop::run(function () {
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
@ -47,7 +51,7 @@ class ChannelledSocketTest extends TestCase {
$length = 0xffff;
$message = '';
for ($i = 0; $i < $length; ++$i) {
$message .= chr(mt_rand(0, 255));
$message .= \chr(\mt_rand(0, 255));
}
$a->send($message);
@ -60,13 +64,14 @@ class ChannelledSocketTest extends TestCase {
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\ChannelException
*/
public function testInvalidDataReceived() {
public function testInvalidDataReceived()
{
Loop::run(function () {
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
$b = new ChannelledSocket($right, $right);
fwrite($left, pack('L', 10) . '1234567890');
\fwrite($left, \pack('L', 10) . '1234567890');
$data = yield $b->receive();
});
}
@ -75,7 +80,8 @@ class ChannelledSocketTest extends TestCase {
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\ChannelException
*/
public function testSendUnserializableData() {
public function testSendUnserializableData()
{
Loop::run(function () {
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
@ -91,7 +97,8 @@ class ChannelledSocketTest extends TestCase {
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\ChannelException
*/
public function testSendAfterClose() {
public function testSendAfterClose()
{
Loop::run(function () {
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);
@ -105,7 +112,8 @@ class ChannelledSocketTest extends TestCase {
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\ChannelException
*/
public function testReceiveAfterClose() {
public function testReceiveAfterClose()
{
Loop::run(function () {
list($left, $right) = $this->createSockets();
$a = new ChannelledSocket($left, $left);

View File

@ -11,36 +11,43 @@ use Amp\PHPUnit\TestCase;
use Amp\Promise;
use Amp\Success;
class ChannelledStreamTest extends TestCase {
class ChannelledStreamTest extends TestCase
{
/**
* @return \Amp\ByteStream\InputStream|\Amp\ByteStream\OutputStream
*/
protected function createMockStream() {
protected function createMockStream()
{
return new class implements InputStream, OutputStream {
private $buffer = "";
public function read(): Promise {
public function read(): Promise
{
$data = $this->buffer;
$this->buffer = "";
return new Success($data);
}
public function write(string $data): Promise {
public function write(string $data): Promise
{
$this->buffer .= $data;
return new Success(\strlen($data));
}
public function end(string $finalData = ""): Promise {
public function end(string $finalData = ""): Promise
{
throw new \BadMethodCallException;
}
public function close() {
public function close()
{
throw new \BadMethodCallException;
}
};
}
public function testSendReceive() {
public function testSendReceive()
{
Loop::run(function () {
$mock = $this->createMockStream();
$a = new ChannelledStream($mock, $mock);
@ -57,7 +64,8 @@ class ChannelledStreamTest extends TestCase {
/**
* @depends testSendReceive
*/
public function testSendReceiveLongData() {
public function testSendReceiveLongData()
{
Loop::run(function () {
$mock = $this->createMockStream();
$a = new ChannelledStream($mock, $mock);
@ -66,7 +74,7 @@ class ChannelledStreamTest extends TestCase {
$length = 0xffff;
$message = '';
for ($i = 0; $i < $length; ++$i) {
$message .= chr(mt_rand(0, 255));
$message .= \chr(\mt_rand(0, 255));
}
yield $a->send($message);
@ -79,14 +87,15 @@ class ChannelledStreamTest extends TestCase {
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\ChannelException
*/
public function testInvalidDataReceived() {
public function testInvalidDataReceived()
{
Loop::run(function () {
$mock = $this->createMockStream();
$a = new ChannelledStream($mock, $mock);
$b = new ChannelledStream($mock, $mock);
// Close $a. $b should close on next read...
yield $mock->write(pack('L', 10) . '1234567890');
yield $mock->write(\pack('L', 10) . '1234567890');
$data = yield $b->receive();
});
}
@ -95,7 +104,8 @@ class ChannelledStreamTest extends TestCase {
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\ChannelException
*/
public function testSendUnserializableData() {
public function testSendUnserializableData()
{
Loop::run(function () {
$mock = $this->createMockStream();
$a = new ChannelledStream($mock, $mock);
@ -111,7 +121,8 @@ class ChannelledStreamTest extends TestCase {
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\ChannelException
*/
public function testSendAfterClose() {
public function testSendAfterClose()
{
Loop::run(function () {
$mock = $this->createMock(OutputStream::class);
$mock->expects($this->once())
@ -132,7 +143,8 @@ class ChannelledStreamTest extends TestCase {
* @depends testSendReceive
* @expectedException \Amp\Parallel\Sync\ChannelException
*/
public function testReceiveAfterClose() {
public function testReceiveAfterClose()
{
Loop::run(function () {
$mock = $this->createMock(InputStream::class);
$mock->expects($this->once())

View File

@ -6,8 +6,10 @@ use Amp\Parallel\Sync\ExitFailure;
use Amp\Parallel\Sync\PanicError;
use Amp\PHPUnit\TestCase;
class ExitFailureTest extends TestCase {
public function testGetResult() {
class ExitFailureTest extends TestCase
{
public function testGetResult()
{
$message = "Test message";
$exception = new \Exception($message);
$result = new ExitFailure($exception);

View File

@ -5,8 +5,10 @@ namespace Amp\Parallel\Test\Sync;
use Amp\Parallel\Sync\ExitSuccess;
use Amp\PHPUnit\TestCase;
class ExitSuccessTest extends TestCase {
public function testGetResult() {
class ExitSuccessTest extends TestCase
{
public function testGetResult()
{
$value = 1;
$result = new ExitSuccess($value);
$this->assertSame($value, $result->getResult());

View File

@ -10,21 +10,25 @@ use Amp\Promise;
* @requires extension shmop
* @requires extension sysvmsg
*/
class SharedMemoryParcelTest extends AbstractParcelTest {
class SharedMemoryParcelTest extends AbstractParcelTest
{
const ID = __CLASS__;
private $parcel;
protected function createParcel($value) {
protected function createParcel($value)
{
$this->parcel = SharedMemoryParcel::create(self::ID, $value);
return $this->parcel;
}
public function tearDown() {
public function tearDown()
{
$this->parcel = null;
}
public function testObjectOverflowMoved() {
public function testObjectOverflowMoved()
{
$object = SharedMemoryParcel::create(self::ID, 'hi', 2);
$awaitable = $object->synchronized(function () {
return 'hello world';
@ -38,7 +42,8 @@ class SharedMemoryParcelTest extends AbstractParcelTest {
* @group posix
* @requires extension pcntl
*/
public function testSetInSeparateProcess() {
public function testSetInSeparateProcess()
{
$object = SharedMemoryParcel::create(self::ID, 42);
$this->doInFork(function () use ($object) {
@ -55,7 +60,8 @@ class SharedMemoryParcelTest extends AbstractParcelTest {
* @group posix
* @requires extension pcntl
*/
public function testInSeparateProcess() {
public function testInSeparateProcess()
{
$parcel = SharedMemoryParcel::create(self::ID, 42);
$this->doInFork(function () {

View File

@ -10,12 +10,15 @@ use Amp\Parallel\Sync\ThreadedParcel;
/**
* @requires extension pthreads
*/
class ThreadedParcelTest extends AbstractParcelTest {
protected function createParcel($value) {
class ThreadedParcelTest extends AbstractParcelTest
{
protected function createParcel($value)
{
return new ThreadedParcel($value);
}
public function testWithinThread() {
public function testWithinThread()
{
Loop::run(function () {
$value = 1;
$parcel = new ThreadedParcel($value);

View File

@ -9,7 +9,8 @@ use Amp\Parallel\Worker\Worker;
use Amp\PHPUnit\TestCase;
use Amp\Promise;
abstract class AbstractPoolTest extends TestCase {
abstract class AbstractPoolTest extends TestCase
{
/**
* @param int $min
* @param int $max
@ -18,7 +19,8 @@ abstract class AbstractPoolTest extends TestCase {
*/
abstract protected function createPool($max = Pool::DEFAULT_MAX_SIZE): Pool;
public function testIsRunning() {
public function testIsRunning()
{
Loop::run(function () {
$pool = $this->createPool();
@ -29,7 +31,8 @@ abstract class AbstractPoolTest extends TestCase {
});
}
public function testIsIdleOnStart() {
public function testIsIdleOnStart()
{
Loop::run(function () {
$pool = $this->createPool();
@ -39,12 +42,14 @@ abstract class AbstractPoolTest extends TestCase {
});
}
public function testGetMaxSize() {
public function testGetMaxSize()
{
$pool = $this->createPool(17);
$this->assertEquals(17, $pool->getMaxSize());
}
public function testWorkersIdleOnStart() {
public function testWorkersIdleOnStart()
{
Loop::run(function () {
$pool = $this->createPool();
@ -54,7 +59,8 @@ abstract class AbstractPoolTest extends TestCase {
});
}
public function testEnqueue() {
public function testEnqueue()
{
Loop::run(function () {
$pool = $this->createPool();
@ -65,7 +71,8 @@ abstract class AbstractPoolTest extends TestCase {
});
}
public function testEnqueueMultiple() {
public function testEnqueueMultiple()
{
Loop::run(function () {
$pool = $this->createPool();
@ -81,14 +88,16 @@ abstract class AbstractPoolTest extends TestCase {
});
}
public function testKill() {
public function testKill()
{
$pool = $this->createPool();
$this->assertRunTimeLessThan([$pool, 'kill'], 1000);
$this->assertFalse($pool->isRunning());
}
public function testGet() {
public function testGet()
{
Loop::run(function () {
$pool = $this->createPool();
@ -106,7 +115,8 @@ abstract class AbstractPoolTest extends TestCase {
});
}
public function testBusyPool() {
public function testBusyPool()
{
Loop::run(function () {
$pool = $this->createPool(2);
@ -131,7 +141,8 @@ abstract class AbstractPoolTest extends TestCase {
});
}
public function testCleanGarbageCollection() {
public function testCleanGarbageCollection()
{
// See https://github.com/amphp/parallel-functions/issues/5
Loop::run(function () {
for ($i = 0; $i < 3; $i++) {

View File

@ -10,19 +10,23 @@ use Amp\Parallel\Worker\TaskError;
use Amp\Parallel\Worker\WorkerException;
use Amp\PHPUnit\TestCase;
class NonAutoloadableTask implements Task {
public function run(Environment $environment) {
class NonAutoloadableTask implements Task
{
public function run(Environment $environment)
{
return 1;
}
}
abstract class AbstractWorkerTest extends TestCase {
abstract class AbstractWorkerTest extends TestCase
{
/**
* @return \Amp\Parallel\Worker\Worker
*/
abstract protected function createWorker();
public function testWorkerConstantDefined() {
public function testWorkerConstantDefined()
{
Loop::run(function () {
$worker = $this->createWorker();
$this->assertTrue(yield $worker->enqueue(new ConstantTask));
@ -30,7 +34,8 @@ abstract class AbstractWorkerTest extends TestCase {
});
}
public function testIsRunning() {
public function testIsRunning()
{
Loop::run(function () {
$worker = $this->createWorker();
$this->assertFalse($worker->isRunning());
@ -44,7 +49,8 @@ abstract class AbstractWorkerTest extends TestCase {
});
}
public function testIsIdleOnStart() {
public function testIsIdleOnStart()
{
Loop::run(function () {
$worker = $this->createWorker();
@ -54,7 +60,8 @@ abstract class AbstractWorkerTest extends TestCase {
});
}
public function testEnqueue() {
public function testEnqueue()
{
Loop::run(function () {
$worker = $this->createWorker();
@ -65,7 +72,8 @@ abstract class AbstractWorkerTest extends TestCase {
});
}
public function testEnqueueMultipleSynchronous() {
public function testEnqueueMultipleSynchronous()
{
Loop::run(function () {
$worker = $this->createWorker();
@ -81,7 +89,8 @@ abstract class AbstractWorkerTest extends TestCase {
});
}
public function testEnqueueMultipleAsynchronous() {
public function testEnqueueMultipleAsynchronous()
{
Loop::run(function () {
$worker = $this->createWorker();
@ -104,7 +113,8 @@ abstract class AbstractWorkerTest extends TestCase {
});
}
public function testEnqueueMultipleThenShutdown() {
public function testEnqueueMultipleThenShutdown()
{
Loop::run(function () {
$worker = $this->createWorker();
@ -126,7 +136,8 @@ abstract class AbstractWorkerTest extends TestCase {
});
}
public function testNotIdleOnEnqueue() {
public function testNotIdleOnEnqueue()
{
Loop::run(function () {
$worker = $this->createWorker();
@ -138,7 +149,8 @@ abstract class AbstractWorkerTest extends TestCase {
});
}
public function testKill() {
public function testKill()
{
$worker = $this->createWorker();
$worker->enqueue(new TestTask(42));
@ -147,7 +159,8 @@ abstract class AbstractWorkerTest extends TestCase {
$this->assertFalse($worker->isRunning());
}
public function testNonAutoloadableTask() {
public function testNonAutoloadableTask()
{
Loop::run(function () {
$worker = $this->createWorker();
@ -163,13 +176,15 @@ abstract class AbstractWorkerTest extends TestCase {
});
}
public function testUnserializableTask() {
public function testUnserializableTask()
{
Loop::run(function () {
$worker = $this->createWorker();
try {
yield $worker->enqueue(new class implements Task { // Anonymous classes are not serializable.
public function run(Environment $environment) {
public function run(Environment $environment)
{
}
});
$this->fail("Tasks that cannot be autoloaded should throw an exception");
@ -181,12 +196,14 @@ abstract class AbstractWorkerTest extends TestCase {
});
}
public function testUnserializableTaskFollowedByValidTask() {
public function testUnserializableTaskFollowedByValidTask()
{
Loop::run(function () {
$worker = $this->createWorker();
$promise1 = $worker->enqueue(new class implements Task { // Anonymous classes are not serializable.
public function run(Environment $environment) {
public function run(Environment $environment)
{
}
});
$promise2 = $worker->enqueue(new TestTask(42));

View File

@ -7,8 +7,10 @@ use Amp\Loop;
use Amp\Parallel\Worker\BasicEnvironment;
use Amp\PHPUnit\TestCase;
class BasicEnvironmentTest extends TestCase {
public function testBasicOperations() {
class BasicEnvironmentTest extends TestCase
{
public function testBasicOperations()
{
$environment = new BasicEnvironment;
$key = "key";
@ -27,7 +29,8 @@ class BasicEnvironmentTest extends TestCase {
$this->assertNull($environment->get($key));
}
public function testArrayAccess() {
public function testArrayAccess()
{
$environment = new BasicEnvironment;
$key = "key";
@ -46,7 +49,8 @@ class BasicEnvironmentTest extends TestCase {
$this->assertNull($environment[$key]);
}
public function testClear() {
public function testClear()
{
$environment = new BasicEnvironment;
$environment->set("key1", 1);
@ -58,7 +62,8 @@ class BasicEnvironmentTest extends TestCase {
$this->assertFalse($environment->exists("key2"));
}
public function testTtl() {
public function testTtl()
{
Loop::run(function () {
$environment = new BasicEnvironment;
$key = "key";
@ -74,7 +79,8 @@ class BasicEnvironmentTest extends TestCase {
/**
* @depends testTtl
*/
public function testRemovingTtl() {
public function testRemovingTtl()
{
Loop::run(function () {
$environment = new BasicEnvironment;
$key = "key";
@ -90,7 +96,8 @@ class BasicEnvironmentTest extends TestCase {
});
}
public function testShorteningTtl() {
public function testShorteningTtl()
{
Loop::run(function () {
$environment = new BasicEnvironment;
$key = "key";
@ -104,7 +111,8 @@ class BasicEnvironmentTest extends TestCase {
});
}
public function testLengtheningTtl() {
public function testLengtheningTtl()
{
Loop::run(function () {
$environment = new BasicEnvironment;
$key = "key";
@ -122,7 +130,8 @@ class BasicEnvironmentTest extends TestCase {
});
}
public function testAccessExtendsTtl() {
public function testAccessExtendsTtl()
{
Loop::run(function () {
$environment = new BasicEnvironment;
$key1 = "key1";

View File

@ -5,8 +5,10 @@ namespace Amp\Parallel\Test\Worker;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
class ConstantTask implements Task {
public function run(Environment $environment) {
class ConstantTask implements Task
{
public function run(Environment $environment)
{
return \defined("AMP_WORKER");
}
}

View File

@ -6,12 +6,14 @@ use Amp\Parallel\Worker\DefaultWorkerFactory;
use Amp\Parallel\Worker\Worker;
use Amp\PHPUnit\TestCase;
class DefaultWorkerFactoryTest extends TestCase {
class DefaultWorkerFactoryTest extends TestCase
{
/**
* @expectedException \Error
* @expectedExceptionMessage Invalid environment class name 'Invalid'
*/
public function testInvalidClassName() {
public function testInvalidClassName()
{
$factory = new DefaultWorkerFactory("Invalid");
}
@ -19,11 +21,13 @@ class DefaultWorkerFactoryTest extends TestCase {
* @expectedException \Error
* @expectedExceptionMessage does not implement 'Amp\Parallel\Worker\Environment'
*/
public function testNonEnvironmentClassName() {
public function testNonEnvironmentClassName()
{
$factory = new DefaultWorkerFactory(DefaultWorkerFactory::class);
}
public function testCreate() {
public function testCreate()
{
$factory = new DefaultWorkerFactory;
$this->assertInstanceOf(Worker::class, $factory->create());

View File

@ -11,8 +11,10 @@ use Amp\PHPUnit\TestCase;
use Amp\Promise;
use Amp\Success;
class FunctionsTest extends TestCase {
public function testPool() {
class FunctionsTest extends TestCase
{
public function testPool()
{
$pool = $this->createMock(Pool::class);
Worker\pool($pool);
@ -23,7 +25,8 @@ class FunctionsTest extends TestCase {
/**
* @depends testPool
*/
public function testEnqueue() {
public function testEnqueue()
{
$pool = $this->createMock(Pool::class);
$pool->method('enqueue')
->will($this->returnCallback(function (Task $task): Promise {
@ -44,7 +47,8 @@ class FunctionsTest extends TestCase {
/**
* @depends testPool
*/
public function testGet() {
public function testGet()
{
$pool = $this->createMock(Pool::class);
$pool->expects($this->once())
->method('get')
@ -55,7 +59,8 @@ class FunctionsTest extends TestCase {
$worker = Worker\get();
}
public function testFactory() {
public function testFactory()
{
$factory = $this->createMock(WorkerFactory::class);
Worker\factory($factory);
@ -66,7 +71,8 @@ class FunctionsTest extends TestCase {
/**
* @depends testFactory
*/
public function testCreate() {
public function testCreate()
{
$factory = $this->createMock(WorkerFactory::class);
$factory->expects($this->once())
->method('create')

View File

@ -5,8 +5,10 @@ namespace Amp\Parallel\Test\Worker;
use Amp\Parallel\Worker\Internal\Job;
use Amp\PHPUnit\TestCase;
class JobTest extends TestCase {
public function testGetJob() {
class JobTest extends TestCase
{
public function testGetJob()
{
$task = new TestTask(42);
$job = new Job($task);
$this->assertSame($task, $job->getTask());
@ -16,7 +18,8 @@ class JobTest extends TestCase {
* @expectedException \Error
* @expectedExceptionMessage Classes implementing Amp\Parallel\Worker\Task must be autoloadable by the Composer autoloader
*/
public function testUnserialiableClass() {
public function testUnserialiableClass()
{
$task = new TestTask(42);
$job = new Job($task);
$serialized = \serialize($job);

View File

@ -10,8 +10,10 @@ use Amp\Parallel\Worker\WorkerProcess;
/**
* @group process
*/
class ProcessPoolTest extends AbstractPoolTest {
protected function createPool($max = Pool::DEFAULT_MAX_SIZE): Pool {
class ProcessPoolTest extends AbstractPoolTest
{
protected function createPool($max = Pool::DEFAULT_MAX_SIZE): Pool
{
$factory = $this->createMock(WorkerFactory::class);
$factory->method('create')->will($this->returnCallback(function () {
return new WorkerProcess;

View File

@ -7,12 +7,14 @@ use Amp\Parallel\Worker\Worker;
use Amp\PHPUnit\TestCase;
use Amp\Promise;
class TaskFailureTest extends TestCase {
class TaskFailureTest extends TestCase
{
/**
* @expectedException \Amp\Parallel\Worker\TaskException
* @expectedExceptionMessage Uncaught Exception in worker
*/
public function testWithException() {
public function testWithException()
{
$exception = new \Exception("Message", 1);
$result = new TaskFailure('a', $exception);
Promise\wait($result->promise());
@ -22,7 +24,8 @@ class TaskFailureTest extends TestCase {
* @expectedException \Amp\Parallel\Worker\TaskError
* @expectedExceptionMessage Uncaught Error in worker
*/
public function testWithError() {
public function testWithError()
{
$exception = new \Error("Message", 1);
$result = new TaskFailure('a', $exception);
Promise\wait($result->promise());

View File

@ -6,14 +6,17 @@ use Amp\Parallel\Worker\Internal\TaskSuccess;
use Amp\PHPUnit\TestCase;
use Amp\Promise;
class TaskSuccessTest extends TestCase {
public function testGetId() {
class TaskSuccessTest extends TestCase
{
public function testGetId()
{
$id = 'a';
$result = new TaskSuccess($id, 1);
$this->assertSame($id, $result->getId());
}
public function testPromise() {
public function testPromise()
{
$value = 1;
$result = new TaskSuccess('a', $value);
$this->assertSame($value, Promise\wait($result->promise()));

View File

@ -6,16 +6,19 @@ use Amp\Delayed;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
class TestTask implements Task {
class TestTask implements Task
{
private $returnValue;
private $delay = 0;
public function __construct($returnValue, int $delay = 0) {
public function __construct($returnValue, int $delay = 0)
{
$this->returnValue = $returnValue;
$this->delay = $delay;
}
public function run(Environment $environment) {
public function run(Environment $environment)
{
if ($this->delay) {
return new Delayed($this->delay, $this->returnValue);
}

View File

@ -11,8 +11,10 @@ use Amp\Parallel\Worker\WorkerThread;
* @group threading
* @requires extension pthreads
*/
class ThreadPoolTest extends AbstractPoolTest {
protected function createPool($max = Pool::DEFAULT_MAX_SIZE): Pool {
class ThreadPoolTest extends AbstractPoolTest
{
protected function createPool($max = Pool::DEFAULT_MAX_SIZE): Pool
{
$factory = $this->createMock(WorkerFactory::class);
$factory->method('create')->will($this->returnCallback(function () {
return new WorkerThread;

View File

@ -4,8 +4,10 @@ namespace Amp\Parallel\Test\Worker;
use Amp\Parallel\Worker\WorkerProcess;
class WorkerProcessTest extends AbstractWorkerTest {
protected function createWorker() {
class WorkerProcessTest extends AbstractWorkerTest
{
protected function createWorker()
{
return new WorkerProcess;
}
}

View File

@ -8,8 +8,10 @@ use Amp\Parallel\Worker\WorkerThread;
* @group threading
* @requires extension pthreads
*/
class WorkerThreadTest extends AbstractWorkerTest {
protected function createWorker() {
class WorkerThreadTest extends AbstractWorkerTest
{
protected function createWorker()
{
return new WorkerThread;
}
}