From cfd33799d7e42070cfdea95adab323a2614ecb8a Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Sat, 23 Dec 2017 23:18:09 +0100 Subject: [PATCH] Avoid cyclic references, fixes amphp/parallel-functions#5 --- lib/Context/Process.php | 2 +- lib/Worker/AbstractWorker.php | 57 +++++++++++++++++++++-------------- lib/Worker/DefaultPool.php | 36 +++++++++++++--------- 3 files changed, 57 insertions(+), 38 deletions(-) diff --git a/lib/Context/Process.php b/lib/Context/Process.php index 1d3d53c..1561b43 100644 --- a/lib/Context/Process.php +++ b/lib/Context/Process.php @@ -122,7 +122,7 @@ class Process implements Context { $childStderr = $this->process->getStderr(); $childStderr->unreference(); - asyncCall(function () use ($childStderr) { + asyncCall(static function () use ($childStderr) { $stderr = new ByteStream\ResourceOutputStream(\STDERR); yield ByteStream\pipe($childStderr, $stderr); }); diff --git a/lib/Worker/AbstractWorker.php b/lib/Worker/AbstractWorker.php index 4a7db21..bcd5edd 100644 --- a/lib/Worker/AbstractWorker.php +++ b/lib/Worker/AbstractWorker.php @@ -27,6 +27,9 @@ abstract class AbstractWorker implements Worker { /** @var callable */ private $onResolve; + /** @var callable */ + private $cancel; + /** * @param \Amp\Parallel\Context\Context $context */ @@ -37,34 +40,56 @@ abstract class AbstractWorker implements Worker { $this->context = $context; - $this->onResolve = function ($exception, $data) { + $jobQueue = &$this->jobQueue; + + $this->cancel = static function (\Throwable $exception = null) use (&$jobQueue, &$context) { + if (!empty($jobQueue)) { + $exception = new WorkerException('Worker was shut down', $exception); + + foreach ($jobQueue as $job) { + $job->fail($exception); + } + + $jobQueue = []; + } + + if ($context->isRunning()) { + $context->kill(); + } + }; + + $cancel = &$this->cancel; + + $this->onResolve = static function ($exception, $data) use (&$jobQueue, &$cancel, &$context, &$onResolve) { if ($exception) { - $this->cancel($exception); + $cancel($exception); return; } if (!$data instanceof Internal\TaskResult) { - $this->cancel(new ContextException("Context did not return a task result")); + $cancel(new ContextException("Context did not return a task result")); return; } $id = $data->getId(); - if (!isset($this->jobQueue[$id])) { - $this->cancel(new ContextException("Job ID returned by context does not exist")); + if (!isset($jobQueue[$id])) { + $cancel(new ContextException("Job ID returned by context does not exist")); return; } - $deferred = $this->jobQueue[$id]; - unset($this->jobQueue[$id]); - $empty = empty($this->jobQueue); + $deferred = $jobQueue[$id]; + unset($jobQueue[$id]); + $empty = empty($jobQueue); $deferred->resolve($data->promise()); if (!$empty) { - $this->context->receive()->onResolve($this->onResolve); + $context->receive()->onResolve($onResolve); } }; + + $onResolve = $this->onResolve; } /** @@ -155,18 +180,6 @@ abstract class AbstractWorker implements Worker { * @param \Throwable|null $exception Optional exception to be used as the previous exception. */ protected function cancel(\Throwable $exception = null) { - if (!empty($this->jobQueue)) { - $exception = new WorkerException('Worker was shut down', $exception); - - foreach ($this->jobQueue as $job) { - $job->fail($exception); - } - - $this->jobQueue = []; - } - - if ($this->context->isRunning()) { - $this->context->kill(); - } + ($this->cancel)($exception); } } diff --git a/lib/Worker/DefaultPool.php b/lib/Worker/DefaultPool.php index 3ab0e0d..6a5a2b6 100644 --- a/lib/Worker/DefaultPool.php +++ b/lib/Worker/DefaultPool.php @@ -61,7 +61,25 @@ class DefaultPool implements Pool { $this->idleWorkers = new \SplQueue; $this->busyQueue = new \SplQueue; - $this->push = $this->callableFromInstanceMethod("push"); + $workers = &$this->workers; + $idleWorkers = &$this->idleWorkers; + $busyQueue = &$this->busyQueue; + + $this->push = static function (Worker $worker) use (&$workers, &$idleWorkers, &$busyQueue) { + \assert($workers->contains($worker), "The provided worker was not part of this queue"); + + if (($workers[$worker] -= 1) === 0) { + // Worker is completely idle, remove from busy queue and add to idle queue. + foreach ($busyQueue as $key => $busy) { + if ($busy === $worker) { + unset($busyQueue[$key]); + break; + } + } + + $idleWorkers->push($worker); + } + }; } /** @@ -118,7 +136,7 @@ class DefaultPool implements Pool { $promise = $worker->enqueue($task); $promise->onResolve(function () use ($worker) { - $this->push($worker); + ($this->push)($worker); }); return $promise; } @@ -223,18 +241,6 @@ class DefaultPool implements Pool { * @throws \Error If the worker was not part of this queue. */ protected function push(Worker $worker) { - \assert($this->workers->contains($worker), "The provided worker was not part of this queue"); - - if (($this->workers[$worker] -= 1) === 0) { - // Worker is completely idle, remove from busy queue and add to idle queue. - foreach ($this->busyQueue as $key => $busy) { - if ($busy === $worker) { - unset($this->busyQueue[$key]); - break; - } - } - - $this->idleWorkers->push($worker); - } + ($this->push)($worker); // Kept for BC } }