createWorker(); self::assertTrue($worker->submit(new Fixtures\ConstantTask)->await()); $worker->shutdown(); } public function testIsRunning(): void { $worker = $this->createWorker(); self::assertTrue($worker->isRunning()); $worker->shutdown(); self::assertFalse($worker->isRunning()); } public function testIsIdleOnStart(): void { $worker = $this->createWorker(); self::assertTrue($worker->isIdle()); $worker->shutdown(); } public function testSubmitShouldThrowStatusError(): void { $this->expectException(StatusError::class); $this->expectExceptionMessage('shut down'); $worker = $this->createWorker(); self::assertTrue($worker->isIdle()); $worker->shutdown(); $worker->submit(new Fixtures\TestTask(42)); } public function testSubmit(): void { $worker = $this->createWorker(); $returnValue = $worker->submit(new Fixtures\TestTask(42))->await(); self::assertEquals(42, $returnValue); $worker->shutdown(); } public function testSubmitMultipleSynchronous(): void { $worker = $this->createWorker(); $futures = [ $worker->submit(new Fixtures\TestTask(42))->getFuture(), $worker->submit(new Fixtures\TestTask(56))->getFuture(), $worker->submit(new Fixtures\TestTask(72))->getFuture(), ]; self::assertEquals([42, 56, 72], Future\await($futures)); $worker->shutdown(); } public function testSubmitMultipleAsynchronous(): void { $this->setTimeout(5); $worker = $this->createWorker(); $futures = [ $worker->submit(new Fixtures\TestTask(42, 0.2))->getFuture(), $worker->submit(new Fixtures\TestTask(56, 0.3))->getFuture(), $worker->submit(new Fixtures\TestTask(72, 0.1))->getFuture(), ]; self::assertEquals([2 => 72, 0 => 42, 1 => 56], Future\await($futures)); $worker->shutdown(); } public function testSubmitMultipleThenShutdown(): void { $this->setTimeout(5); $worker = $this->createWorker(); $futures = [ $worker->submit(new Fixtures\TestTask(42, 0.2))->getFuture(), $worker->submit(new Fixtures\TestTask(56, 0.3))->getFuture(), $worker->submit(new Fixtures\TestTask(72, 0.1))->getFuture(), ]; // Send shutdown signal, but don't await until tasks have finished. $shutdown = async(fn () => $worker->shutdown()); self::assertEquals([2 => 72, 0 => 42, 1 => 56], Future\await($futures)); $shutdown->await(); // Await shutdown before ending test. } public function testNotIdleOnSubmit(): void { $worker = $this->createWorker(); $future = $worker->submit(new Fixtures\TestTask(42, 0.5))->getFuture(); delay(0.1); // Tick event loop to call Worker::submit() self::assertFalse($worker->isIdle()); $future->await(); $worker->shutdown(); } public function testKill(): void { $this->setTimeout(500); $worker = $this->createWorker(); $job = $worker->submit(new Fixtures\TestTask(42)); $job->getFuture()->ignore(); $worker->kill(); self::assertFalse($worker->isRunning()); } public function testFailingTaskWithException(): void { $worker = $this->createWorker(); try { $worker->submit(new Fixtures\FailingTask(\Exception::class))->await(); } catch (TaskFailureException $exception) { self::assertSame(\Exception::class, $exception->getOriginalClassName()); } $worker->shutdown(); } public function testFailingTaskWithError(): void { $worker = $this->createWorker(); try { $worker->submit(new Fixtures\FailingTask(\Error::class))->await(); } catch (TaskFailureError $exception) { self::assertSame(\Error::class, $exception->getOriginalClassName()); } $worker->shutdown(); } public function testFailingTaskWithPreviousException(): void { $worker = $this->createWorker(); try { $worker->submit(new Fixtures\FailingTask(\Error::class, \Exception::class))->await(); } catch (TaskFailureError $exception) { self::assertSame(\Error::class, $exception->getOriginalClassName()); $previous = $exception->getPrevious(); self::assertInstanceOf(TaskFailureException::class, $previous); self::assertSame(\Exception::class, $previous->getOriginalClassName()); } $worker->shutdown(); } public function testNonAutoloadableTask(): void { $worker = $this->createWorker(); try { $worker->submit(new NonAutoloadableTask)->await(); self::fail("Tasks that cannot be autoloaded should throw an exception"); } catch (TaskFailureError $exception) { self::assertSame("Error", $exception->getOriginalClassName()); self::assertGreaterThan( 0, \strpos($exception->getMessage(), \sprintf("Classes implementing %s", Task::class)) ); } $worker->shutdown(); } public function testUnserializableTask(): void { $worker = $this->createWorker(); try { $worker->submit(new class implements Task { // Anonymous classes are not serializable. public function run(Channel $channel, Cancellation $cancellation): mixed { return null; } })->await(); self::fail("Tasks that cannot be serialized should throw an exception"); } catch (SerializationException $exception) { self::assertSame(0, \strpos($exception->getMessage(), "The given data could not be serialized")); } $worker->shutdown(); } public function testUnserializableResult(): void { $worker = $this->createWorker(); try { $worker->submit(new Fixtures\UnserializableResultTask)->await(); self::fail("Tasks results that cannot be serialized should throw an exception"); } catch (TaskFailureException $exception) { self::assertSame( 0, \strpos($exception->getMessage(), "Amp\Serialization\SerializationException thrown in context") ); } $worker->shutdown(); } public function testNonAutoloadableResult(): void { $worker = $this->createWorker(); try { $worker->submit(new Fixtures\NonAutoloadableResultTask)->await(); self::fail("Tasks results that cannot be autoloaded should throw an exception"); } catch (\Error $exception) { self::assertSame(0, \strpos( $exception->getMessage(), "Class instances returned from Amp\Parallel\Worker\Task::run() must be autoloadable by the Composer autoloader" )); } $worker->shutdown(); } public function testUnserializableTaskFollowedByValidTask(): void { $worker = $this->createWorker(); async(fn () => $worker->submit(new class implements Task { // Anonymous classes are not serializable. public function run(Channel $channel, Cancellation $cancellation): mixed { return null; } }))->ignore(); $future = $worker->submit(new Fixtures\TestTask(42))->getFuture(); self::assertSame(42, $future->await()); $worker->shutdown(); } public function testCustomAutoloader(): void { $worker = $this->createWorker(autoloadPath: __DIR__ . '/Fixtures/custom-bootstrap.php'); self::assertTrue($worker->submit(new Fixtures\AutoloadTestTask)->await()); $worker->shutdown(); } public function testInvalidCustomAutoloader(): void { $this->expectException(\Error::class); $this->expectExceptionMessage('No file found at bootstrap path given'); $worker = $this->createWorker(autoloadPath: __DIR__ . '/Fixtures/not-found.php'); $worker->submit(new Fixtures\AutoloadTestTask)->await(); $worker->shutdown(); } public function testCancellableTask(): void { $this->expectException(TaskCancelledException::class); $worker = $this->createWorker(); try { $worker->submit(new Fixtures\CancellingTask, new TimeoutCancellation(0.5))->await(); } finally { $worker->shutdown(); } } public function testSubmitAfterCancelledTask(): void { $worker = $this->createWorker(); try { $worker->submit(new Fixtures\CancellingTask, new TimeoutCancellation(0.5))->await(); self::fail(TaskCancelledException::class . ' did not fail submit future'); } catch (TaskCancelledException $exception) { // Task should be cancelled, ignore this exception. } self::assertTrue($worker->submit(new Fixtures\ConstantTask)->await()); $worker->shutdown(); } public function testCancelBeforeSubmit(): void { $this->expectException(CancelledException::class); $worker = $this->createWorker(); $deferredCancellation = new DeferredCancellation(); $deferredCancellation->cancel(); try { $worker->submit(new Fixtures\CancellingTask, $deferredCancellation->getCancellation())->await(); } finally { $worker->shutdown(); } } public function testCancellingCompletedTask(): void { $worker = $this->createWorker(); self::assertTrue($worker->submit( new Fixtures\ConstantTask(), new TimeoutCancellation(0.5), )->await()); $worker->shutdown(); } public function testCommunicatingJob(): void { $worker = $this->createWorker(); $cancellation = new TimeoutCancellation(1); $execution = $worker->submit(new CommunicatingTask(), $cancellation); $channel = $execution->getChannel(); self::assertSame('test', $channel->receive($cancellation)); $channel->send('out'); self::assertSame('out', $execution->await($cancellation)); } abstract protected function createWorker(?string $autoloadPath = null): Worker; }