mirror of
https://github.com/amphp/parallel.git
synced 2025-02-20 04:44:36 +01:00
Remove do*() private methods
Replaced with Amp\call().
This commit is contained in:
parent
11a115670c
commit
1adb63d906
@ -3,7 +3,6 @@
|
||||
namespace Amp\Parallel\Context;
|
||||
|
||||
use Amp\ByteStream;
|
||||
use Amp\Coroutine;
|
||||
use Amp\Parallel\Context;
|
||||
use Amp\Parallel\ContextException;
|
||||
use Amp\Parallel\StatusError;
|
||||
@ -123,25 +122,23 @@ class Process implements Context {
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
|
||||
return new Coroutine($this->doReceive());
|
||||
}
|
||||
return call(function () {
|
||||
try {
|
||||
$data = yield $this->channel->receive();
|
||||
} catch (ChannelException $e) {
|
||||
throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e);
|
||||
}
|
||||
|
||||
private function doReceive() {
|
||||
try {
|
||||
$data = yield $this->channel->receive();
|
||||
} catch (ChannelException $e) {
|
||||
throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e);
|
||||
}
|
||||
if ($data instanceof ExitResult) {
|
||||
$data = $data->getResult();
|
||||
throw new SynchronizationError(\sprintf(
|
||||
'Process unexpectedly exited with result of type: %s',
|
||||
\is_object($data) ? \get_class($data) : \gettype($data)
|
||||
));
|
||||
}
|
||||
|
||||
if ($data instanceof ExitResult) {
|
||||
$data = $data->getResult();
|
||||
throw new SynchronizationError(\sprintf(
|
||||
'Process unexpectedly exited with result of type: %s',
|
||||
\is_object($data) ? \get_class($data) : \gettype($data)
|
||||
));
|
||||
}
|
||||
|
||||
return $data;
|
||||
return $data;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@ -173,29 +170,27 @@ class Process implements Context {
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
|
||||
return new Coroutine($this->doJoin());
|
||||
}
|
||||
|
||||
private function doJoin(): \Generator {
|
||||
try {
|
||||
$data = yield $this->channel->receive();
|
||||
if (!$data instanceof ExitResult) {
|
||||
throw new SynchronizationError("Did not receive an exit result from process");
|
||||
return call(function () {
|
||||
try {
|
||||
$data = yield $this->channel->receive();
|
||||
if (!$data instanceof ExitResult) {
|
||||
throw new SynchronizationError("Did not receive an exit result from process");
|
||||
}
|
||||
} catch (ChannelException $e) {
|
||||
$this->kill();
|
||||
throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->kill();
|
||||
throw $exception;
|
||||
}
|
||||
} catch (ChannelException $e) {
|
||||
$this->kill();
|
||||
throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->kill();
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
$code = yield $this->process->join();
|
||||
if ($code !== 0) {
|
||||
throw new ContextException(\sprintf("Process exited with code %d", $code));
|
||||
}
|
||||
$code = yield $this->process->join();
|
||||
if ($code !== 0) {
|
||||
throw new ContextException(\sprintf("Process exited with code %d", $code));
|
||||
}
|
||||
|
||||
return $data->getResult();
|
||||
return $data->getResult();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2,7 +2,6 @@
|
||||
|
||||
namespace Amp\Parallel\Context;
|
||||
|
||||
use Amp\Coroutine;
|
||||
use Amp\Loop;
|
||||
use Amp\Parallel\Context;
|
||||
use Amp\Parallel\ContextException;
|
||||
@ -213,42 +212,32 @@ class Thread implements Context {
|
||||
throw new StatusError('The thread has not been started or has already finished.');
|
||||
}
|
||||
|
||||
return new Coroutine($this->doJoin());
|
||||
}
|
||||
return call(function () {
|
||||
Loop::enable($this->watcher);
|
||||
|
||||
/**
|
||||
* @coroutine
|
||||
*
|
||||
* @return \Generator
|
||||
*
|
||||
* @throws SynchronizationError If the thread does not send an exit status.
|
||||
* @throws ContextException If the context stops responding.
|
||||
*/
|
||||
private function doJoin(): \Generator {
|
||||
Loop::enable($this->watcher);
|
||||
try {
|
||||
$response = yield $this->channel->receive();
|
||||
|
||||
try {
|
||||
$response = yield $this->channel->receive();
|
||||
|
||||
if (!$response instanceof ExitResult) {
|
||||
throw new SynchronizationError('Did not receive an exit result from thread.');
|
||||
if (!$response instanceof ExitResult) {
|
||||
throw new SynchronizationError('Did not receive an exit result from thread.');
|
||||
}
|
||||
} catch (ChannelException $exception) {
|
||||
$this->kill();
|
||||
throw new ContextException(
|
||||
"The context stopped responding, potentially due to a fatal error or calling exit",
|
||||
0,
|
||||
$exception
|
||||
);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->kill();
|
||||
throw $exception;
|
||||
} finally {
|
||||
Loop::disable($this->watcher);
|
||||
$this->close();
|
||||
}
|
||||
} catch (ChannelException $exception) {
|
||||
$this->kill();
|
||||
throw new ContextException(
|
||||
"The context stopped responding, potentially due to a fatal error or calling exit",
|
||||
0,
|
||||
$exception
|
||||
);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->kill();
|
||||
throw $exception;
|
||||
} finally {
|
||||
Loop::disable($this->watcher);
|
||||
$this->close();
|
||||
}
|
||||
|
||||
return $response->getResult();
|
||||
return $response->getResult();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@ -259,29 +248,27 @@ class Thread implements Context {
|
||||
throw new StatusError('The process has not been started.');
|
||||
}
|
||||
|
||||
return new Coroutine($this->doReceive());
|
||||
}
|
||||
return call(function () {
|
||||
Loop::enable($this->watcher);
|
||||
|
||||
private function doReceive() {
|
||||
Loop::enable($this->watcher);
|
||||
try {
|
||||
$data = yield $this->channel->receive();
|
||||
} catch (ChannelException $e) {
|
||||
throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e);
|
||||
} finally {
|
||||
Loop::disable($this->watcher);
|
||||
}
|
||||
|
||||
try {
|
||||
$data = yield $this->channel->receive();
|
||||
} catch (ChannelException $e) {
|
||||
throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e);
|
||||
} finally {
|
||||
Loop::disable($this->watcher);
|
||||
}
|
||||
if ($data instanceof ExitResult) {
|
||||
$data = $data->getResult();
|
||||
throw new SynchronizationError(\sprintf(
|
||||
'Thread process unexpectedly exited with result of type: %s',
|
||||
\is_object($data) ? \get_class($data) : \gettype($data)
|
||||
));
|
||||
}
|
||||
|
||||
if ($data instanceof ExitResult) {
|
||||
$data = $data->getResult();
|
||||
throw new SynchronizationError(\sprintf(
|
||||
'Thread process unexpectedly exited with result of type: %s',
|
||||
\is_object($data) ? \get_class($data) : \gettype($data)
|
||||
));
|
||||
}
|
||||
|
||||
return $data;
|
||||
return $data;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -5,8 +5,8 @@ namespace Amp\Parallel\Sync;
|
||||
use Amp\ByteStream\InputStream;
|
||||
use Amp\ByteStream\OutputStream;
|
||||
use Amp\ByteStream\StreamException;
|
||||
use Amp\Coroutine;
|
||||
use Amp\Promise;
|
||||
use function Amp\call;
|
||||
|
||||
/**
|
||||
* An asynchronous channel for sending data between threads and processes.
|
||||
@ -43,37 +43,33 @@ class ChannelledStream implements Channel {
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function send($data): Promise {
|
||||
return new Coroutine($this->doSend($data));
|
||||
}
|
||||
|
||||
private function doSend($data): \Generator {
|
||||
try {
|
||||
return yield $this->write->write($this->parser->encode($data));
|
||||
} catch (StreamException $exception) {
|
||||
throw new ChannelException("Sending on the channel failed. Did the context die?", $exception);
|
||||
}
|
||||
return call(function () use ($data) {
|
||||
try {
|
||||
return yield $this->write->write($this->parser->encode($data));
|
||||
} catch (StreamException $exception) {
|
||||
throw new ChannelException("Sending on the channel failed. Did the context die?", $exception);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function receive(): Promise {
|
||||
return new Coroutine($this->doReceive());
|
||||
}
|
||||
return call(function () {
|
||||
while ($this->received->isEmpty()) {
|
||||
if (($chunk = yield $this->read->read()) === null) {
|
||||
throw new ChannelException("The channel closed. Did the context die?");
|
||||
}
|
||||
|
||||
private function doReceive(): \Generator {
|
||||
while ($this->received->isEmpty()) {
|
||||
if (($chunk = yield $this->read->read()) === null) {
|
||||
throw new ChannelException("The channel closed. Did the context die?");
|
||||
try {
|
||||
$this->parser->push($chunk);
|
||||
} catch (StreamException $exception) {
|
||||
throw new ChannelException("Reading from the channel failed. Did the context die?", $exception);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
$this->parser->push($chunk);
|
||||
} catch (StreamException $exception) {
|
||||
throw new ChannelException("Reading from the channel failed. Did the context die?", $exception);
|
||||
}
|
||||
}
|
||||
|
||||
return $this->received->shift();
|
||||
return $this->received->shift();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -2,7 +2,6 @@
|
||||
|
||||
namespace Amp\Parallel\Sync;
|
||||
|
||||
use Amp\Coroutine;
|
||||
use Amp\Failure;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
@ -205,32 +204,22 @@ class SharedMemoryParcel implements Parcel {
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function synchronized(callable $callback): Promise {
|
||||
return new Coroutine($this->doSynchronized($callback));
|
||||
}
|
||||
return call(function () use ($callback) {
|
||||
/** @var \Amp\Sync\Lock $lock */
|
||||
$lock = yield $this->semaphore->acquire();
|
||||
|
||||
/**
|
||||
* @coroutine
|
||||
*
|
||||
* @param callable $callback
|
||||
*
|
||||
* @return \Generator
|
||||
*/
|
||||
private function doSynchronized(callable $callback): \Generator {
|
||||
/** @var \Amp\Sync\Lock $lock */
|
||||
$lock = yield $this->semaphore->acquire();
|
||||
try {
|
||||
$result = yield call($callback, yield $this->unwrap());
|
||||
|
||||
try {
|
||||
$value = yield $this->unwrap();
|
||||
$result = yield call($callback, $value);
|
||||
|
||||
if ($result !== null) {
|
||||
$this->wrap($result);
|
||||
if ($result !== null) {
|
||||
$this->wrap($result);
|
||||
}
|
||||
} finally {
|
||||
$lock->release();
|
||||
}
|
||||
} finally {
|
||||
$lock->release();
|
||||
}
|
||||
|
||||
return $result;
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
@ -2,12 +2,12 @@
|
||||
|
||||
namespace Amp\Parallel\Worker;
|
||||
|
||||
use Amp\Coroutine;
|
||||
use Amp\Deferred;
|
||||
use Amp\Parallel\Context;
|
||||
use Amp\Parallel\ContextException;
|
||||
use Amp\Parallel\StatusError;
|
||||
use Amp\Promise;
|
||||
use function Amp\call;
|
||||
|
||||
/**
|
||||
* Base class for most common types of task workers.
|
||||
@ -98,39 +98,18 @@ abstract class AbstractWorker implements Worker {
|
||||
throw new StatusError("The worker has been shut down");
|
||||
}
|
||||
|
||||
return new Coroutine($this->doEnqueue($task));
|
||||
}
|
||||
|
||||
/**
|
||||
* @coroutine
|
||||
*
|
||||
* @param \Amp\Parallel\Worker\Task $task
|
||||
*
|
||||
* @return \Generator
|
||||
* @throws \Amp\Parallel\StatusError
|
||||
* @throws \Amp\Parallel\Worker\TaskException
|
||||
* @throws \Amp\Parallel\Worker\TaskError
|
||||
* @throws \Amp\Parallel\Worker\WorkerException
|
||||
*/
|
||||
private function doEnqueue(Task $task): \Generator {
|
||||
$empty = empty($this->jobQueue);
|
||||
|
||||
$job = new Internal\Job($task);
|
||||
$this->jobQueue[$job->getId()] = $deferred = new Deferred;
|
||||
|
||||
try {
|
||||
yield $this->context->send($job);
|
||||
} catch (\Throwable $exception) {
|
||||
$exception = new WorkerException("Sending the task to the worker failed", $exception);
|
||||
$this->cancel($exception);
|
||||
throw $exception;
|
||||
}
|
||||
$this->context->send($job);
|
||||
|
||||
if ($empty) {
|
||||
$this->context->receive()->onResolve($this->onResolve);
|
||||
}
|
||||
|
||||
return yield $deferred->promise();
|
||||
return $deferred->promise();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -141,22 +120,19 @@ abstract class AbstractWorker implements Worker {
|
||||
throw new StatusError("The worker is not running");
|
||||
}
|
||||
|
||||
return new Coroutine($this->doShutdown());
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
private function doShutdown(): \Generator {
|
||||
$this->shutdown = true;
|
||||
|
||||
// If a task is currently running, wait for it to finish.
|
||||
yield Promise\any(\array_map(function (Deferred $deferred): Promise {
|
||||
return $deferred->promise();
|
||||
}, $this->jobQueue));
|
||||
return call(function () {
|
||||
if (!empty($this->jobQueue)) {
|
||||
// If a task is currently running, wait for it to finish.
|
||||
yield Promise\any(\array_map(function (Deferred $deferred): Promise {
|
||||
return $deferred->promise();
|
||||
}, $this->jobQueue));
|
||||
}
|
||||
|
||||
yield $this->context->send(0);
|
||||
return yield $this->context->join();
|
||||
yield $this->context->send(0);
|
||||
return yield $this->context->join();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3,7 +3,6 @@
|
||||
namespace Amp\Parallel\Worker;
|
||||
|
||||
use Amp\CallableMaker;
|
||||
use Amp\Coroutine;
|
||||
use Amp\Parallel\StatusError;
|
||||
use Amp\Promise;
|
||||
|
||||
@ -156,34 +155,18 @@ class DefaultPool implements Pool {
|
||||
* @throws \Amp\Parallel\Worker\TaskException If the task throws an exception.
|
||||
*/
|
||||
public function enqueue(Task $task): Promise {
|
||||
return new Coroutine($this->doEnqueue($this->pull(), $task));
|
||||
}
|
||||
$worker = $this->pull();
|
||||
|
||||
/**
|
||||
* @coroutine
|
||||
*
|
||||
* Keeps the worker marked as busy until the task has completed.
|
||||
*
|
||||
* @param \Amp\Parallel\Worker\Worker $worker
|
||||
* @param \Amp\Parallel\Worker\Task $task
|
||||
*
|
||||
* @return \Generator
|
||||
*/
|
||||
public function doEnqueue(Worker $worker, Task $task): \Generator {
|
||||
try {
|
||||
$result = yield $worker->enqueue($task);
|
||||
} finally {
|
||||
$promise = $worker->enqueue($task);
|
||||
$promise->onResolve(function () use ($worker) {
|
||||
$this->push($worker);
|
||||
}
|
||||
|
||||
return $result;
|
||||
});
|
||||
return $promise;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shuts down the pool and all workers in it.
|
||||
*
|
||||
* @coroutine
|
||||
*
|
||||
* @return \Amp\Promise<int[]> Array of exit status from all workers.
|
||||
*
|
||||
* @throws \Amp\Parallel\StatusError If the pool has not been started.
|
||||
@ -193,30 +176,16 @@ class DefaultPool implements Pool {
|
||||
throw new StatusError('The pool is not running.');
|
||||
}
|
||||
|
||||
return new Coroutine($this->doShutdown());
|
||||
}
|
||||
|
||||
/**
|
||||
* Shuts down the pool and all workers in it.
|
||||
*
|
||||
* @coroutine
|
||||
*
|
||||
* @return \Generator
|
||||
*
|
||||
* @throws \Amp\Parallel\StatusError If the pool has not been started.
|
||||
*/
|
||||
private function doShutdown(): \Generator {
|
||||
$this->running = false;
|
||||
|
||||
$shutdowns = [];
|
||||
|
||||
foreach ($this->workers as $worker) {
|
||||
if ($worker->isRunning()) {
|
||||
$shutdowns[] = $worker->shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
return yield Promise\all($shutdowns);
|
||||
return Promise\all($shutdowns);
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user