diff --git a/lib/Worker/Internal/Job.php b/lib/Worker/Internal/Job.php index 75d3f3f..d6269cc 100644 --- a/lib/Worker/Internal/Job.php +++ b/lib/Worker/Internal/Job.php @@ -2,7 +2,6 @@ namespace Amp\Parallel\Worker\Internal; -use Amp\Parallel\Worker\Environment; use Amp\Parallel\Worker\Task; /** @internal */ @@ -25,11 +24,7 @@ class Job { public function getTask(): Task { // Classes that cannot be autoloaded will be unserialized as an instance of __PHP_Incomplete_Class. if ($this->task instanceof \__PHP_Incomplete_Class) { - return new class implements Task { - public function run(Environment $environment) { - throw new \Error(\sprintf("Classes implementing %s must be autoloadable by the Composer autoloader", Task::class)); - } - }; + throw new \Error(\sprintf("Classes implementing %s must be autoloadable by the Composer autoloader", Task::class)); } return $this->task; diff --git a/lib/Worker/TaskRunner.php b/lib/Worker/TaskRunner.php index 403938e..5ca7823 100644 --- a/lib/Worker/TaskRunner.php +++ b/lib/Worker/TaskRunner.php @@ -37,17 +37,18 @@ class TaskRunner { $job = yield $this->channel->receive(); while ($job instanceof Internal\Job) { - call([$job->getTask(), 'run'], $this->environment)->onResolve(function ($exception, $value) use ($job) { - if ($exception) { - $result = new Internal\TaskFailure($job->getId(), $exception); - } else { - $result = new Internal\TaskSuccess($job->getId(), $value); - } + try { + $result = yield call([$job->getTask(), "run"], $this->environment); + $result = new Internal\TaskSuccess($job->getId(), $result); + } catch (\Throwable $exception) { + $result = new Internal\TaskFailure($job->getId(), $exception); + } - $this->channel->send($result); - }); + $job = null; // Free memory from last job. - unset($job); // Free memory from last job. + yield $this->channel->send($result); + + $result = null; // Free memory from last result. $job = yield $this->channel->receive(); } diff --git a/test/Worker/AbstractWorkerTest.php b/test/Worker/AbstractWorkerTest.php index 6ddc156..07d2343 100644 --- a/test/Worker/AbstractWorkerTest.php +++ b/test/Worker/AbstractWorkerTest.php @@ -93,7 +93,7 @@ abstract class AbstractWorkerTest extends TestCase { $worker->enqueue(new TestTask(72, 100)) ]; - $expected = [72, 42, 56]; + $expected = [42, 56, 72]; foreach ($promises as $promise) { $promise->onResolve(function ($e, $v) use (&$expected) { $this->assertSame(\array_shift($expected), $v);