Remove cache from Task::run()

This commit is contained in:
Aaron Piotrowski 2023-02-06 17:41:20 -06:00
parent d8815ea03e
commit b19d87fc50
No known key found for this signature in database
GPG Key ID: 5B456E6AABA44A63
15 changed files with 43 additions and 45 deletions

View File

@ -80,7 +80,6 @@ In the example below, a `Task` is defined which calls a blocking function (`file
Child processes executing tasks may be reused to execute multiple tasks.
```php
use Amp\Cache\AtomicCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
@ -92,11 +91,8 @@ class FetchTask implements Task
) {
}
public function run(
Channel $channel,
AtomicCache $cache,
Cancellation $cancellation,
): string {
public function run(Channel $channel, Cancellation $cancellation): string
{
return file_get_contents($url); // Example blocking function
}
}
@ -118,7 +114,32 @@ $data = $future->await();
#### Sharing data between tasks
`Task::run()` is provided an `AtomicCache` object which is shared amongst all tasks executed on a single child process, including across different `Task` implementations. This storage can be used to access and store data between task executions and reuse resources initiated by prior tasks, such as a database connection or open file handle. A worker may be executing multiple tasks, so consider using the methods of `AtomicCache` which guarantee mutual exclusion when creating or updating cache values if a task uses async I/O to generate a cache value.
Tasks may wish to share data between tasks runs. A `Cache` instance stored in a static property that is only initialized within `Task::run()` is our recommended strategy to share data.
```php
use Amp\Cache\LocalCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
final class ExampleTask implements Task
{
private static ?LocalCache $cache = null;
public function run(Channel $channel, Cancellation $cancellation): mixed
{
$cache = self::$cache ??= new LocalCache();
$cachedValue = $cache->get('cache-key');
// Use and modify $cachedValue...
$cache->set('cache-key', $updatedValue);
return $updatedValue;
}
}
```
You may wish to provide a hook to initialize the cache with mock data for testing.
A worker may be executing multiple tasks, so consider using `AtomicCache` instead of `LocalCache` when creating or updating cache values if a task uses async I/O to generate a cache value. `AtomicCache` has methods which provide mutual exclusion based on a cache key.
#### Task cancellation

View File

@ -28,7 +28,6 @@
"php": ">=8.1",
"amphp/amp": "^3",
"amphp/byte-stream": "^2",
"amphp/cache": "^2",
"amphp/parser": "^1",
"amphp/pipeline": "^1",
"amphp/process": "^2",

View File

@ -2,11 +2,8 @@
namespace Amp\Parallel\Worker\Internal;
use Amp\Cache\AtomicCache;
use Amp\Cache\LocalCache;
use Amp\Parallel\Worker;
use Amp\Sync\Channel;
use Amp\Sync\LocalKeyedMutex;
return static function (Channel $channel) use ($argc, $argv): int {
if (!\defined("AMP_WORKER")) {
@ -25,9 +22,7 @@ return static function (Channel $channel) use ($argc, $argv): int {
})();
}
$cache = new AtomicCache(new LocalCache(), new LocalKeyedMutex());
Worker\runTasks($channel, $cache);
Worker\runTasks($channel);
return 0;
};

View File

@ -2,7 +2,6 @@
namespace Amp\Parallel\Worker;
use Amp\Cache\AtomicCache;
use Amp\Cancellation;
use Amp\Sync\Channel;
@ -20,10 +19,9 @@ interface Task
* Executed when running the Task in a worker.
*
* @param Channel<TReceive, TSend> $channel Communication channel to parent process.
* @param AtomicCache $cache AtomicCache instance shared between all Tasks executed on the Worker.
* @param Cancellation $cancellation Tasks may safely ignore this parameter if they are not cancellable.
*
* @return TResult A specific type can (and should) be declared in implementing classes.
*/
public function run(Channel $channel, AtomicCache $cache, Cancellation $cancellation): mixed;
public function run(Channel $channel, Cancellation $cancellation): mixed;
}

View File

@ -2,7 +2,6 @@
namespace Amp\Parallel\Worker;
use Amp\Cache\AtomicCache;
use Amp\Cancellation;
use Amp\CancelledException;
use Amp\DeferredCancellation;
@ -87,7 +86,7 @@ function workerFactory(WorkerFactory $factory = null): WorkerFactory
/**
* Runs the tasks, receiving tasks from the parent and sending the result of those tasks.
*/
function runTasks(Channel $channel, AtomicCache $cache): void
function runTasks(Channel $channel): void
{
/** @var array<string, DeferredCancellation> $cancellationSources */
$cancellationSources = [];
@ -114,10 +113,9 @@ function runTasks(Channel $channel, AtomicCache $cache): void
$queue,
$jobChannel,
$channel,
$cache
): void {
try {
$result = $data->getTask()->run($jobChannel, $cache, $source->getCancellation());
$result = $data->getTask()->run($jobChannel, $source->getCancellation());
if ($result instanceof Future) {
$result = $result->await($source->getCancellation());

View File

@ -2,7 +2,6 @@
namespace Amp\Parallel\Test\Worker;
use Amp\Cache\AtomicCache;
use Amp\Cancellation;
use Amp\Future;
use Amp\Parallel\Context\ContextFactory;
@ -23,7 +22,7 @@ use function Amp\delay;
class NonAutoloadableTask implements Task
{
public function run(Channel $channel, AtomicCache $cache, Cancellation $cancellation): int
public function run(Channel $channel, Cancellation $cancellation): int
{
return 1;
}
@ -223,7 +222,7 @@ abstract class AbstractWorkerTest extends AsyncTestCase
try {
$worker->submit(new class implements Task { // Anonymous classes are not serializable.
public function run(Channel $channel, AtomicCache $cache, Cancellation $cancellation): mixed
public function run(Channel $channel, Cancellation $cancellation): mixed
{
return null;
}
@ -275,7 +274,7 @@ abstract class AbstractWorkerTest extends AsyncTestCase
$worker = $this->createWorker();
async(fn () => $worker->submit(new class implements Task { // Anonymous classes are not serializable.
public function run(Channel $channel, AtomicCache $cache, Cancellation $cancellation): mixed
public function run(Channel $channel, Cancellation $cancellation): mixed
{
return null;
}

View File

@ -2,14 +2,13 @@
namespace Amp\Parallel\Test\Worker\Fixtures;
use Amp\Cache\AtomicCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
class AutoloadTestTask implements Task
{
public function run(Channel $channel, AtomicCache $cache, Cancellation $cancellation): bool
public function run(Channel $channel, Cancellation $cancellation): bool
{
return \class_exists('CustomAutoloadClass', true);
}

View File

@ -2,7 +2,6 @@
namespace Amp\Parallel\Test\Worker\Fixtures;
use Amp\Cache\AtomicCache;
use Amp\Cancellation;
use Amp\DeferredFuture;
use Amp\Future;
@ -11,7 +10,7 @@ use Amp\Sync\Channel;
class CancellingTask implements Task
{
public function run(Channel $channel, AtomicCache $cache, Cancellation $cancellation): Future
public function run(Channel $channel, Cancellation $cancellation): Future
{
$deferred = new DeferredFuture;
$cancellation->subscribe($deferred->error(...));

View File

@ -2,14 +2,13 @@
namespace Amp\Parallel\Test\Worker\Fixtures;
use Amp\Cache\AtomicCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
class CommunicatingTask implements Task
{
public function run(Channel $channel, AtomicCache $cache, Cancellation $cancellation): string
public function run(Channel $channel, Cancellation $cancellation): string
{
$channel->send('test');
return $channel->receive($cancellation);

View File

@ -2,14 +2,13 @@
namespace Amp\Parallel\Test\Worker\Fixtures;
use Amp\Cache\AtomicCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
class ConstantTask implements Task
{
public function run(Channel $channel, AtomicCache $cache, Cancellation $cancellation): bool
public function run(Channel $channel, Cancellation $cancellation): bool
{
return \defined("AMP_WORKER");
}

View File

@ -2,7 +2,6 @@
namespace Amp\Parallel\Test\Worker\Fixtures;
use Amp\Cache\AtomicCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
@ -25,7 +24,7 @@ class FailingTask implements Task
*
*
*/
public function run(Channel $channel, AtomicCache $cache, Cancellation $cancellation): never
public function run(Channel $channel, Cancellation $cancellation): never
{
$previous = $this->previousExceptionType ? new $this->previousExceptionType : null;
throw new $this->exceptionType('Test', 0, $previous);

View File

@ -2,14 +2,13 @@
namespace Amp\Parallel\Test\Worker\Fixtures;
use Amp\Cache\AtomicCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
class NonAutoloadableResultTask implements Task
{
public function run(Channel $channel, AtomicCache $cache, Cancellation $cancellation): NonAutoloadableClass
public function run(Channel $channel, Cancellation $cancellation): NonAutoloadableClass
{
require __DIR__ . "/non-autoloadable-class.php";
return new NonAutoloadableClass;

View File

@ -2,7 +2,6 @@
namespace Amp\Parallel\Test\Worker\Fixtures;
use Amp\Cache\AtomicCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
@ -19,7 +18,7 @@ class TestTask implements Task
$this->delay = $delay;
}
public function run(Channel $channel, AtomicCache $cache, Cancellation $cancellation): mixed
public function run(Channel $channel, Cancellation $cancellation): mixed
{
if ($this->delay) {
delay($this->delay);

View File

@ -2,14 +2,13 @@
namespace Amp\Parallel\Test\Worker\Fixtures;
use Amp\Cache\AtomicCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
class UnserializableResultTask implements Task
{
public function run(Channel $channel, AtomicCache $cache, Cancellation $cancellation): \Closure
public function run(Channel $channel, Cancellation $cancellation): \Closure
{
return function () {
// Anonymous functions are not serializable.

View File

@ -2,8 +2,6 @@
namespace Amp\Parallel\Test\Worker;
use Amp\Cache\AtomicCache;
use Amp\Cache\Cache;
use Amp\Cancellation;
use Amp\Future;
use Amp\Parallel\Worker;
@ -12,7 +10,6 @@ use Amp\Parallel\Worker\WorkerFactory;
use Amp\Parallel\Worker\WorkerPool;
use Amp\PHPUnit\AsyncTestCase;
use Amp\Sync\Channel;
use Amp\Sync\KeyedMutex;
function nonAutoloadableFunction(): void
{
@ -42,7 +39,6 @@ class FunctionsTest extends AsyncTestCase
$future = Future::complete($task->run(
$channel,
new AtomicCache($this->createMock(Cache::class), $this->createMock(KeyedMutex::class)),
$this->createMock(Cancellation::class),
));