From 193581d6c931094e7f489be1d301686a88880678 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 21 Aug 2016 10:33:39 -0500 Subject: [PATCH] Run multiple tasks simultaneously on workers --- lib/Worker/AbstractWorker.php | 135 ++++++++++++---------------- lib/Worker/Internal/Job.php | 26 ++++++ lib/Worker/Internal/TaskFailure.php | 39 ++++---- lib/Worker/Internal/TaskResult.php | 17 ++++ lib/Worker/Internal/TaskRunner.php | 49 +++++----- lib/Worker/Internal/TaskSuccess.php | 27 ++++++ 6 files changed, 169 insertions(+), 124 deletions(-) create mode 100644 lib/Worker/Internal/Job.php create mode 100644 lib/Worker/Internal/TaskResult.php create mode 100644 lib/Worker/Internal/TaskSuccess.php diff --git a/lib/Worker/AbstractWorker.php b/lib/Worker/AbstractWorker.php index 6a33538..920d6c4 100644 --- a/lib/Worker/AbstractWorker.php +++ b/lib/Worker/AbstractWorker.php @@ -3,41 +3,59 @@ namespace Amp\Concurrent\Worker; use Amp\Concurrent\{ StatusError, Strand, WorkerException} ; -use Amp\Concurrent\Worker\Internal\TaskFailure; -use Amp\Coroutine; -use Amp\Deferred; +use Amp\Concurrent\Worker\Internal\{ Job, TaskResult }; +use Amp\{ Coroutine, Deferred }; use Interop\Async\Awaitable; /** * Base class for most common types of task workers. */ abstract class AbstractWorker implements Worker { - /** - * @var \Amp\Concurrent\Strand - */ + /** @var \Amp\Concurrent\Strand */ private $context; - /** - * @var bool - */ + /** @var bool */ private $shutdown = false; - - /** - * @var \Amp\Coroutine - */ - private $active; - - /** - * @var \SplQueue - */ - private $busyQueue; + + /** @var \Amp\Deferred[] */ + private $jobQueue = []; + + /** @var callable */ + private $when; /** * @param \Amp\Concurrent\Strand $strand */ public function __construct(Strand $strand) { $this->context = $strand; - $this->busyQueue = new \SplQueue; + + $this->when = function ($exception, $data) { + if ($exception) { + $this->kill(); + return; + } + + if (!$data instanceof TaskResult) { + $this->kill(); + return; + } + + $id = $data->getId(); + + if (!isset($this->jobQueue[$id])) { + $this->kill(); + return; + } + + $deferred = $this->jobQueue[$id]; + unset($this->jobQueue[$id]); + + if (!empty($this->jobQueue)) { + $this->context->receive()->when($this->when); + } + + $deferred->resolve($data->getAwaitable()); + }; } /** @@ -51,7 +69,7 @@ abstract class AbstractWorker implements Worker { * {@inheritdoc} */ public function isIdle(): bool { - return null === $this->active; + return empty($this->jobQueue); } /** @@ -87,54 +105,30 @@ abstract class AbstractWorker implements Worker { * @throws \Amp\Concurrent\WorkerException */ private function doEnqueue(Task $task): \Generator { - // If the worker is currently busy, store the task in a busy queue. - if (null !== $this->active) { - $deferred = new Deferred; - $this->busyQueue->enqueue($deferred); - yield $deferred->getAwaitable(); + if (empty($this->jobQueue)) { + $this->context->receive()->when($this->when); } - - $this->active = new Coroutine($this->send($task)); - + try { - $result = yield $this->active; + $job = new Job($task); + $this->jobQueue[$job->getId()] = $deferred = new Deferred; + yield $this->context->send($job); } catch (\Throwable $exception) { $this->kill(); throw new WorkerException('Sending the task to the worker failed.', $exception); - } finally { - $this->active = null; } - - // We're no longer busy at the moment, so dequeue a waiting task. - if (!$this->busyQueue->isEmpty()) { - $this->busyQueue->dequeue()->resolve(); - } - - if ($result instanceof TaskFailure) { - throw $result->getException(); - } - - return $result; - } - - /** - * @coroutine - * - * @param \Amp\Concurrent\Worker\Task $task - * - * @return \Generator - * - * @resolve mixed - */ - private function send(Task $task): \Generator { - yield $this->context->send($task); - return yield $this->context->receive(); + + return yield $deferred->getAwaitable(); } /** * {@inheritdoc} */ public function shutdown(): Awaitable { + if (!$this->context->isRunning() || $this->shutdown) { + throw new StatusError('The worker is not running.'); + } + return new Coroutine($this->doShutdown()); } @@ -142,23 +136,10 @@ abstract class AbstractWorker implements Worker { * {@inheritdoc} */ private function doShutdown(): \Generator { - if (!$this->context->isRunning() || $this->shutdown) { - throw new StatusError('The worker is not running.'); - } - $this->shutdown = true; - // Cancel any waiting tasks. - $this->cancelPending(); - // If a task is currently running, wait for it to finish. - if (null !== $this->active) { - try { - yield $this->active; - } catch (\Throwable $exception) { - // Ignore failure in this context. - } - } + yield \Amp\any($this->jobQueue); yield $this->context->send(0); return yield $this->context->join(); @@ -176,12 +157,14 @@ abstract class AbstractWorker implements Worker { * Cancels all pending tasks. */ private function cancelPending() { - if (!$this->busyQueue->isEmpty()) { + if (!empty($this->jobQueue)) { $exception = new WorkerException('Worker was shut down.'); - - do { - $this->busyQueue->dequeue()->fail($exception); - } while (!$this->busyQueue->isEmpty()); + + foreach ($this->jobQueue as $job) { + $job->fail($exception); + } + + $this->jobQueue = []; } } } diff --git a/lib/Worker/Internal/Job.php b/lib/Worker/Internal/Job.php new file mode 100644 index 0000000..914622d --- /dev/null +++ b/lib/Worker/Internal/Job.php @@ -0,0 +1,26 @@ +task = $task; + $this->id = \spl_object_hash($this->task); + } + + public function getId(): string { + return $this->id; + } + + public function getTask(): Task { + return $this->task; + } +} diff --git a/lib/Worker/Internal/TaskFailure.php b/lib/Worker/Internal/TaskFailure.php index 51ec9c9..3196985 100644 --- a/lib/Worker/Internal/TaskFailure.php +++ b/lib/Worker/Internal/TaskFailure.php @@ -3,43 +3,42 @@ namespace Amp\Concurrent\Worker\Internal; use Amp\Concurrent\TaskException; +use Amp\Failure; +use Interop\Async\Awaitable; -class TaskFailure { - /** - * @var string - */ +class TaskFailure implements TaskResult { + /** @var string */ + private $id; + + /** @var string */ private $type; - /** - * @var string - */ + /** @var string */ private $message; - /** - * @var int - */ + /** @var int */ private $code; - /** - * @var array - */ + /** @var array */ private $trace; - public function __construct(\Throwable $exception) { + public function __construct(string $id, \Throwable $exception) { + $this->id = $id; $this->type = get_class($exception); $this->message = $exception->getMessage(); $this->code = $exception->getCode(); $this->trace = $exception->getTraceAsString(); } - /** - * {@inheritdoc} - */ - public function getException() { - return new TaskException( + public function getId(): string { + return $this->id; + } + + public function getAwaitable(): Awaitable { + return new Failure(new TaskException( sprintf('Uncaught exception in worker of type "%s" with message "%s"', $this->type, $this->message), $this->code, $this->trace - ); + )); } } \ No newline at end of file diff --git a/lib/Worker/Internal/TaskResult.php b/lib/Worker/Internal/TaskResult.php new file mode 100644 index 0000000..dd5d808 --- /dev/null +++ b/lib/Worker/Internal/TaskResult.php @@ -0,0 +1,17 @@ + Resolved with the task result or failure reason. + */ + public function getAwaitable(): Awaitable; +} \ No newline at end of file diff --git a/lib/Worker/Internal/TaskRunner.php b/lib/Worker/Internal/TaskRunner.php index 08bcc3e..ae4d39e 100644 --- a/lib/Worker/Internal/TaskRunner.php +++ b/lib/Worker/Internal/TaskRunner.php @@ -2,17 +2,11 @@ namespace Amp\Concurrent\Worker\Internal; -use Amp\Concurrent\Sync\Channel; -use Amp\Concurrent\Worker\{ Environment, Task }; -use Amp\Coroutine; +use Amp\Concurrent\{ Sync\Channel, Worker\Environment }; +use Amp\{ Coroutine, Failure, Success }; use Interop\Async\Awaitable; class TaskRunner { - /** - * @var bool - */ - private $idle = true; - /** * @var \Amp\Concurrent\Sync\Channel */ @@ -43,11 +37,11 @@ class TaskRunner { * @return \Generator */ private function execute(): \Generator { - $task = yield $this->channel->receive(); - - while ($task instanceof Task) { - $this->idle = false; + $job = yield $this->channel->receive(); + while ($job instanceof Job) { + $task = $job->getTask(); + try { $result = $task->run($this->environment); @@ -55,27 +49,26 @@ class TaskRunner { $result = new Coroutine($result); } - if ($result instanceof Awaitable) { - $result = yield $result; + if (!$result instanceof Awaitable) { + $result = new Success($result); } } catch (\Throwable $exception) { - $result = new TaskFailure($exception); + $result = new Failure($exception); } + + $result->when(function ($exception, $value) use ($job) { + if ($exception) { + $result = new TaskFailure($job->getId(), $exception); + } else { + $result = new TaskSuccess($job->getId(), $value); + } + + $this->channel->send($result); + }); - yield $this->channel->send($result); - - $this->idle = true; - - $task = yield $this->channel->receive(); + $job = yield $this->channel->receive(); } - return $task; - } - - /** - * @return bool - */ - public function isIdle(): bool { - return $this->idle; + return $job; } } diff --git a/lib/Worker/Internal/TaskSuccess.php b/lib/Worker/Internal/TaskSuccess.php new file mode 100644 index 0000000..95ead50 --- /dev/null +++ b/lib/Worker/Internal/TaskSuccess.php @@ -0,0 +1,27 @@ +id = $id; + $this->result = $result; + } + + public function getId(): string { + return $this->id; + } + + public function getAwaitable(): Awaitable { + return new Success($this->result); + } +}