Avoid cyclic references, fixes amphp/parallel-functions#5

This commit is contained in:
Niklas Keller 2017-12-23 23:18:09 +01:00 committed by Aaron Piotrowski
parent 074e2f215f
commit cfd33799d7
3 changed files with 57 additions and 38 deletions

View File

@ -122,7 +122,7 @@ class Process implements Context {
$childStderr = $this->process->getStderr(); $childStderr = $this->process->getStderr();
$childStderr->unreference(); $childStderr->unreference();
asyncCall(function () use ($childStderr) { asyncCall(static function () use ($childStderr) {
$stderr = new ByteStream\ResourceOutputStream(\STDERR); $stderr = new ByteStream\ResourceOutputStream(\STDERR);
yield ByteStream\pipe($childStderr, $stderr); yield ByteStream\pipe($childStderr, $stderr);
}); });

View File

@ -27,6 +27,9 @@ abstract class AbstractWorker implements Worker {
/** @var callable */ /** @var callable */
private $onResolve; private $onResolve;
/** @var callable */
private $cancel;
/** /**
* @param \Amp\Parallel\Context\Context $context * @param \Amp\Parallel\Context\Context $context
*/ */
@ -37,34 +40,56 @@ abstract class AbstractWorker implements Worker {
$this->context = $context; $this->context = $context;
$this->onResolve = function ($exception, $data) { $jobQueue = &$this->jobQueue;
$this->cancel = static function (\Throwable $exception = null) use (&$jobQueue, &$context) {
if (!empty($jobQueue)) {
$exception = new WorkerException('Worker was shut down', $exception);
foreach ($jobQueue as $job) {
$job->fail($exception);
}
$jobQueue = [];
}
if ($context->isRunning()) {
$context->kill();
}
};
$cancel = &$this->cancel;
$this->onResolve = static function ($exception, $data) use (&$jobQueue, &$cancel, &$context, &$onResolve) {
if ($exception) { if ($exception) {
$this->cancel($exception); $cancel($exception);
return; return;
} }
if (!$data instanceof Internal\TaskResult) { if (!$data instanceof Internal\TaskResult) {
$this->cancel(new ContextException("Context did not return a task result")); $cancel(new ContextException("Context did not return a task result"));
return; return;
} }
$id = $data->getId(); $id = $data->getId();
if (!isset($this->jobQueue[$id])) { if (!isset($jobQueue[$id])) {
$this->cancel(new ContextException("Job ID returned by context does not exist")); $cancel(new ContextException("Job ID returned by context does not exist"));
return; return;
} }
$deferred = $this->jobQueue[$id]; $deferred = $jobQueue[$id];
unset($this->jobQueue[$id]); unset($jobQueue[$id]);
$empty = empty($this->jobQueue); $empty = empty($jobQueue);
$deferred->resolve($data->promise()); $deferred->resolve($data->promise());
if (!$empty) { if (!$empty) {
$this->context->receive()->onResolve($this->onResolve); $context->receive()->onResolve($onResolve);
} }
}; };
$onResolve = $this->onResolve;
} }
/** /**
@ -155,18 +180,6 @@ abstract class AbstractWorker implements Worker {
* @param \Throwable|null $exception Optional exception to be used as the previous exception. * @param \Throwable|null $exception Optional exception to be used as the previous exception.
*/ */
protected function cancel(\Throwable $exception = null) { protected function cancel(\Throwable $exception = null) {
if (!empty($this->jobQueue)) { ($this->cancel)($exception);
$exception = new WorkerException('Worker was shut down', $exception);
foreach ($this->jobQueue as $job) {
$job->fail($exception);
}
$this->jobQueue = [];
}
if ($this->context->isRunning()) {
$this->context->kill();
}
} }
} }

View File

@ -61,7 +61,25 @@ class DefaultPool implements Pool {
$this->idleWorkers = new \SplQueue; $this->idleWorkers = new \SplQueue;
$this->busyQueue = new \SplQueue; $this->busyQueue = new \SplQueue;
$this->push = $this->callableFromInstanceMethod("push"); $workers = &$this->workers;
$idleWorkers = &$this->idleWorkers;
$busyQueue = &$this->busyQueue;
$this->push = static function (Worker $worker) use (&$workers, &$idleWorkers, &$busyQueue) {
\assert($workers->contains($worker), "The provided worker was not part of this queue");
if (($workers[$worker] -= 1) === 0) {
// Worker is completely idle, remove from busy queue and add to idle queue.
foreach ($busyQueue as $key => $busy) {
if ($busy === $worker) {
unset($busyQueue[$key]);
break;
}
}
$idleWorkers->push($worker);
}
};
} }
/** /**
@ -118,7 +136,7 @@ class DefaultPool implements Pool {
$promise = $worker->enqueue($task); $promise = $worker->enqueue($task);
$promise->onResolve(function () use ($worker) { $promise->onResolve(function () use ($worker) {
$this->push($worker); ($this->push)($worker);
}); });
return $promise; return $promise;
} }
@ -223,18 +241,6 @@ class DefaultPool implements Pool {
* @throws \Error If the worker was not part of this queue. * @throws \Error If the worker was not part of this queue.
*/ */
protected function push(Worker $worker) { protected function push(Worker $worker) {
\assert($this->workers->contains($worker), "The provided worker was not part of this queue"); ($this->push)($worker); // Kept for BC
if (($this->workers[$worker] -= 1) === 0) {
// Worker is completely idle, remove from busy queue and add to idle queue.
foreach ($this->busyQueue as $key => $busy) {
if ($busy === $worker) {
unset($this->busyQueue[$key]);
break;
}
}
$this->idleWorkers->push($worker);
}
} }
} }