From 1adb63d90686fbb552ce63e54a443f960d2716ac Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Thu, 7 Dec 2017 20:49:44 -0600 Subject: [PATCH] Remove do*() private methods Replaced with Amp\call(). --- lib/Context/Process.php | 71 ++++++++++++------------- lib/Context/Thread.php | 93 ++++++++++++++------------------- lib/Sync/ChannelledStream.php | 44 +++++++--------- lib/Sync/SharedMemoryParcel.php | 35 +++++-------- lib/Worker/AbstractWorker.php | 50 +++++------------- lib/Worker/DefaultPool.php | 43 +++------------ 6 files changed, 124 insertions(+), 212 deletions(-) diff --git a/lib/Context/Process.php b/lib/Context/Process.php index 45e9d56..6e7db10 100644 --- a/lib/Context/Process.php +++ b/lib/Context/Process.php @@ -3,7 +3,6 @@ namespace Amp\Parallel\Context; use Amp\ByteStream; -use Amp\Coroutine; use Amp\Parallel\Context; use Amp\Parallel\ContextException; use Amp\Parallel\StatusError; @@ -123,25 +122,23 @@ class Process implements Context { throw new StatusError("The process has not been started"); } - return new Coroutine($this->doReceive()); - } + return call(function () { + try { + $data = yield $this->channel->receive(); + } catch (ChannelException $e) { + throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e); + } - private function doReceive() { - try { - $data = yield $this->channel->receive(); - } catch (ChannelException $e) { - throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e); - } + if ($data instanceof ExitResult) { + $data = $data->getResult(); + throw new SynchronizationError(\sprintf( + 'Process unexpectedly exited with result of type: %s', + \is_object($data) ? \get_class($data) : \gettype($data) + )); + } - if ($data instanceof ExitResult) { - $data = $data->getResult(); - throw new SynchronizationError(\sprintf( - 'Process unexpectedly exited with result of type: %s', - \is_object($data) ? \get_class($data) : \gettype($data) - )); - } - - return $data; + return $data; + }); } /** @@ -173,29 +170,27 @@ class Process implements Context { throw new StatusError("The process has not been started"); } - return new Coroutine($this->doJoin()); - } - - private function doJoin(): \Generator { - try { - $data = yield $this->channel->receive(); - if (!$data instanceof ExitResult) { - throw new SynchronizationError("Did not receive an exit result from process"); + return call(function () { + try { + $data = yield $this->channel->receive(); + if (!$data instanceof ExitResult) { + throw new SynchronizationError("Did not receive an exit result from process"); + } + } catch (ChannelException $e) { + $this->kill(); + throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e); + } catch (\Throwable $exception) { + $this->kill(); + throw $exception; } - } catch (ChannelException $e) { - $this->kill(); - throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e); - } catch (\Throwable $exception) { - $this->kill(); - throw $exception; - } - $code = yield $this->process->join(); - if ($code !== 0) { - throw new ContextException(\sprintf("Process exited with code %d", $code)); - } + $code = yield $this->process->join(); + if ($code !== 0) { + throw new ContextException(\sprintf("Process exited with code %d", $code)); + } - return $data->getResult(); + return $data->getResult(); + }); } /** diff --git a/lib/Context/Thread.php b/lib/Context/Thread.php index 346f0f4..dca5654 100644 --- a/lib/Context/Thread.php +++ b/lib/Context/Thread.php @@ -2,7 +2,6 @@ namespace Amp\Parallel\Context; -use Amp\Coroutine; use Amp\Loop; use Amp\Parallel\Context; use Amp\Parallel\ContextException; @@ -213,42 +212,32 @@ class Thread implements Context { throw new StatusError('The thread has not been started or has already finished.'); } - return new Coroutine($this->doJoin()); - } + return call(function () { + Loop::enable($this->watcher); - /** - * @coroutine - * - * @return \Generator - * - * @throws SynchronizationError If the thread does not send an exit status. - * @throws ContextException If the context stops responding. - */ - private function doJoin(): \Generator { - Loop::enable($this->watcher); + try { + $response = yield $this->channel->receive(); - try { - $response = yield $this->channel->receive(); - - if (!$response instanceof ExitResult) { - throw new SynchronizationError('Did not receive an exit result from thread.'); + if (!$response instanceof ExitResult) { + throw new SynchronizationError('Did not receive an exit result from thread.'); + } + } catch (ChannelException $exception) { + $this->kill(); + throw new ContextException( + "The context stopped responding, potentially due to a fatal error or calling exit", + 0, + $exception + ); + } catch (\Throwable $exception) { + $this->kill(); + throw $exception; + } finally { + Loop::disable($this->watcher); + $this->close(); } - } catch (ChannelException $exception) { - $this->kill(); - throw new ContextException( - "The context stopped responding, potentially due to a fatal error or calling exit", - 0, - $exception - ); - } catch (\Throwable $exception) { - $this->kill(); - throw $exception; - } finally { - Loop::disable($this->watcher); - $this->close(); - } - return $response->getResult(); + return $response->getResult(); + }); } /** @@ -259,29 +248,27 @@ class Thread implements Context { throw new StatusError('The process has not been started.'); } - return new Coroutine($this->doReceive()); - } + return call(function () { + Loop::enable($this->watcher); - private function doReceive() { - Loop::enable($this->watcher); + try { + $data = yield $this->channel->receive(); + } catch (ChannelException $e) { + throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e); + } finally { + Loop::disable($this->watcher); + } - try { - $data = yield $this->channel->receive(); - } catch (ChannelException $e) { - throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e); - } finally { - Loop::disable($this->watcher); - } + if ($data instanceof ExitResult) { + $data = $data->getResult(); + throw new SynchronizationError(\sprintf( + 'Thread process unexpectedly exited with result of type: %s', + \is_object($data) ? \get_class($data) : \gettype($data) + )); + } - if ($data instanceof ExitResult) { - $data = $data->getResult(); - throw new SynchronizationError(\sprintf( - 'Thread process unexpectedly exited with result of type: %s', - \is_object($data) ? \get_class($data) : \gettype($data) - )); - } - - return $data; + return $data; + }); } /** diff --git a/lib/Sync/ChannelledStream.php b/lib/Sync/ChannelledStream.php index eda4b4b..a75f0a9 100644 --- a/lib/Sync/ChannelledStream.php +++ b/lib/Sync/ChannelledStream.php @@ -5,8 +5,8 @@ namespace Amp\Parallel\Sync; use Amp\ByteStream\InputStream; use Amp\ByteStream\OutputStream; use Amp\ByteStream\StreamException; -use Amp\Coroutine; use Amp\Promise; +use function Amp\call; /** * An asynchronous channel for sending data between threads and processes. @@ -43,37 +43,33 @@ class ChannelledStream implements Channel { * {@inheritdoc} */ public function send($data): Promise { - return new Coroutine($this->doSend($data)); - } - - private function doSend($data): \Generator { - try { - return yield $this->write->write($this->parser->encode($data)); - } catch (StreamException $exception) { - throw new ChannelException("Sending on the channel failed. Did the context die?", $exception); - } + return call(function () use ($data) { + try { + return yield $this->write->write($this->parser->encode($data)); + } catch (StreamException $exception) { + throw new ChannelException("Sending on the channel failed. Did the context die?", $exception); + } + }); } /** * {@inheritdoc} */ public function receive(): Promise { - return new Coroutine($this->doReceive()); - } + return call(function () { + while ($this->received->isEmpty()) { + if (($chunk = yield $this->read->read()) === null) { + throw new ChannelException("The channel closed. Did the context die?"); + } - private function doReceive(): \Generator { - while ($this->received->isEmpty()) { - if (($chunk = yield $this->read->read()) === null) { - throw new ChannelException("The channel closed. Did the context die?"); + try { + $this->parser->push($chunk); + } catch (StreamException $exception) { + throw new ChannelException("Reading from the channel failed. Did the context die?", $exception); + } } - try { - $this->parser->push($chunk); - } catch (StreamException $exception) { - throw new ChannelException("Reading from the channel failed. Did the context die?", $exception); - } - } - - return $this->received->shift(); + return $this->received->shift(); + }); } } diff --git a/lib/Sync/SharedMemoryParcel.php b/lib/Sync/SharedMemoryParcel.php index 9af9c70..2b9566e 100644 --- a/lib/Sync/SharedMemoryParcel.php +++ b/lib/Sync/SharedMemoryParcel.php @@ -2,7 +2,6 @@ namespace Amp\Parallel\Sync; -use Amp\Coroutine; use Amp\Failure; use Amp\Promise; use Amp\Success; @@ -205,32 +204,22 @@ class SharedMemoryParcel implements Parcel { * {@inheritdoc} */ public function synchronized(callable $callback): Promise { - return new Coroutine($this->doSynchronized($callback)); - } + return call(function () use ($callback) { + /** @var \Amp\Sync\Lock $lock */ + $lock = yield $this->semaphore->acquire(); - /** - * @coroutine - * - * @param callable $callback - * - * @return \Generator - */ - private function doSynchronized(callable $callback): \Generator { - /** @var \Amp\Sync\Lock $lock */ - $lock = yield $this->semaphore->acquire(); + try { + $result = yield call($callback, yield $this->unwrap()); - try { - $value = yield $this->unwrap(); - $result = yield call($callback, $value); - - if ($result !== null) { - $this->wrap($result); + if ($result !== null) { + $this->wrap($result); + } + } finally { + $lock->release(); } - } finally { - $lock->release(); - } - return $result; + return $result; + }); } diff --git a/lib/Worker/AbstractWorker.php b/lib/Worker/AbstractWorker.php index b76bbd7..d8e42e9 100644 --- a/lib/Worker/AbstractWorker.php +++ b/lib/Worker/AbstractWorker.php @@ -2,12 +2,12 @@ namespace Amp\Parallel\Worker; -use Amp\Coroutine; use Amp\Deferred; use Amp\Parallel\Context; use Amp\Parallel\ContextException; use Amp\Parallel\StatusError; use Amp\Promise; +use function Amp\call; /** * Base class for most common types of task workers. @@ -98,39 +98,18 @@ abstract class AbstractWorker implements Worker { throw new StatusError("The worker has been shut down"); } - return new Coroutine($this->doEnqueue($task)); - } - - /** - * @coroutine - * - * @param \Amp\Parallel\Worker\Task $task - * - * @return \Generator - * @throws \Amp\Parallel\StatusError - * @throws \Amp\Parallel\Worker\TaskException - * @throws \Amp\Parallel\Worker\TaskError - * @throws \Amp\Parallel\Worker\WorkerException - */ - private function doEnqueue(Task $task): \Generator { $empty = empty($this->jobQueue); $job = new Internal\Job($task); $this->jobQueue[$job->getId()] = $deferred = new Deferred; - try { - yield $this->context->send($job); - } catch (\Throwable $exception) { - $exception = new WorkerException("Sending the task to the worker failed", $exception); - $this->cancel($exception); - throw $exception; - } + $this->context->send($job); if ($empty) { $this->context->receive()->onResolve($this->onResolve); } - return yield $deferred->promise(); + return $deferred->promise(); } /** @@ -141,22 +120,19 @@ abstract class AbstractWorker implements Worker { throw new StatusError("The worker is not running"); } - return new Coroutine($this->doShutdown()); - } - - /** - * {@inheritdoc} - */ - private function doShutdown(): \Generator { $this->shutdown = true; - // If a task is currently running, wait for it to finish. - yield Promise\any(\array_map(function (Deferred $deferred): Promise { - return $deferred->promise(); - }, $this->jobQueue)); + return call(function () { + if (!empty($this->jobQueue)) { + // If a task is currently running, wait for it to finish. + yield Promise\any(\array_map(function (Deferred $deferred): Promise { + return $deferred->promise(); + }, $this->jobQueue)); + } - yield $this->context->send(0); - return yield $this->context->join(); + yield $this->context->send(0); + return yield $this->context->join(); + }); } /** diff --git a/lib/Worker/DefaultPool.php b/lib/Worker/DefaultPool.php index f4dd781..4e1a4ad 100644 --- a/lib/Worker/DefaultPool.php +++ b/lib/Worker/DefaultPool.php @@ -3,7 +3,6 @@ namespace Amp\Parallel\Worker; use Amp\CallableMaker; -use Amp\Coroutine; use Amp\Parallel\StatusError; use Amp\Promise; @@ -156,34 +155,18 @@ class DefaultPool implements Pool { * @throws \Amp\Parallel\Worker\TaskException If the task throws an exception. */ public function enqueue(Task $task): Promise { - return new Coroutine($this->doEnqueue($this->pull(), $task)); - } + $worker = $this->pull(); - /** - * @coroutine - * - * Keeps the worker marked as busy until the task has completed. - * - * @param \Amp\Parallel\Worker\Worker $worker - * @param \Amp\Parallel\Worker\Task $task - * - * @return \Generator - */ - public function doEnqueue(Worker $worker, Task $task): \Generator { - try { - $result = yield $worker->enqueue($task); - } finally { + $promise = $worker->enqueue($task); + $promise->onResolve(function () use ($worker) { $this->push($worker); - } - - return $result; + }); + return $promise; } /** * Shuts down the pool and all workers in it. * - * @coroutine - * * @return \Amp\Promise Array of exit status from all workers. * * @throws \Amp\Parallel\StatusError If the pool has not been started. @@ -193,30 +176,16 @@ class DefaultPool implements Pool { throw new StatusError('The pool is not running.'); } - return new Coroutine($this->doShutdown()); - } - - /** - * Shuts down the pool and all workers in it. - * - * @coroutine - * - * @return \Generator - * - * @throws \Amp\Parallel\StatusError If the pool has not been started. - */ - private function doShutdown(): \Generator { $this->running = false; $shutdowns = []; - foreach ($this->workers as $worker) { if ($worker->isRunning()) { $shutdowns[] = $worker->shutdown(); } } - return yield Promise\all($shutdowns); + return Promise\all($shutdowns); } /**