diff --git a/lib/Context/Internal/process-runner.php b/lib/Context/Internal/process-runner.php index 4eb2ea4..acbc01d 100644 --- a/lib/Context/Internal/process-runner.php +++ b/lib/Context/Internal/process-runner.php @@ -71,8 +71,8 @@ if (\function_exists("cli_set_process_title")) { $channel = new Sync\ChannelledSocket($socket, $socket); try { - $channel->send($key); - } catch (\Throwable $exception) { + $channel->send($key)->await(); + } catch (\Throwable) { \trigger_error("Could not send key to parent", E_USER_ERROR); } @@ -104,12 +104,15 @@ if (\function_exists("cli_set_process_title")) { try { try { - $channel->send($result); + $channel->send($result)->await(); } catch (Sync\SerializationException $exception) { // Serializing the result failed. Send the reason why. - $channel->send(new Sync\ExitFailure($exception)); + $channel->send(new Sync\ExitFailure($exception))->await(); } } catch (\Throwable $exception) { - \trigger_error("Could not send result to parent; be sure to shutdown the child before ending the parent", E_USER_ERROR); + \trigger_error(sprintf( + "Could not send result to parent: '%s'; be sure to shutdown the child before ending the parent", + $exception->getMessage(), + ), E_USER_ERROR); } })(); diff --git a/lib/Context/Parallel.php b/lib/Context/Parallel.php index 9b4f022..da3074c 100644 --- a/lib/Context/Parallel.php +++ b/lib/Context/Parallel.php @@ -188,8 +188,8 @@ final class Parallel implements Context $channel = new ChannelledSocket($socket, $socket); try { - $channel->send($key); - } catch (\Throwable $exception) { + $channel->send($key)->await(); + } catch (\Throwable) { \trigger_error("Could not send key to parent", E_USER_ERROR); return 1; } @@ -236,16 +236,16 @@ final class Parallel implements Context } try { - $channel->send($result); + $channel->send($result)->await(); } catch (SerializationException $exception) { // Serializing the result failed. Send the reason why. - $channel->send(new ExitFailure($exception)); + $channel->send(new ExitFailure($exception))->await(); } } catch (\Throwable $exception) { - \trigger_error( - "Could not send result to parent; be sure to shutdown the child before ending the parent", - E_USER_ERROR - ); + \trigger_error(sprintf( + "Could not send result to parent: '%s'; be sure to shutdown the child before ending the parent", + $exception->getMessage(), + ), E_USER_ERROR); return 1; } finally { $channel->close(); @@ -355,7 +355,7 @@ final class Parallel implements Context /** * {@inheritdoc} */ - public function send(mixed $data): void + public function send(mixed $data): Future { if ($this->channel === null) { throw new StatusError('The thread has not been started or has already finished.'); @@ -365,9 +365,7 @@ final class Parallel implements Context throw new \Error('Cannot send exit result objects.'); } - try { - $this->channel->send($data); - } catch (ChannelException $e) { + return $this->channel->send($data)->catch(function (\Throwable $e): mixed { if ($this->channel === null) { throw new ContextException( "The thread stopped responding, potentially due to a fatal error or calling exit", @@ -391,7 +389,7 @@ final class Parallel implements Context 'Thread unexpectedly exited with result of type: %s', \is_object($data) ? \get_class($data) : \gettype($data) ), 0, $e); - } + }); } /** diff --git a/lib/Context/Process.php b/lib/Context/Process.php index 1fd10d0..b7eb548 100644 --- a/lib/Context/Process.php +++ b/lib/Context/Process.php @@ -4,6 +4,7 @@ namespace Amp\Parallel\Context; use Amp\CancellationToken; use Amp\CancelledException; +use Amp\Future; use Amp\Parallel\Sync\ChannelException; use Amp\Parallel\Sync\ChannelledSocket; use Amp\Parallel\Sync\ExitResult; @@ -232,7 +233,7 @@ final class Process implements Context /** * {@inheritdoc} */ - public function send(mixed $data): void + public function send(mixed $data): Future { if ($this->channel === null) { throw new StatusError("The process has not been started"); @@ -242,16 +243,14 @@ final class Process implements Context throw new \Error("Cannot send exit result objects"); } - try { - $this->channel->send($data); - } catch (ChannelException $e) { + return $this->channel->send($data)->catch(function (\Throwable $e): mixed { if ($this->channel === null) { throw new ContextException("The process stopped responding, potentially due to a fatal error or calling exit", 0, $e); } try { $data = launch(fn () => $this->join())->await(new TimeoutCancellationToken(0.1)); - } catch (ContextException | ChannelException | CancelledException $ignored) { + } catch (ContextException | ChannelException | CancelledException) { if ($this->isRunning()) { $this->kill(); } @@ -262,7 +261,7 @@ final class Process implements Context 'Process unexpectedly exited with result of type: %s', \is_object($data) ? \get_class($data) : \gettype($data) ), 0, $e); - } + }); } /** diff --git a/lib/Sync/Channel.php b/lib/Sync/Channel.php index f7e724c..e2e2581 100644 --- a/lib/Sync/Channel.php +++ b/lib/Sync/Channel.php @@ -3,6 +3,7 @@ namespace Amp\Parallel\Sync; use Amp\CancellationToken; +use Amp\Future; use Amp\Parallel\Context\StatusError; /** @@ -27,6 +28,8 @@ interface Channel /** * @param mixed $data * + * @return Future Resolved with the data has been successfully written to the channel. + * * @throws StatusError Thrown if the context has not been started. * @throws SynchronizationError If the context has not been started or the context * unexpectedly ends. @@ -34,5 +37,5 @@ interface Channel * @throws \Error If an ExitResult object is given. * @throws SerializationException If serializing the data fails. */ - public function send(mixed $data): void; + public function send(mixed $data): Future; } diff --git a/lib/Sync/ChannelledSocket.php b/lib/Sync/ChannelledSocket.php index dc75ebc..d26b87a 100644 --- a/lib/Sync/ChannelledSocket.php +++ b/lib/Sync/ChannelledSocket.php @@ -5,6 +5,7 @@ namespace Amp\Parallel\Sync; use Amp\ByteStream\ResourceInputStream; use Amp\ByteStream\ResourceOutputStream; use Amp\CancellationToken; +use Amp\Future; use Amp\Serialization\Serializer; final class ChannelledSocket implements Channel @@ -42,9 +43,9 @@ final class ChannelledSocket implements Channel /** * {@inheritdoc} */ - public function send(mixed $data): void + public function send(mixed $data): Future { - $this->channel->send($data); + return $this->channel->send($data); } public function unreference(): void diff --git a/lib/Sync/ChannelledStream.php b/lib/Sync/ChannelledStream.php index 46cf7b8..852ced2 100644 --- a/lib/Sync/ChannelledStream.php +++ b/lib/Sync/ChannelledStream.php @@ -6,6 +6,7 @@ use Amp\ByteStream\InputStream; use Amp\ByteStream\OutputStream; use Amp\ByteStream\StreamException; use Amp\CancellationToken; +use Amp\Future; use Amp\Serialization\Serializer; /** @@ -26,8 +27,8 @@ final class ChannelledStream implements Channel /** * Creates a new channel from the given stream objects. Note that $read and $write can be the same object. * - * @param InputStream $read - * @param OutputStream $write + * @param InputStream $read + * @param OutputStream $write * @param Serializer|null $serializer */ public function __construct(InputStream $read, OutputStream $write, ?Serializer $serializer = null) @@ -41,13 +42,12 @@ final class ChannelledStream implements Channel /** * {@inheritdoc} */ - public function send(mixed $data): void + public function send(mixed $data): Future { - try { - $this->write->write($this->parser->encode($data))->await(); - } catch (StreamException $exception) { - throw new ChannelException("Sending on the channel failed. Did the context die?", 0, $exception); - } + return $this->write->write($this->parser->encode($data)) + ->catch(static function (\Throwable $exception): mixed { + throw new ChannelException("Sending on the channel failed. Did the context die?", 0, $exception); + }); } /** diff --git a/lib/Worker/Internal/WorkerProcess.php b/lib/Worker/Internal/WorkerProcess.php index 9f5e165..38b3bf3 100644 --- a/lib/Worker/Internal/WorkerProcess.php +++ b/lib/Worker/Internal/WorkerProcess.php @@ -4,6 +4,7 @@ namespace Amp\Parallel\Worker\Internal; use Amp\ByteStream; use Amp\CancellationToken; +use Amp\Future; use Amp\Parallel\Context\Context; use Amp\Parallel\Context\ContextException; use Amp\Parallel\Context\Process; @@ -25,9 +26,9 @@ class WorkerProcess implements Context return $this->process->receive($token); } - public function send($data): void + public function send($data): Future { - $this->process->send($data); + return $this->process->send($data); } public function isRunning(): bool diff --git a/lib/Worker/TaskRunner.php b/lib/Worker/TaskRunner.php index 2d54dd1..3740516 100644 --- a/lib/Worker/TaskRunner.php +++ b/lib/Worker/TaskRunner.php @@ -54,10 +54,10 @@ final class TaskRunner } try { - $this->channel->send($result); + $this->channel->send($result)->await(); } catch (SerializationException $exception) { // Could not serialize task result. - $this->channel->send(new Internal\TaskFailure($id, $exception)); + $this->channel->send(new Internal\TaskFailure($id, $exception))->await(); } }); continue; diff --git a/lib/Worker/TaskWorker.php b/lib/Worker/TaskWorker.php index 474ea4a..af29ba7 100644 --- a/lib/Worker/TaskWorker.php +++ b/lib/Worker/TaskWorker.php @@ -11,7 +11,6 @@ use Amp\Parallel\Context\StatusError; use Amp\Parallel\Sync\ChannelException; use Amp\Serialization\SerializationException; use Amp\TimeoutCancellationToken; -use Revolt\EventLoop; use function Amp\launch; /** @@ -133,27 +132,12 @@ abstract class TaskWorker implements Worker $future = $deferred->getFuture(); try { - $this->context->send($job); + $this->context->send($job)->await(); if ($token) { $context = $this->context; - $cancellationId = $token->subscribe(static function () use ($jobId, $context): void { - try { - $context->send($jobId); - } catch (\Throwable) { - return; - } - }); - - EventLoop::queue(static function () use ($future, $token, $cancellationId): void { - try { - $future->await(); - } catch (\Throwable) { - // Ignored. - } finally { - $token->unsubscribe($cancellationId); - } - }); + $cancellationId = $token->subscribe(static fn () => $context->send($jobId)->ignore()); + $future->finally(fn () => $token->unsubscribe($cancellationId))->ignore(); } } catch (SerializationException $exception) { // Could not serialize Task object. @@ -203,7 +187,7 @@ abstract class TaskWorker implements Worker // Wait for pending tasks to finish. Future\settle(\array_map(fn (Deferred $deferred) => $deferred->getFuture(), $this->jobQueue)); - $this->context->send(null); + $this->context->send(null)->await(); try { return launch(fn () => $this->context->join()) diff --git a/test/Context/AbstractContextTest.php b/test/Context/AbstractContextTest.php index 5dd9aa2..3ef7f82 100644 --- a/test/Context/AbstractContextTest.php +++ b/test/Context/AbstractContextTest.php @@ -51,7 +51,7 @@ abstract class AbstractContextTest extends AsyncTestCase $context = $this->createContext(__DIR__ . "/Fixtures/throwing-process.php"); $context->start(); delay(0.1); - $context->send(1); + $context->send(1)->await(); } public function testInvalidScriptPath(): void @@ -159,6 +159,6 @@ abstract class AbstractContextTest extends AsyncTestCase $context = $this->createContext(__DIR__ . "/Fixtures/exiting-process.php"); $context->start(); delay(0.5); - $context->send(1); + $context->send(1)->await(); } } diff --git a/test/Sync/ChannelledSocketTest.php b/test/Sync/ChannelledSocketTest.php index 0f4339a..9e25933 100644 --- a/test/Sync/ChannelledSocketTest.php +++ b/test/Sync/ChannelledSocketTest.php @@ -20,7 +20,7 @@ class ChannelledSocketTest extends AsyncTestCase $message = 'hello'; - EventLoop::queue(fn () => $a->send($message)); + $a->send($message); $data = $b->receive(); self::assertSame($message, $data); } @@ -40,7 +40,7 @@ class ChannelledSocketTest extends AsyncTestCase $message .= \chr(\mt_rand(0, 255)); } - EventLoop::queue(fn () => $a->send($message)); + $a->send($message); $data = $b->receive(); self::assertSame($message, $data); } @@ -72,8 +72,7 @@ class ChannelledSocketTest extends AsyncTestCase $b = new ChannelledSocket($right, $right); // Close $a. $b should close on next read... - $a->send(function () { - }); + $a->send(fn () => null)->await(); $data = $b->receive(); } @@ -88,7 +87,7 @@ class ChannelledSocketTest extends AsyncTestCase $a = new ChannelledSocket($left, $left); $a->close(); - $a->send('hello'); + $a->send('hello')->await(); } /** @@ -118,7 +117,7 @@ class ChannelledSocketTest extends AsyncTestCase } $data = 'test'; - $b->send($data); + $b->send($data)->await(); self::assertSame($data, $a->receive()); } @@ -128,10 +127,10 @@ class ChannelledSocketTest extends AsyncTestCase protected function createSockets(): array { if (($sockets = @\stream_socket_pair( - \stripos(PHP_OS, "win") === 0 ? STREAM_PF_INET : STREAM_PF_UNIX, - STREAM_SOCK_STREAM, - STREAM_IPPROTO_IP - )) === false) { + \stripos(PHP_OS, "win") === 0 ? STREAM_PF_INET : STREAM_PF_UNIX, + STREAM_SOCK_STREAM, + STREAM_IPPROTO_IP + )) === false) { $message = "Failed to create socket pair"; if ($error = \error_get_last()) { $message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]); diff --git a/test/Sync/ChannelledStreamTest.php b/test/Sync/ChannelledStreamTest.php index 90e4d3c..fd7fff2 100644 --- a/test/Sync/ChannelledStreamTest.php +++ b/test/Sync/ChannelledStreamTest.php @@ -22,7 +22,7 @@ class ChannelledStreamTest extends AsyncTestCase $message = 'hello'; - $a->send($message); + $a->send($message)->await(); $data = $b->receive(); $this->assertSame($message, $data); } @@ -42,7 +42,7 @@ class ChannelledStreamTest extends AsyncTestCase $message .= \chr(\mt_rand(0, 255)); } - $a->send($message); + $a->send($message)->await(); $data = $b->receive(); $this->assertSame($message, $data); } @@ -75,8 +75,7 @@ class ChannelledStreamTest extends AsyncTestCase $b = new ChannelledStream($mock, $mock); // Close $a. $b should close on next read... - $a->send(function () { - }); + $a->send(fn () => null)->await(); $data = $b->receive(); } @@ -90,7 +89,7 @@ class ChannelledStreamTest extends AsyncTestCase $mock = $this->createMock(OutputStream::class); $mock->expects($this->once()) ->method('write') - ->will($this->throwException(new StreamException)); + ->willReturn(Future::error(new StreamException)); $a = new ChannelledStream($this->createMock(InputStream::class), $mock); $b = new ChannelledStream( @@ -98,7 +97,7 @@ class ChannelledStreamTest extends AsyncTestCase $this->createMock(OutputStream::class) ); - $a->send('hello'); + $a->send('hello')->await(); } /**