diff --git a/lib/Context/Process.php b/lib/Context/Process.php index 9185db8..1d3d53c 100644 --- a/lib/Context/Process.php +++ b/lib/Context/Process.php @@ -174,13 +174,7 @@ class Process implements Context { throw new \Error("Cannot send exit result objects"); } - return call(function () use ($data) { - try { - yield $this->channel->send($data); - } catch (ChannelException $e) { - throw new ContextException("The context went away, potentially due to a fatal error or calling exit", 0, $e); - } - }); + return $this->channel->send($data); } /** @@ -197,9 +191,6 @@ class Process implements Context { if (!$data instanceof ExitResult) { throw new SynchronizationError("Did not receive an exit result from process"); } - } catch (ChannelException $e) { - $this->kill(); - throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e); } catch (\Throwable $exception) { $this->kill(); throw $exception; diff --git a/lib/Context/Thread.php b/lib/Context/Thread.php index 57055ea..6c3e0a9 100644 --- a/lib/Context/Thread.php +++ b/lib/Context/Thread.php @@ -254,8 +254,6 @@ class Thread implements Context { try { $data = yield $this->channel->receive(); - } catch (ChannelException $e) { - throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e); } finally { Loop::disable($this->watcher); } @@ -288,12 +286,12 @@ class Thread implements Context { Loop::enable($this->watcher); try { - yield $this->channel->send($data); - } catch (ChannelException $e) { - throw new ContextException("The context went away, potentially due to a fatal error or calling exit", 0, $e); + $result = yield $this->channel->send($data); } finally { Loop::disable($this->watcher); } + + return $result; }); } } diff --git a/lib/Worker/AbstractWorker.php b/lib/Worker/AbstractWorker.php index 44a056d..18ba046 100644 --- a/lib/Worker/AbstractWorker.php +++ b/lib/Worker/AbstractWorker.php @@ -92,18 +92,27 @@ abstract class AbstractWorker implements Worker { $this->context->start(); } - $empty = empty($this->jobQueue); + return call(function () use ($task) { + $empty = empty($this->jobQueue); - $job = new Internal\Job($task); - $this->jobQueue[$job->getId()] = $deferred = new Deferred; + $job = new Internal\Job($task); + $this->jobQueue[$job->getId()] = $deferred = new Deferred; - $this->context->send($job); + try { + yield $this->context->send($job); + if ($empty) { + $this->context->receive()->onResolve($this->onResolve); + } + } catch (\Throwable $exception) { + unset($this->jobQueue[$job->getId()]); + if (!empty($this->jobQueue)) { + $this->context->receive()->onResolve($this->onResolve); + } + $deferred->fail($exception); + } - if ($empty) { - $this->context->receive()->onResolve($this->onResolve); - } - - return $deferred->promise(); + return $deferred->promise(); + }); } /** diff --git a/test/Worker/AbstractWorkerTest.php b/test/Worker/AbstractWorkerTest.php index f52b7e6..4c81554 100644 --- a/test/Worker/AbstractWorkerTest.php +++ b/test/Worker/AbstractWorkerTest.php @@ -3,6 +3,7 @@ namespace Amp\Parallel\Test\Worker; use Amp\Loop; +use Amp\Parallel\Sync\SerializationException; use Amp\Parallel\Worker\Environment; use Amp\Parallel\Worker\Task; use Amp\Parallel\Worker\TaskError; @@ -121,7 +122,7 @@ abstract class AbstractWorkerTest extends TestCase { $this->assertFalse($worker->isRunning()); } - public function testUnserializableTask() { + public function testNonAutoloadableTask() { Loop::run(function () { $worker = $this->createWorker(); @@ -136,4 +137,22 @@ abstract class AbstractWorkerTest extends TestCase { yield $worker->shutdown(); }); } + + public function testUnserializableTask() { + Loop::run(function () { + $worker = $this->createWorker(); + + try { + yield $worker->enqueue(new class implements Task { // Anonymous classes are not serializable. + public function run(Environment $environment) { + } + }); + $this->fail("Tasks that cannot be autoloaded should throw an exception"); + } catch (SerializationException $exception) { + $this->assertSame(0, \strpos($exception->getMessage(), "The given data cannot be sent because it is not serializable")); + } + + yield $worker->shutdown(); + }); + } }