From 6be72c67542a5eb3204c7399912d9b49011c6ed6 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Wed, 6 Dec 2017 15:59:28 -0600 Subject: [PATCH] Run tasks in-order Tasks on Windows must be run in-order because STDIO pipes are blocking. IPC be refactored to use sockets in the future if desired. --- lib/Worker/Internal/Job.php | 7 +------ lib/Worker/TaskRunner.php | 19 ++++++++++--------- test/Worker/AbstractWorkerTest.php | 2 +- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/lib/Worker/Internal/Job.php b/lib/Worker/Internal/Job.php index 75d3f3f..d6269cc 100644 --- a/lib/Worker/Internal/Job.php +++ b/lib/Worker/Internal/Job.php @@ -2,7 +2,6 @@ namespace Amp\Parallel\Worker\Internal; -use Amp\Parallel\Worker\Environment; use Amp\Parallel\Worker\Task; /** @internal */ @@ -25,11 +24,7 @@ class Job { public function getTask(): Task { // Classes that cannot be autoloaded will be unserialized as an instance of __PHP_Incomplete_Class. if ($this->task instanceof \__PHP_Incomplete_Class) { - return new class implements Task { - public function run(Environment $environment) { - throw new \Error(\sprintf("Classes implementing %s must be autoloadable by the Composer autoloader", Task::class)); - } - }; + throw new \Error(\sprintf("Classes implementing %s must be autoloadable by the Composer autoloader", Task::class)); } return $this->task; diff --git a/lib/Worker/TaskRunner.php b/lib/Worker/TaskRunner.php index 403938e..5ca7823 100644 --- a/lib/Worker/TaskRunner.php +++ b/lib/Worker/TaskRunner.php @@ -37,17 +37,18 @@ class TaskRunner { $job = yield $this->channel->receive(); while ($job instanceof Internal\Job) { - call([$job->getTask(), 'run'], $this->environment)->onResolve(function ($exception, $value) use ($job) { - if ($exception) { - $result = new Internal\TaskFailure($job->getId(), $exception); - } else { - $result = new Internal\TaskSuccess($job->getId(), $value); - } + try { + $result = yield call([$job->getTask(), "run"], $this->environment); + $result = new Internal\TaskSuccess($job->getId(), $result); + } catch (\Throwable $exception) { + $result = new Internal\TaskFailure($job->getId(), $exception); + } - $this->channel->send($result); - }); + $job = null; // Free memory from last job. - unset($job); // Free memory from last job. + yield $this->channel->send($result); + + $result = null; // Free memory from last result. $job = yield $this->channel->receive(); } diff --git a/test/Worker/AbstractWorkerTest.php b/test/Worker/AbstractWorkerTest.php index 6ddc156..07d2343 100644 --- a/test/Worker/AbstractWorkerTest.php +++ b/test/Worker/AbstractWorkerTest.php @@ -93,7 +93,7 @@ abstract class AbstractWorkerTest extends TestCase { $worker->enqueue(new TestTask(72, 100)) ]; - $expected = [72, 42, 56]; + $expected = [42, 56, 72]; foreach ($promises as $promise) { $promise->onResolve(function ($e, $v) use (&$expected) { $this->assertSame(\array_shift($expected), $v);