Run multiple tasks simultaneously on workers

This commit is contained in:
Aaron Piotrowski 2016-08-21 10:33:39 -05:00
parent 347c4615b1
commit 193581d6c9
6 changed files with 169 additions and 124 deletions

View File

@ -3,41 +3,59 @@
namespace Amp\Concurrent\Worker;
use Amp\Concurrent\{ StatusError, Strand, WorkerException} ;
use Amp\Concurrent\Worker\Internal\TaskFailure;
use Amp\Coroutine;
use Amp\Deferred;
use Amp\Concurrent\Worker\Internal\{ Job, TaskResult };
use Amp\{ Coroutine, Deferred };
use Interop\Async\Awaitable;
/**
* Base class for most common types of task workers.
*/
abstract class AbstractWorker implements Worker {
/**
* @var \Amp\Concurrent\Strand
*/
/** @var \Amp\Concurrent\Strand */
private $context;
/**
* @var bool
*/
/** @var bool */
private $shutdown = false;
/**
* @var \Amp\Coroutine
*/
private $active;
/**
* @var \SplQueue
*/
private $busyQueue;
/** @var \Amp\Deferred[] */
private $jobQueue = [];
/** @var callable */
private $when;
/**
* @param \Amp\Concurrent\Strand $strand
*/
public function __construct(Strand $strand) {
$this->context = $strand;
$this->busyQueue = new \SplQueue;
$this->when = function ($exception, $data) {
if ($exception) {
$this->kill();
return;
}
if (!$data instanceof TaskResult) {
$this->kill();
return;
}
$id = $data->getId();
if (!isset($this->jobQueue[$id])) {
$this->kill();
return;
}
$deferred = $this->jobQueue[$id];
unset($this->jobQueue[$id]);
if (!empty($this->jobQueue)) {
$this->context->receive()->when($this->when);
}
$deferred->resolve($data->getAwaitable());
};
}
/**
@ -51,7 +69,7 @@ abstract class AbstractWorker implements Worker {
* {@inheritdoc}
*/
public function isIdle(): bool {
return null === $this->active;
return empty($this->jobQueue);
}
/**
@ -87,54 +105,30 @@ abstract class AbstractWorker implements Worker {
* @throws \Amp\Concurrent\WorkerException
*/
private function doEnqueue(Task $task): \Generator {
// If the worker is currently busy, store the task in a busy queue.
if (null !== $this->active) {
$deferred = new Deferred;
$this->busyQueue->enqueue($deferred);
yield $deferred->getAwaitable();
if (empty($this->jobQueue)) {
$this->context->receive()->when($this->when);
}
$this->active = new Coroutine($this->send($task));
try {
$result = yield $this->active;
$job = new Job($task);
$this->jobQueue[$job->getId()] = $deferred = new Deferred;
yield $this->context->send($job);
} catch (\Throwable $exception) {
$this->kill();
throw new WorkerException('Sending the task to the worker failed.', $exception);
} finally {
$this->active = null;
}
// We're no longer busy at the moment, so dequeue a waiting task.
if (!$this->busyQueue->isEmpty()) {
$this->busyQueue->dequeue()->resolve();
}
if ($result instanceof TaskFailure) {
throw $result->getException();
}
return $result;
}
/**
* @coroutine
*
* @param \Amp\Concurrent\Worker\Task $task
*
* @return \Generator
*
* @resolve mixed
*/
private function send(Task $task): \Generator {
yield $this->context->send($task);
return yield $this->context->receive();
return yield $deferred->getAwaitable();
}
/**
* {@inheritdoc}
*/
public function shutdown(): Awaitable {
if (!$this->context->isRunning() || $this->shutdown) {
throw new StatusError('The worker is not running.');
}
return new Coroutine($this->doShutdown());
}
@ -142,23 +136,10 @@ abstract class AbstractWorker implements Worker {
* {@inheritdoc}
*/
private function doShutdown(): \Generator {
if (!$this->context->isRunning() || $this->shutdown) {
throw new StatusError('The worker is not running.');
}
$this->shutdown = true;
// Cancel any waiting tasks.
$this->cancelPending();
// If a task is currently running, wait for it to finish.
if (null !== $this->active) {
try {
yield $this->active;
} catch (\Throwable $exception) {
// Ignore failure in this context.
}
}
yield \Amp\any($this->jobQueue);
yield $this->context->send(0);
return yield $this->context->join();
@ -176,12 +157,14 @@ abstract class AbstractWorker implements Worker {
* Cancels all pending tasks.
*/
private function cancelPending() {
if (!$this->busyQueue->isEmpty()) {
if (!empty($this->jobQueue)) {
$exception = new WorkerException('Worker was shut down.');
do {
$this->busyQueue->dequeue()->fail($exception);
} while (!$this->busyQueue->isEmpty());
foreach ($this->jobQueue as $job) {
$job->fail($exception);
}
$this->jobQueue = [];
}
}
}

View File

@ -0,0 +1,26 @@
<?php
namespace Amp\Concurrent\Worker\Internal;
use Amp\Concurrent\Worker\Task;
class Job {
/** @var string */
private $id;
/** @var \Amp\Concurrent\Worker\Task */
private $task;
public function __construct(Task $task) {
$this->task = $task;
$this->id = \spl_object_hash($this->task);
}
public function getId(): string {
return $this->id;
}
public function getTask(): Task {
return $this->task;
}
}

View File

@ -3,43 +3,42 @@
namespace Amp\Concurrent\Worker\Internal;
use Amp\Concurrent\TaskException;
use Amp\Failure;
use Interop\Async\Awaitable;
class TaskFailure {
/**
* @var string
*/
class TaskFailure implements TaskResult {
/** @var string */
private $id;
/** @var string */
private $type;
/**
* @var string
*/
/** @var string */
private $message;
/**
* @var int
*/
/** @var int */
private $code;
/**
* @var array
*/
/** @var array */
private $trace;
public function __construct(\Throwable $exception) {
public function __construct(string $id, \Throwable $exception) {
$this->id = $id;
$this->type = get_class($exception);
$this->message = $exception->getMessage();
$this->code = $exception->getCode();
$this->trace = $exception->getTraceAsString();
}
/**
* {@inheritdoc}
*/
public function getException() {
return new TaskException(
public function getId(): string {
return $this->id;
}
public function getAwaitable(): Awaitable {
return new Failure(new TaskException(
sprintf('Uncaught exception in worker of type "%s" with message "%s"', $this->type, $this->message),
$this->code,
$this->trace
);
));
}
}

View File

@ -0,0 +1,17 @@
<?php
namespace Amp\Concurrent\Worker\Internal;
use Interop\Async\Awaitable;
interface TaskResult {
/**
* @return string Task identifier.
*/
public function getId(): string;
/**
* @return \Interop\Async\Awaitable<mixed> Resolved with the task result or failure reason.
*/
public function getAwaitable(): Awaitable;
}

View File

@ -2,17 +2,11 @@
namespace Amp\Concurrent\Worker\Internal;
use Amp\Concurrent\Sync\Channel;
use Amp\Concurrent\Worker\{ Environment, Task };
use Amp\Coroutine;
use Amp\Concurrent\{ Sync\Channel, Worker\Environment };
use Amp\{ Coroutine, Failure, Success };
use Interop\Async\Awaitable;
class TaskRunner {
/**
* @var bool
*/
private $idle = true;
/**
* @var \Amp\Concurrent\Sync\Channel
*/
@ -43,11 +37,11 @@ class TaskRunner {
* @return \Generator
*/
private function execute(): \Generator {
$task = yield $this->channel->receive();
while ($task instanceof Task) {
$this->idle = false;
$job = yield $this->channel->receive();
while ($job instanceof Job) {
$task = $job->getTask();
try {
$result = $task->run($this->environment);
@ -55,27 +49,26 @@ class TaskRunner {
$result = new Coroutine($result);
}
if ($result instanceof Awaitable) {
$result = yield $result;
if (!$result instanceof Awaitable) {
$result = new Success($result);
}
} catch (\Throwable $exception) {
$result = new TaskFailure($exception);
$result = new Failure($exception);
}
$result->when(function ($exception, $value) use ($job) {
if ($exception) {
$result = new TaskFailure($job->getId(), $exception);
} else {
$result = new TaskSuccess($job->getId(), $value);
}
$this->channel->send($result);
});
yield $this->channel->send($result);
$this->idle = true;
$task = yield $this->channel->receive();
$job = yield $this->channel->receive();
}
return $task;
}
/**
* @return bool
*/
public function isIdle(): bool {
return $this->idle;
return $job;
}
}

View File

@ -0,0 +1,27 @@
<?php
namespace Amp\Concurrent\Worker\Internal;
use Amp\Success;
use Interop\Async\Awaitable;
class TaskSuccess implements TaskResult {
/** @var string */
private $id;
/** @var mixed Result of task. */
private $result;
public function __construct(string $id, $result) {
$this->id = $id;
$this->result = $result;
}
public function getId(): string {
return $this->id;
}
public function getAwaitable(): Awaitable {
return new Success($this->result);
}
}