Run tasks in-order

Tasks on Windows must be run in-order because STDIO pipes are blocking. IPC be refactored to use sockets in the future if desired.
This commit is contained in:
Aaron Piotrowski 2017-12-06 15:59:28 -06:00
parent ef557e18f9
commit 6be72c6754
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
3 changed files with 12 additions and 16 deletions

View File

@ -2,7 +2,6 @@
namespace Amp\Parallel\Worker\Internal;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
/** @internal */
@ -25,11 +24,7 @@ class Job {
public function getTask(): Task {
// Classes that cannot be autoloaded will be unserialized as an instance of __PHP_Incomplete_Class.
if ($this->task instanceof \__PHP_Incomplete_Class) {
return new class implements Task {
public function run(Environment $environment) {
throw new \Error(\sprintf("Classes implementing %s must be autoloadable by the Composer autoloader", Task::class));
}
};
throw new \Error(\sprintf("Classes implementing %s must be autoloadable by the Composer autoloader", Task::class));
}
return $this->task;

View File

@ -37,17 +37,18 @@ class TaskRunner {
$job = yield $this->channel->receive();
while ($job instanceof Internal\Job) {
call([$job->getTask(), 'run'], $this->environment)->onResolve(function ($exception, $value) use ($job) {
if ($exception) {
$result = new Internal\TaskFailure($job->getId(), $exception);
} else {
$result = new Internal\TaskSuccess($job->getId(), $value);
}
try {
$result = yield call([$job->getTask(), "run"], $this->environment);
$result = new Internal\TaskSuccess($job->getId(), $result);
} catch (\Throwable $exception) {
$result = new Internal\TaskFailure($job->getId(), $exception);
}
$this->channel->send($result);
});
$job = null; // Free memory from last job.
unset($job); // Free memory from last job.
yield $this->channel->send($result);
$result = null; // Free memory from last result.
$job = yield $this->channel->receive();
}

View File

@ -93,7 +93,7 @@ abstract class AbstractWorkerTest extends TestCase {
$worker->enqueue(new TestTask(72, 100))
];
$expected = [72, 42, 56];
$expected = [42, 56, 72];
foreach ($promises as $promise) {
$promise->onResolve(function ($e, $v) use (&$expected) {
$this->assertSame(\array_shift($expected), $v);