mirror of
https://github.com/amphp/parallel.git
synced 2025-02-21 13:22:44 +01:00
Awaitable → Promise
This commit is contained in:
parent
c4a5082d90
commit
6d88d87fe0
@ -2,7 +2,7 @@
|
||||
|
||||
namespace Amp\Parallel;
|
||||
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
interface Context {
|
||||
/**
|
||||
@ -21,7 +21,7 @@ interface Context {
|
||||
public function kill();
|
||||
|
||||
/**
|
||||
* @return \Interop\Async\Awaitable<mixed> Resolves with the returned from the context.
|
||||
* @return \Interop\Async\Promise<mixed> Resolves with the returned from the context.
|
||||
*/
|
||||
public function join(): Awaitable;
|
||||
public function join(): Promise;
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ use Amp\Parallel\{
|
||||
};
|
||||
use Amp\Parallel\Sync\{ Channel, ChannelledSocket };
|
||||
use Amp\Parallel\Sync\Internal\{ ExitFailure, ExitStatus, ExitSuccess };
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* Implements a UNIX-compatible context using forked processes.
|
||||
@ -220,7 +220,7 @@ class Fork implements Process, Strand {
|
||||
$result = new Coroutine($result);
|
||||
}
|
||||
|
||||
if ($result instanceof Awaitable) {
|
||||
if ($result instanceof Promise) {
|
||||
$result = yield $result;
|
||||
}
|
||||
|
||||
@ -278,12 +278,12 @@ class Fork implements Process, Strand {
|
||||
* Gets a promise that resolves when the context ends and joins with the
|
||||
* parent context.
|
||||
*
|
||||
* @return \Interop\Async\Awaitable<int>
|
||||
* @return \Interop\Async\Promise<int>
|
||||
*
|
||||
* @throws \Amp\Parallel\StatusError Thrown if the context has not been started.
|
||||
* @throws \Amp\Parallel\SynchronizationError Thrown if an exit status object is not received.
|
||||
*/
|
||||
public function join(): Awaitable {
|
||||
public function join(): Promise {
|
||||
if (null === $this->channel) {
|
||||
throw new StatusError('The fork has not been started or has already finished.');
|
||||
}
|
||||
@ -318,7 +318,7 @@ class Fork implements Process, Strand {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function receive(): Awaitable {
|
||||
public function receive(): Promise {
|
||||
if (null === $this->channel) {
|
||||
throw new StatusError('The process has not been started.');
|
||||
}
|
||||
@ -340,7 +340,7 @@ class Fork implements Process, Strand {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function send($data): Awaitable {
|
||||
public function send($data): Promise {
|
||||
if (null === $this->channel) {
|
||||
throw new StatusError('The fork has not been started or has already finished.');
|
||||
}
|
||||
|
@ -4,7 +4,7 @@ namespace Amp\Parallel\Process;
|
||||
|
||||
use Amp\Parallel\{ Process as ProcessContext, StatusError, Strand, SynchronizationError };
|
||||
use Amp\Parallel\Sync\{ ChannelledSocket, Internal\ExitStatus };
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
class ChannelledProcess implements ProcessContext, Strand {
|
||||
/** @var \Amp\Parallel\Process\Process */
|
||||
@ -49,7 +49,7 @@ class ChannelledProcess implements ProcessContext, Strand {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function receive(): Awaitable {
|
||||
public function receive(): Promise {
|
||||
if ($this->channel === null) {
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
@ -70,7 +70,7 @@ class ChannelledProcess implements ProcessContext, Strand {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function send($data): Awaitable {
|
||||
public function send($data): Promise {
|
||||
if ($this->channel === null) {
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
@ -85,7 +85,7 @@ class ChannelledProcess implements ProcessContext, Strand {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function join(): Awaitable {
|
||||
public function join(): Promise {
|
||||
return $this->process->join();
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,7 @@ namespace Amp\Parallel\Process;
|
||||
|
||||
use Amp\Deferred;
|
||||
use Amp\Parallel\{ ContextException, Process as ProcessContext, StatusError };
|
||||
use Interop\Async\{ Awaitable, Loop };
|
||||
use Interop\Async\{ Loop, Promise };
|
||||
|
||||
class Process implements ProcessContext {
|
||||
/** @var resource|null */
|
||||
@ -189,11 +189,11 @@ class Process implements ProcessContext {
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \Interop\Async\Awaitable<int> Resolves with exit status.
|
||||
* @return \Interop\Async\Promise<int> Resolves with exit status.
|
||||
*
|
||||
* @throws \Amp\Parallel\StatusError If the process has not been started.
|
||||
*/
|
||||
public function join(): Awaitable {
|
||||
public function join(): Promise {
|
||||
if ($this->deferred === null) {
|
||||
throw new StatusError("The process has not been started");
|
||||
}
|
||||
@ -208,7 +208,7 @@ class Process implements ProcessContext {
|
||||
\fclose($this->stderr);
|
||||
}
|
||||
|
||||
return $this->deferred->getAwaitable();
|
||||
return $this->deferred->promise();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2,14 +2,14 @@
|
||||
|
||||
namespace Amp\Parallel\Sync;
|
||||
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* Interface for sending messages between execution contexts.
|
||||
*/
|
||||
interface Channel {
|
||||
/**
|
||||
* @return \Interop\Async\Awaitable<mixed>
|
||||
* @return \Interop\Async\Promise<mixed>
|
||||
*
|
||||
* @throws \Amp\Parallel\StatusError Thrown if the context has not been started.
|
||||
* @throws \Amp\Parallel\SynchronizationError If the context has not been started or the context
|
||||
@ -17,12 +17,12 @@ interface Channel {
|
||||
* @throws \Amp\Parallel\ChannelException If receiving from the channel fails.
|
||||
* @throws \Amp\Parallel\SerializationException If unserializing the data fails.
|
||||
*/
|
||||
public function receive(): Awaitable;
|
||||
public function receive(): Promise;
|
||||
|
||||
/**
|
||||
* @param mixed $data
|
||||
*
|
||||
* @return \Interop\Async\Awaitable<int> Resolves with the number of bytes sent on the channel.
|
||||
* @return \Interop\Async\Promise<int> Resolves with the number of bytes sent on the channel.
|
||||
*
|
||||
* @throws \Amp\Parallel\StatusError Thrown if the context has not been started.
|
||||
* @throws \Amp\Parallel\SynchronizationError If the context has not been started or the context
|
||||
@ -31,5 +31,5 @@ interface Channel {
|
||||
* @throws \Error If an ExitStatus object is given.
|
||||
* @throws \Amp\Parallel\SerializationException If serializing the data fails.
|
||||
*/
|
||||
public function send($data): Awaitable;
|
||||
public function send($data): Promise;
|
||||
}
|
||||
|
@ -4,7 +4,7 @@ namespace Amp\Parallel\Sync;
|
||||
|
||||
use Amp\{ Coroutine, Deferred, Failure, Success };
|
||||
use Amp\Parallel\{ ChannelException, SerializationException };
|
||||
use Interop\Async\{ Awaitable, Loop };
|
||||
use Interop\Async\{ Loop, Promise };
|
||||
|
||||
class ChannelledSocket implements Channel {
|
||||
const HEADER_LENGTH = 5;
|
||||
@ -237,7 +237,7 @@ class ChannelledSocket implements Channel {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function receive(): Awaitable {
|
||||
public function receive(): Promise {
|
||||
if (!$this->open) {
|
||||
return new Failure(new ChannelException("The channel is has been closed"));
|
||||
}
|
||||
@ -247,16 +247,16 @@ class ChannelledSocket implements Channel {
|
||||
|
||||
Loop::enable($this->readWatcher);
|
||||
|
||||
return $deferred->getAwaitable();
|
||||
return $deferred->promise();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $data
|
||||
* @param bool $end
|
||||
*
|
||||
* @return \Interop\Async\Awaitable
|
||||
* @return \Interop\Async\Promise
|
||||
*/
|
||||
public function send($data): Awaitable {
|
||||
public function send($data): Promise {
|
||||
if (!$this->open) {
|
||||
return new Failure(new ChannelException("The channel is has been closed"));
|
||||
}
|
||||
@ -303,7 +303,7 @@ class ChannelledSocket implements Channel {
|
||||
Loop::enable($this->writeWatcher);
|
||||
|
||||
try {
|
||||
$written = yield $deferred->getAwaitable();
|
||||
$written = yield $deferred->promise();
|
||||
} catch (\Throwable $exception) {
|
||||
$this->close();
|
||||
throw $exception;
|
||||
|
@ -5,7 +5,7 @@ namespace Amp\Parallel\Sync;
|
||||
use Amp\Coroutine;
|
||||
use Amp\Parallel\{ ChannelException, SerializationException };
|
||||
use Amp\Stream\Stream;
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* An asynchronous channel for sending data between threads and processes.
|
||||
@ -55,11 +55,11 @@ class ChannelledStream implements Channel {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function send($data): Awaitable {
|
||||
public function send($data): Promise {
|
||||
return new Coroutine($this->doSend($data));
|
||||
}
|
||||
|
||||
public function doSend($data): \Generator {
|
||||
private function doSend($data): \Generator {
|
||||
// Serialize the data to send into the channel.
|
||||
try {
|
||||
$data = \serialize($data);
|
||||
@ -79,11 +79,11 @@ class ChannelledStream implements Channel {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function receive(): Awaitable {
|
||||
public function receive(): Promise {
|
||||
return new Coroutine($this->doReceive());
|
||||
}
|
||||
|
||||
public function doReceive(): \Generator {
|
||||
private function doReceive(): \Generator {
|
||||
try {
|
||||
// Read the message length first to determine how much needs to be read from the stream.
|
||||
$buffer = yield $this->read->read(self::HEADER_LENGTH);
|
||||
|
@ -4,7 +4,7 @@ namespace Amp\Parallel\Sync;
|
||||
|
||||
use Amp\{ Coroutine, Pause };
|
||||
use Amp\Parallel\MutexException;
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* A cross-platform mutex that uses exclusive files as the lock mechanism.
|
||||
@ -37,7 +37,7 @@ class FileMutex implements Mutex {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function acquire(): Awaitable {
|
||||
public function acquire(): Promise {
|
||||
return new Coroutine($this->doAcquire());
|
||||
}
|
||||
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
namespace Amp\Parallel\Sync;
|
||||
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* A non-blocking synchronization primitive that can be used for mutual exclusion across contexts.
|
||||
@ -18,8 +18,8 @@ interface Mutex
|
||||
*
|
||||
* Acquires a lock on the mutex.
|
||||
*
|
||||
* @return \Interop\Async\Awaitable<\Amp\Parallel\Sync\Lock> Resolves with a lock object when the acquire is
|
||||
* @return \Interop\Async\Promise<\Amp\Parallel\Sync\Lock> Resolves with a lock object when the acquire is
|
||||
* successful.
|
||||
*/
|
||||
public function acquire(): Awaitable;
|
||||
public function acquire(): Promise;
|
||||
}
|
||||
|
@ -4,7 +4,7 @@ namespace Amp\Parallel\Sync;
|
||||
|
||||
use Amp\{ Coroutine, Pause };
|
||||
use Amp\Parallel\SemaphoreException;
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* A non-blocking, interprocess POSIX semaphore.
|
||||
@ -120,7 +120,7 @@ class PosixSemaphore implements Semaphore, \Serializable {
|
||||
return $stat['msg_qnum'];
|
||||
}
|
||||
|
||||
public function acquire(): Awaitable {
|
||||
public function acquire(): Promise {
|
||||
return new Coroutine($this->doAcquire());
|
||||
}
|
||||
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
namespace Amp\Parallel\Sync;
|
||||
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* A non-blocking counting semaphore.
|
||||
@ -34,8 +34,8 @@ interface Semaphore extends \Countable {
|
||||
* If there are one or more locks available, this function resolves immediately with a lock and the lock count is
|
||||
* decreased. If no locks are available, the semaphore waits asynchronously for a lock to become available.
|
||||
*
|
||||
* @return \Interop\Async\Awaitable<\Amp\Parallel\Sync\Lock> Resolves with a lock object when the acquire is
|
||||
* @return \Interop\Async\Promise<\Amp\Parallel\Sync\Lock> Resolves with a lock object when the acquire is
|
||||
* successful.
|
||||
*/
|
||||
public function acquire(): Awaitable;
|
||||
public function acquire(): Promise;
|
||||
}
|
||||
|
@ -4,7 +4,7 @@ namespace Amp\Parallel\Sync;
|
||||
|
||||
use Amp\Coroutine;
|
||||
use Amp\Parallel\SharedMemoryException;
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* A container object for sharing a value across contexts.
|
||||
@ -163,7 +163,7 @@ class SharedMemoryParcel implements Parcel, \Serializable {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function synchronized(callable $callback): Awaitable {
|
||||
public function synchronized(callable $callback): Promise {
|
||||
return new Coroutine($this->doSynchronized($callback));
|
||||
}
|
||||
|
||||
@ -186,7 +186,7 @@ class SharedMemoryParcel implements Parcel, \Serializable {
|
||||
$result = new Coroutine($result);
|
||||
}
|
||||
|
||||
if ($result instanceof Awaitable) {
|
||||
if ($result instanceof Promise) {
|
||||
$result = yield $result;
|
||||
}
|
||||
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
namespace Amp\Parallel\Sync;
|
||||
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* An object that can be synchronized for exclusive access across contexts.
|
||||
@ -17,8 +17,8 @@ interface Synchronizable {
|
||||
* @param callable<(mixed ...$args): \Generator|mixed> $callback The synchronized callback to invoke.
|
||||
* The callback may be a regular function or a coroutine.
|
||||
*
|
||||
* @return \Interop\Async\Awaitable<mixed> Resolves with the return value of $callback or fails if $callback
|
||||
* @return \Interop\Async\Promise<mixed> Resolves with the return value of $callback or fails if $callback
|
||||
* throws an exception.
|
||||
*/
|
||||
public function synchronized(callable $callback): Awaitable;
|
||||
public function synchronized(callable $callback): Promise;
|
||||
}
|
||||
|
@ -4,7 +4,7 @@ namespace Amp\Parallel\Threading\Internal;
|
||||
|
||||
use Amp\{ Coroutine, Pause };
|
||||
use Amp\Parallel\Sync\Lock;
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* @internal
|
||||
@ -16,9 +16,9 @@ class Mutex extends \Threaded {
|
||||
private $lock = true;
|
||||
|
||||
/**
|
||||
* @return \Interop\Async\Awaitable
|
||||
* @return \Interop\Async\Promise
|
||||
*/
|
||||
public function acquire(): Awaitable {
|
||||
public function acquire(): Promise {
|
||||
return new Coroutine($this->doAcquire());
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,7 @@ namespace Amp\Parallel\Threading\Internal;
|
||||
|
||||
use Amp\{ Coroutine, Pause };
|
||||
use Amp\Parallel\Sync\Lock;
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* An asynchronous semaphore based on pthreads' synchronization methods.
|
||||
@ -36,9 +36,9 @@ class Semaphore extends \Threaded {
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \Interop\Async\Awaitable
|
||||
* @return \Interop\Async\Promise
|
||||
*/
|
||||
public function acquire(): Awaitable {
|
||||
public function acquire(): Promise {
|
||||
return new Coroutine($this->doAcquire());
|
||||
}
|
||||
|
||||
|
@ -5,7 +5,7 @@ namespace Amp\Parallel\Threading\Internal;
|
||||
use Amp\Coroutine;
|
||||
use Amp\Parallel\{ ChannelException, SerializationException };
|
||||
use Amp\Parallel\Sync\{ Channel, ChannelledSocket, Internal\ExitFailure, Internal\ExitSuccess };
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* An internal thread that executes a given function concurrently.
|
||||
@ -126,7 +126,7 @@ class Thread extends \Thread {
|
||||
$result = new Coroutine($result);
|
||||
}
|
||||
|
||||
if ($result instanceof Awaitable) {
|
||||
if ($result instanceof Promise) {
|
||||
$result = yield $result;
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,7 @@
|
||||
namespace Amp\Parallel\Threading;
|
||||
|
||||
use Amp\Parallel\Sync\Mutex as SyncMutex;
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* A thread-safe, asynchronous mutex using the pthreads locking mechanism.
|
||||
@ -31,7 +31,7 @@ class Mutex implements SyncMutex {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function acquire(): Awaitable {
|
||||
public function acquire(): Promise {
|
||||
return $this->mutex->acquire();
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,7 @@ namespace Amp\Parallel\Threading;
|
||||
|
||||
use Amp\Coroutine;
|
||||
use Amp\Parallel\Sync\Parcel as SyncParcel;
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* A thread-safe container that shares a value between multiple threads.
|
||||
@ -48,9 +48,9 @@ class Parcel implements SyncParcel {
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \Interop\Async\Awaitable
|
||||
* @return \Interop\Async\Promise
|
||||
*/
|
||||
public function synchronized(callable $callback): Awaitable {
|
||||
public function synchronized(callable $callback): Promise {
|
||||
return new Coroutine($this->doSynchronized($callback));
|
||||
}
|
||||
|
||||
@ -76,7 +76,7 @@ class Parcel implements SyncParcel {
|
||||
$result = new Coroutine($result);
|
||||
}
|
||||
|
||||
if ($result instanceof Awaitable) {
|
||||
if ($result instanceof Promise) {
|
||||
$result = yield $result;
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,7 @@
|
||||
namespace Amp\Parallel\Threading;
|
||||
|
||||
use Amp\Parallel\Sync\Semaphore as SyncSemaphore;
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* An asynchronous semaphore based on pthreads' synchronization methods.
|
||||
@ -60,7 +60,7 @@ class Semaphore implements SyncSemaphore {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function acquire(): Awaitable {
|
||||
public function acquire(): Promise {
|
||||
return $this->semaphore->acquire();
|
||||
}
|
||||
|
||||
|
@ -5,7 +5,7 @@ namespace Amp\Parallel\Threading;
|
||||
use Amp\Coroutine;
|
||||
use Amp\Parallel\{ ContextException, StatusError, SynchronizationError, Strand };
|
||||
use Amp\Parallel\Sync\{ ChannelledSocket, Internal\ExitStatus };
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* Implements an execution context using native multi-threading.
|
||||
@ -177,12 +177,12 @@ class Thread implements Strand {
|
||||
* Gets a promise that resolves when the context ends and joins with the
|
||||
* parent context.
|
||||
*
|
||||
* @return \Interop\Async\Awaitable<mixed>
|
||||
* @return \Interop\Async\Promise<mixed>
|
||||
*
|
||||
* @throws StatusError Thrown if the context has not been started.
|
||||
* @throws SynchronizationError Thrown if an exit status object is not received.
|
||||
*/
|
||||
public function join(): Awaitable {
|
||||
public function join(): Promise {
|
||||
if ($this->channel == null || $this->thread === null) {
|
||||
throw new StatusError('The thread has not been started or has already finished.');
|
||||
}
|
||||
@ -217,7 +217,7 @@ class Thread implements Strand {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function receive(): Awaitable {
|
||||
public function receive(): Promise {
|
||||
if ($this->channel === null) {
|
||||
throw new StatusError('The process has not been started.');
|
||||
}
|
||||
@ -238,7 +238,7 @@ class Thread implements Strand {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function send($data): Awaitable {
|
||||
public function send($data): Promise {
|
||||
if ($this->channel === null) {
|
||||
throw new StatusError('The thread has not been started or has already finished.');
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ namespace Amp\Parallel\Worker;
|
||||
use Amp\{ Coroutine, Deferred };
|
||||
use Amp\Parallel\{ StatusError, Strand, WorkerException} ;
|
||||
use Amp\Parallel\Worker\Internal\{ Job, TaskResult };
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* Base class for most common types of task workers.
|
||||
@ -54,7 +54,7 @@ abstract class AbstractWorker implements Worker {
|
||||
$this->context->receive()->when($this->when);
|
||||
}
|
||||
|
||||
$deferred->resolve($data->getAwaitable());
|
||||
$deferred->resolve($data->promise());
|
||||
};
|
||||
}
|
||||
|
||||
@ -82,7 +82,7 @@ abstract class AbstractWorker implements Worker {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function enqueue(Task $task): Awaitable {
|
||||
public function enqueue(Task $task): Promise {
|
||||
if (!$this->context->isRunning()) {
|
||||
throw new StatusError('The worker has not been started.');
|
||||
}
|
||||
@ -118,13 +118,13 @@ abstract class AbstractWorker implements Worker {
|
||||
throw new WorkerException('Sending the task to the worker failed.', $exception);
|
||||
}
|
||||
|
||||
return yield $deferred->getAwaitable();
|
||||
return yield $deferred->promise();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function shutdown(): Awaitable {
|
||||
public function shutdown(): Promise {
|
||||
if (!$this->context->isRunning() || $this->shutdown) {
|
||||
throw new StatusError('The worker is not running.');
|
||||
}
|
||||
|
@ -4,7 +4,7 @@ namespace Amp\Parallel\Worker;
|
||||
|
||||
use Amp\{ CallableMaker, Coroutine };
|
||||
use Amp\Parallel\StatusError;
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* Provides a pool of workers that can be used to execute multiple tasks asynchronously.
|
||||
@ -149,12 +149,12 @@ class DefaultPool implements Pool {
|
||||
*
|
||||
* @param Task $task The task to enqueue.
|
||||
*
|
||||
* @return \Interop\Async\Awaitable<mixed> The return value of Task::run().
|
||||
* @return \Interop\Async\Promise<mixed> The return value of Task::run().
|
||||
*
|
||||
* @throws \Amp\Parallel\StatusError If the pool has not been started.
|
||||
* @throws \Amp\Parallel\TaskException If the task throws an exception.
|
||||
*/
|
||||
public function enqueue(Task $task): Awaitable {
|
||||
public function enqueue(Task $task): Promise {
|
||||
return new Coroutine($this->doEnqueue($this->pull(), $task));
|
||||
}
|
||||
|
||||
@ -183,11 +183,11 @@ class DefaultPool implements Pool {
|
||||
*
|
||||
* @coroutine
|
||||
*
|
||||
* @return \Interop\Async\Awaitable<int[]> Array of exit status from all workers.
|
||||
* @return \Interop\Async\Promise<int[]> Array of exit status from all workers.
|
||||
*
|
||||
* @throws \Amp\Parallel\StatusError If the pool has not been started.
|
||||
*/
|
||||
public function shutdown(): Awaitable {
|
||||
public function shutdown(): Promise {
|
||||
if (!$this->isRunning()) {
|
||||
throw new StatusError('The pool is not running.');
|
||||
}
|
||||
|
@ -3,7 +3,7 @@
|
||||
namespace Amp\Parallel\Worker\Internal;
|
||||
|
||||
use Amp\Parallel\Worker\{ Task, Worker };
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
class PooledWorker implements Worker {
|
||||
/** @var callable */
|
||||
@ -52,14 +52,14 @@ class PooledWorker implements Worker {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function enqueue(Task $task): Awaitable {
|
||||
public function enqueue(Task $task): Promise {
|
||||
return $this->worker->enqueue($task);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function shutdown(): Awaitable {
|
||||
public function shutdown(): Promise {
|
||||
return $this->worker->shutdown();
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,7 @@ namespace Amp\Parallel\Worker\Internal;
|
||||
|
||||
use Amp\Failure;
|
||||
use Amp\Parallel\TaskException;
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
class TaskFailure extends TaskResult {
|
||||
/** @var string */
|
||||
@ -27,7 +27,7 @@ class TaskFailure extends TaskResult {
|
||||
$this->trace = $exception->getTraceAsString();
|
||||
}
|
||||
|
||||
public function getAwaitable(): Awaitable {
|
||||
public function promise(): Promise {
|
||||
return new Failure(new TaskException(
|
||||
$this->type,
|
||||
sprintf('Uncaught exception in worker of type "%s" with message "%s"', $this->type, $this->message),
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
namespace Amp\Parallel\Worker\Internal;
|
||||
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
abstract class TaskResult {
|
||||
/** @var string Task identifier. */
|
||||
@ -23,7 +23,7 @@ abstract class TaskResult {
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \Interop\Async\Awaitable<mixed> Resolved with the task result or failure reason.
|
||||
* @return \Interop\Async\Promise<mixed> Resolved with the task result or failure reason.
|
||||
*/
|
||||
abstract public function getAwaitable(): Awaitable;
|
||||
abstract public function promise(): Promise;
|
||||
}
|
@ -4,7 +4,7 @@ namespace Amp\Parallel\Worker\Internal;
|
||||
|
||||
use Amp\{ Coroutine, Failure, Success };
|
||||
use Amp\Parallel\{ Sync\Channel, Worker\Environment };
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
class TaskRunner {
|
||||
/** @var \Amp\Parallel\Sync\Channel */
|
||||
@ -21,9 +21,9 @@ class TaskRunner {
|
||||
/**
|
||||
* Runs the task runner, receiving tasks from the parent and sending the result of those tasks.
|
||||
*
|
||||
* @return \Interop\Async\Awaitable
|
||||
* @return \Interop\Async\Promise
|
||||
*/
|
||||
public function run(): Awaitable {
|
||||
public function run(): Promise {
|
||||
return new Coroutine($this->execute());
|
||||
}
|
||||
|
||||
@ -45,7 +45,7 @@ class TaskRunner {
|
||||
$result = new Coroutine($result);
|
||||
}
|
||||
|
||||
if (!$result instanceof Awaitable) {
|
||||
if (!$result instanceof Promise) {
|
||||
$result = new Success($result);
|
||||
}
|
||||
} catch (\Throwable $exception) {
|
||||
|
@ -3,7 +3,7 @@
|
||||
namespace Amp\Parallel\Worker\Internal;
|
||||
|
||||
use Amp\Success;
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
class TaskSuccess extends TaskResult {
|
||||
/** @var mixed Result of task. */
|
||||
@ -14,7 +14,7 @@ class TaskSuccess extends TaskResult {
|
||||
$this->result = $result;
|
||||
}
|
||||
|
||||
public function getAwaitable(): Awaitable {
|
||||
public function promise(): Promise {
|
||||
return new Success($this->result);
|
||||
}
|
||||
}
|
||||
|
@ -13,7 +13,7 @@ interface Task {
|
||||
*
|
||||
* @param \Amp\Parallel\Worker\Environment
|
||||
*
|
||||
* @return mixed|\Interop\Async\Awaitable|\Generator
|
||||
* @return mixed|\Interop\Async\Promise|\Generator
|
||||
*/
|
||||
public function run(Environment $environment);
|
||||
}
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
namespace Amp\Parallel\Worker;
|
||||
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* An interface for a parallel worker thread that runs a queue of tasks.
|
||||
@ -32,14 +32,14 @@ interface Worker {
|
||||
*
|
||||
* @param Task $task The task to enqueue.
|
||||
*
|
||||
* @return \Interop\Async\Awaitable<mixed> Resolves with the return value of Task::run().
|
||||
* @return \Interop\Async\Promise<mixed> Resolves with the return value of Task::run().
|
||||
*/
|
||||
public function enqueue(Task $task): Awaitable;
|
||||
public function enqueue(Task $task): Promise;
|
||||
|
||||
/**
|
||||
* @return \Interop\Async\Awaitable<int> Exit code.
|
||||
* @return \Interop\Async\Promise<int> Exit code.
|
||||
*/
|
||||
public function shutdown(): Awaitable;
|
||||
public function shutdown(): Promise;
|
||||
|
||||
/**
|
||||
* Immediately kills the context.
|
||||
|
@ -4,14 +4,14 @@ namespace Amp\Parallel\Worker;
|
||||
|
||||
use Amp\Parallel\Forking\Fork;
|
||||
use Amp\Parallel\Worker\Internal\TaskRunner;
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* A worker thread that executes task objects.
|
||||
*/
|
||||
class WorkerFork extends AbstractWorker {
|
||||
public function __construct() {
|
||||
parent::__construct(new Fork(function (): Awaitable {
|
||||
parent::__construct(new Fork(function (): Promise {
|
||||
$runner = new TaskRunner($this, new BasicEnvironment);
|
||||
return $runner->run();
|
||||
}));
|
||||
|
@ -4,14 +4,14 @@ namespace Amp\Parallel\Worker;
|
||||
|
||||
use Amp\Parallel\Threading\Thread;
|
||||
use Amp\Parallel\Worker\Internal\TaskRunner;
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* A worker thread that executes task objects.
|
||||
*/
|
||||
class WorkerThread extends AbstractWorker {
|
||||
public function __construct() {
|
||||
parent::__construct(new Thread(function (): Awaitable {
|
||||
parent::__construct(new Thread(function (): Promise {
|
||||
$runner = new TaskRunner($this, new BasicEnvironment);
|
||||
return $runner->run();
|
||||
}));
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
namespace Amp\Parallel\Worker;
|
||||
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
/**
|
||||
* Returns the global worker pool for the current context.
|
||||
@ -32,9 +32,9 @@ function pool(Pool $pool = null): Pool {
|
||||
*
|
||||
* @param \Amp\Parallel\Worker\Task $task The task to enqueue.
|
||||
*
|
||||
* @return \Interop\Async\Awaitable<mixed>
|
||||
* @return \Interop\Async\Promise<mixed>
|
||||
*/
|
||||
function enqueue(Task $task): Awaitable {
|
||||
function enqueue(Task $task): Promise {
|
||||
return pool()->enqueue($task);
|
||||
}
|
||||
|
||||
|
@ -6,7 +6,7 @@ use Amp\Parallel\Worker;
|
||||
use Amp\Parallel\Worker\{ Environment, Pool, Task, WorkerFactory };
|
||||
use Amp\Parallel\Test\TestCase;
|
||||
use Amp\Success;
|
||||
use Interop\Async\Awaitable;
|
||||
use Interop\Async\Promise;
|
||||
|
||||
class FunctionsTest extends TestCase {
|
||||
public function testPool() {
|
||||
@ -23,7 +23,7 @@ class FunctionsTest extends TestCase {
|
||||
public function testEnqueue() {
|
||||
$pool = $this->createMock(Pool::class);
|
||||
$pool->method('enqueue')
|
||||
->will($this->returnCallback(function (Task $task): Awaitable {
|
||||
->will($this->returnCallback(function (Task $task): Promise {
|
||||
return new Success($task->run($this->createMock(Environment::class)));
|
||||
}));
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user