Return a Future from Channel::send()

This commit is contained in:
Aaron Piotrowski 2021-11-21 12:27:07 -06:00
parent 54ca955c69
commit 8b048caa71
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
12 changed files with 64 additions and 77 deletions

View File

@ -71,8 +71,8 @@ if (\function_exists("cli_set_process_title")) {
$channel = new Sync\ChannelledSocket($socket, $socket);
try {
$channel->send($key);
} catch (\Throwable $exception) {
$channel->send($key)->await();
} catch (\Throwable) {
\trigger_error("Could not send key to parent", E_USER_ERROR);
}
@ -104,12 +104,15 @@ if (\function_exists("cli_set_process_title")) {
try {
try {
$channel->send($result);
$channel->send($result)->await();
} catch (Sync\SerializationException $exception) {
// Serializing the result failed. Send the reason why.
$channel->send(new Sync\ExitFailure($exception));
$channel->send(new Sync\ExitFailure($exception))->await();
}
} catch (\Throwable $exception) {
\trigger_error("Could not send result to parent; be sure to shutdown the child before ending the parent", E_USER_ERROR);
\trigger_error(sprintf(
"Could not send result to parent: '%s'; be sure to shutdown the child before ending the parent",
$exception->getMessage(),
), E_USER_ERROR);
}
})();

View File

@ -188,8 +188,8 @@ final class Parallel implements Context
$channel = new ChannelledSocket($socket, $socket);
try {
$channel->send($key);
} catch (\Throwable $exception) {
$channel->send($key)->await();
} catch (\Throwable) {
\trigger_error("Could not send key to parent", E_USER_ERROR);
return 1;
}
@ -236,16 +236,16 @@ final class Parallel implements Context
}
try {
$channel->send($result);
$channel->send($result)->await();
} catch (SerializationException $exception) {
// Serializing the result failed. Send the reason why.
$channel->send(new ExitFailure($exception));
$channel->send(new ExitFailure($exception))->await();
}
} catch (\Throwable $exception) {
\trigger_error(
"Could not send result to parent; be sure to shutdown the child before ending the parent",
E_USER_ERROR
);
\trigger_error(sprintf(
"Could not send result to parent: '%s'; be sure to shutdown the child before ending the parent",
$exception->getMessage(),
), E_USER_ERROR);
return 1;
} finally {
$channel->close();
@ -355,7 +355,7 @@ final class Parallel implements Context
/**
* {@inheritdoc}
*/
public function send(mixed $data): void
public function send(mixed $data): Future
{
if ($this->channel === null) {
throw new StatusError('The thread has not been started or has already finished.');
@ -365,9 +365,7 @@ final class Parallel implements Context
throw new \Error('Cannot send exit result objects.');
}
try {
$this->channel->send($data);
} catch (ChannelException $e) {
return $this->channel->send($data)->catch(function (\Throwable $e): mixed {
if ($this->channel === null) {
throw new ContextException(
"The thread stopped responding, potentially due to a fatal error or calling exit",
@ -391,7 +389,7 @@ final class Parallel implements Context
'Thread unexpectedly exited with result of type: %s',
\is_object($data) ? \get_class($data) : \gettype($data)
), 0, $e);
}
});
}
/**

View File

@ -4,6 +4,7 @@ namespace Amp\Parallel\Context;
use Amp\CancellationToken;
use Amp\CancelledException;
use Amp\Future;
use Amp\Parallel\Sync\ChannelException;
use Amp\Parallel\Sync\ChannelledSocket;
use Amp\Parallel\Sync\ExitResult;
@ -232,7 +233,7 @@ final class Process implements Context
/**
* {@inheritdoc}
*/
public function send(mixed $data): void
public function send(mixed $data): Future
{
if ($this->channel === null) {
throw new StatusError("The process has not been started");
@ -242,16 +243,14 @@ final class Process implements Context
throw new \Error("Cannot send exit result objects");
}
try {
$this->channel->send($data);
} catch (ChannelException $e) {
return $this->channel->send($data)->catch(function (\Throwable $e): mixed {
if ($this->channel === null) {
throw new ContextException("The process stopped responding, potentially due to a fatal error or calling exit", 0, $e);
}
try {
$data = launch(fn () => $this->join())->await(new TimeoutCancellationToken(0.1));
} catch (ContextException | ChannelException | CancelledException $ignored) {
} catch (ContextException | ChannelException | CancelledException) {
if ($this->isRunning()) {
$this->kill();
}
@ -262,7 +261,7 @@ final class Process implements Context
'Process unexpectedly exited with result of type: %s',
\is_object($data) ? \get_class($data) : \gettype($data)
), 0, $e);
}
});
}
/**

View File

@ -3,6 +3,7 @@
namespace Amp\Parallel\Sync;
use Amp\CancellationToken;
use Amp\Future;
use Amp\Parallel\Context\StatusError;
/**
@ -27,6 +28,8 @@ interface Channel
/**
* @param mixed $data
*
* @return Future Resolved with the data has been successfully written to the channel.
*
* @throws StatusError Thrown if the context has not been started.
* @throws SynchronizationError If the context has not been started or the context
* unexpectedly ends.
@ -34,5 +37,5 @@ interface Channel
* @throws \Error If an ExitResult object is given.
* @throws SerializationException If serializing the data fails.
*/
public function send(mixed $data): void;
public function send(mixed $data): Future;
}

View File

@ -5,6 +5,7 @@ namespace Amp\Parallel\Sync;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\CancellationToken;
use Amp\Future;
use Amp\Serialization\Serializer;
final class ChannelledSocket implements Channel
@ -42,9 +43,9 @@ final class ChannelledSocket implements Channel
/**
* {@inheritdoc}
*/
public function send(mixed $data): void
public function send(mixed $data): Future
{
$this->channel->send($data);
return $this->channel->send($data);
}
public function unreference(): void

View File

@ -6,6 +6,7 @@ use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\StreamException;
use Amp\CancellationToken;
use Amp\Future;
use Amp\Serialization\Serializer;
/**
@ -26,8 +27,8 @@ final class ChannelledStream implements Channel
/**
* Creates a new channel from the given stream objects. Note that $read and $write can be the same object.
*
* @param InputStream $read
* @param OutputStream $write
* @param InputStream $read
* @param OutputStream $write
* @param Serializer|null $serializer
*/
public function __construct(InputStream $read, OutputStream $write, ?Serializer $serializer = null)
@ -41,13 +42,12 @@ final class ChannelledStream implements Channel
/**
* {@inheritdoc}
*/
public function send(mixed $data): void
public function send(mixed $data): Future
{
try {
$this->write->write($this->parser->encode($data))->await();
} catch (StreamException $exception) {
throw new ChannelException("Sending on the channel failed. Did the context die?", 0, $exception);
}
return $this->write->write($this->parser->encode($data))
->catch(static function (\Throwable $exception): mixed {
throw new ChannelException("Sending on the channel failed. Did the context die?", 0, $exception);
});
}
/**

View File

@ -4,6 +4,7 @@ namespace Amp\Parallel\Worker\Internal;
use Amp\ByteStream;
use Amp\CancellationToken;
use Amp\Future;
use Amp\Parallel\Context\Context;
use Amp\Parallel\Context\ContextException;
use Amp\Parallel\Context\Process;
@ -25,9 +26,9 @@ class WorkerProcess implements Context
return $this->process->receive($token);
}
public function send($data): void
public function send($data): Future
{
$this->process->send($data);
return $this->process->send($data);
}
public function isRunning(): bool

View File

@ -54,10 +54,10 @@ final class TaskRunner
}
try {
$this->channel->send($result);
$this->channel->send($result)->await();
} catch (SerializationException $exception) {
// Could not serialize task result.
$this->channel->send(new Internal\TaskFailure($id, $exception));
$this->channel->send(new Internal\TaskFailure($id, $exception))->await();
}
});
continue;

View File

@ -11,7 +11,6 @@ use Amp\Parallel\Context\StatusError;
use Amp\Parallel\Sync\ChannelException;
use Amp\Serialization\SerializationException;
use Amp\TimeoutCancellationToken;
use Revolt\EventLoop;
use function Amp\launch;
/**
@ -133,27 +132,12 @@ abstract class TaskWorker implements Worker
$future = $deferred->getFuture();
try {
$this->context->send($job);
$this->context->send($job)->await();
if ($token) {
$context = $this->context;
$cancellationId = $token->subscribe(static function () use ($jobId, $context): void {
try {
$context->send($jobId);
} catch (\Throwable) {
return;
}
});
EventLoop::queue(static function () use ($future, $token, $cancellationId): void {
try {
$future->await();
} catch (\Throwable) {
// Ignored.
} finally {
$token->unsubscribe($cancellationId);
}
});
$cancellationId = $token->subscribe(static fn () => $context->send($jobId)->ignore());
$future->finally(fn () => $token->unsubscribe($cancellationId))->ignore();
}
} catch (SerializationException $exception) {
// Could not serialize Task object.
@ -203,7 +187,7 @@ abstract class TaskWorker implements Worker
// Wait for pending tasks to finish.
Future\settle(\array_map(fn (Deferred $deferred) => $deferred->getFuture(), $this->jobQueue));
$this->context->send(null);
$this->context->send(null)->await();
try {
return launch(fn () => $this->context->join())

View File

@ -51,7 +51,7 @@ abstract class AbstractContextTest extends AsyncTestCase
$context = $this->createContext(__DIR__ . "/Fixtures/throwing-process.php");
$context->start();
delay(0.1);
$context->send(1);
$context->send(1)->await();
}
public function testInvalidScriptPath(): void
@ -159,6 +159,6 @@ abstract class AbstractContextTest extends AsyncTestCase
$context = $this->createContext(__DIR__ . "/Fixtures/exiting-process.php");
$context->start();
delay(0.5);
$context->send(1);
$context->send(1)->await();
}
}

View File

@ -20,7 +20,7 @@ class ChannelledSocketTest extends AsyncTestCase
$message = 'hello';
EventLoop::queue(fn () => $a->send($message));
$a->send($message);
$data = $b->receive();
self::assertSame($message, $data);
}
@ -40,7 +40,7 @@ class ChannelledSocketTest extends AsyncTestCase
$message .= \chr(\mt_rand(0, 255));
}
EventLoop::queue(fn () => $a->send($message));
$a->send($message);
$data = $b->receive();
self::assertSame($message, $data);
}
@ -72,8 +72,7 @@ class ChannelledSocketTest extends AsyncTestCase
$b = new ChannelledSocket($right, $right);
// Close $a. $b should close on next read...
$a->send(function () {
});
$a->send(fn () => null)->await();
$data = $b->receive();
}
@ -88,7 +87,7 @@ class ChannelledSocketTest extends AsyncTestCase
$a = new ChannelledSocket($left, $left);
$a->close();
$a->send('hello');
$a->send('hello')->await();
}
/**
@ -118,7 +117,7 @@ class ChannelledSocketTest extends AsyncTestCase
}
$data = 'test';
$b->send($data);
$b->send($data)->await();
self::assertSame($data, $a->receive());
}
@ -128,10 +127,10 @@ class ChannelledSocketTest extends AsyncTestCase
protected function createSockets(): array
{
if (($sockets = @\stream_socket_pair(
\stripos(PHP_OS, "win") === 0 ? STREAM_PF_INET : STREAM_PF_UNIX,
STREAM_SOCK_STREAM,
STREAM_IPPROTO_IP
)) === false) {
\stripos(PHP_OS, "win") === 0 ? STREAM_PF_INET : STREAM_PF_UNIX,
STREAM_SOCK_STREAM,
STREAM_IPPROTO_IP
)) === false) {
$message = "Failed to create socket pair";
if ($error = \error_get_last()) {
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);

View File

@ -22,7 +22,7 @@ class ChannelledStreamTest extends AsyncTestCase
$message = 'hello';
$a->send($message);
$a->send($message)->await();
$data = $b->receive();
$this->assertSame($message, $data);
}
@ -42,7 +42,7 @@ class ChannelledStreamTest extends AsyncTestCase
$message .= \chr(\mt_rand(0, 255));
}
$a->send($message);
$a->send($message)->await();
$data = $b->receive();
$this->assertSame($message, $data);
}
@ -75,8 +75,7 @@ class ChannelledStreamTest extends AsyncTestCase
$b = new ChannelledStream($mock, $mock);
// Close $a. $b should close on next read...
$a->send(function () {
});
$a->send(fn () => null)->await();
$data = $b->receive();
}
@ -90,7 +89,7 @@ class ChannelledStreamTest extends AsyncTestCase
$mock = $this->createMock(OutputStream::class);
$mock->expects($this->once())
->method('write')
->will($this->throwException(new StreamException));
->willReturn(Future::error(new StreamException));
$a = new ChannelledStream($this->createMock(InputStream::class), $mock);
$b = new ChannelledStream(
@ -98,7 +97,7 @@ class ChannelledStreamTest extends AsyncTestCase
$this->createMock(OutputStream::class)
);
$a->send('hello');
$a->send('hello')->await();
}
/**