mirror of
https://github.com/amphp/parallel.git
synced 2025-02-22 22:02:33 +01:00
Various improvements for worker pools
This commit is contained in:
parent
725d94f379
commit
95cbd8a365
@ -10,13 +10,12 @@ use Icicle\Promise;
|
||||
|
||||
Coroutine\create(function () {
|
||||
$returnValues = (yield Promise\all([
|
||||
new Coroutine\Coroutine(Worker\enqueue(new HelloTask())),
|
||||
new Coroutine\Coroutine(Worker\enqueue(new HelloTask())),
|
||||
new Coroutine\Coroutine(Worker\enqueue(new HelloTask())),
|
||||
Worker\enqueue(new HelloTask()),
|
||||
Worker\enqueue(new HelloTask()),
|
||||
Worker\enqueue(new HelloTask()),
|
||||
]));
|
||||
var_dump($returnValues);
|
||||
|
||||
yield Worker\pool()->shutdown();
|
||||
var_dump($returnValues);
|
||||
})->done();
|
||||
|
||||
Loop\run();
|
||||
|
@ -2,6 +2,7 @@
|
||||
namespace Icicle\Concurrent\Worker;
|
||||
|
||||
use Icicle\Concurrent\Exception\InvalidArgumentError;
|
||||
use Icicle\Concurrent\Exception\SynchronizationError;
|
||||
use Icicle\Coroutine\Coroutine;
|
||||
use Icicle\Promise;
|
||||
use Icicle\Promise\Deferred;
|
||||
@ -14,8 +15,23 @@ use Icicle\Promise\PromiseInterface;
|
||||
* tasks simultaneously. The load on each worker is balanced such that tasks
|
||||
* are completed as soon as possible and workers are used efficiently.
|
||||
*/
|
||||
class WorkerPool
|
||||
class Pool implements WorkerInterface
|
||||
{
|
||||
/**
|
||||
* @var int The default minimum pool size.
|
||||
*/
|
||||
const DEFAULT_MIN_SIZE = 8;
|
||||
|
||||
/**
|
||||
* @var int The default maximum pool size.
|
||||
*/
|
||||
const DEFAULT_MAX_SIZE = 32;
|
||||
|
||||
/**
|
||||
* @var bool Indicates if the pool is currently running.
|
||||
*/
|
||||
private $running = false;
|
||||
|
||||
/**
|
||||
* @var int The minimum number of workers the pool should spawn.
|
||||
*/
|
||||
@ -46,49 +62,61 @@ class WorkerPool
|
||||
*/
|
||||
private $busyQueue;
|
||||
|
||||
/**
|
||||
* @var \SplQueue A queue of deferred to be fulfilled for waiting tasks.
|
||||
*/
|
||||
private $deferredQueue;
|
||||
|
||||
/**
|
||||
* Creates a new worker pool.
|
||||
*
|
||||
* @param int $minSize The minimum number of workers the pool should spawn.
|
||||
* @param int $maxSize The maximum number of workers the pool should spawn.
|
||||
* @param WorkerFactoryInterface $factory A worker factory to be used to create new workers.
|
||||
* @param int|null $minSize The minimum number of workers the pool should spawn. Defaults to
|
||||
* `Pool::DEFAULT_MIN_SIZE`.
|
||||
* @param int|null $maxSize The maximum number of workers the pool should spawn. Defaults to
|
||||
* `Pool::DEFAULT_MAX_SIZE`.
|
||||
* @param WorkerFactoryInterface|null $factory A worker factory to be used to create new workers.
|
||||
*/
|
||||
public function __construct($minSize, $maxSize = null, WorkerFactoryInterface $factory = null)
|
||||
public function __construct($minSize = null, $maxSize = null, WorkerFactoryInterface $factory = null)
|
||||
{
|
||||
$minSize = $minSize ?: static::DEFAULT_MIN_SIZE;
|
||||
$maxSize = $minSize ?: static::DEFAULT_MAX_SIZE;
|
||||
|
||||
if (!is_int($minSize) || $minSize < 0) {
|
||||
throw new InvalidArgumentError('Minimum size must be a non-negative integer.');
|
||||
}
|
||||
$this->minSize = $minSize;
|
||||
|
||||
if ($maxSize === null) {
|
||||
$this->maxSize = $minSize;
|
||||
} elseif (!is_int($maxSize) || $maxSize < 0) {
|
||||
throw new InvalidArgumentError('Maximum size must be a non-negative integer.');
|
||||
} else {
|
||||
$this->maxSize = $maxSize;
|
||||
if (!is_int($maxSize) || $maxSize < 0 || $maxSize < $minSize) {
|
||||
throw new InvalidArgumentError('Maximum size must be a non-negative integer at least '.$minSize.'.');
|
||||
}
|
||||
|
||||
$this->maxSize = $maxSize;
|
||||
$this->minSize = $minSize;
|
||||
|
||||
// Create the default factory if none is given.
|
||||
$this->factory = $factory ?: new WorkerFactory();
|
||||
|
||||
$this->workers = new \SplObjectStorage();
|
||||
$this->idleWorkers = new \SplObjectStorage();
|
||||
$this->busyQueue = new \SplQueue();
|
||||
$this->deferredQueue = new \SplQueue();
|
||||
|
||||
// Start up the pool with the minimum number of workers.
|
||||
while (--$minSize >= 0) {
|
||||
$this->createWorker();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the minimum number of workers the worker pool may have idle.
|
||||
* Checks if the pool is running.
|
||||
*
|
||||
* @return bool True if the pool is running, otherwise false.
|
||||
*/
|
||||
public function isRunning()
|
||||
{
|
||||
return $this->running;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the pool has any idle workers.
|
||||
*
|
||||
* @return bool True if the pool has at least one idle worker, otherwise false.
|
||||
*/
|
||||
public function isIdle()
|
||||
{
|
||||
return $this->idleWorkers->count() > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the minimum number of workers the pool may have idle.
|
||||
*
|
||||
* @return int The minimum number of workers.
|
||||
*/
|
||||
@ -98,7 +126,7 @@ class WorkerPool
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the maximum number of workers the worker pool may spawn to handle concurrent tasks.
|
||||
* Gets the maximum number of workers the pool may spawn to handle concurrent tasks.
|
||||
*
|
||||
* @return int The maximum number of workers.
|
||||
*/
|
||||
@ -127,6 +155,23 @@ class WorkerPool
|
||||
return $this->idleWorkers->count();
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the worker pool execution.
|
||||
*
|
||||
* When the worker pool starts up, the minimum number of workers will be created. This adds some overhead to
|
||||
* starting the pool, but allows for greater performance during runtime.
|
||||
*/
|
||||
public function start()
|
||||
{
|
||||
// Start up the pool with the minimum number of workers.
|
||||
$count = $this->minSize;
|
||||
while (--$count >= 0) {
|
||||
$this->createWorker();
|
||||
}
|
||||
|
||||
$this->running = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enqueues a task to be executed by the worker pool.
|
||||
*
|
||||
@ -138,11 +183,17 @@ class WorkerPool
|
||||
*
|
||||
* @resolve mixed The return value of the task.
|
||||
*/
|
||||
public function enqueue(TaskInterface $task)
|
||||
public function enqueue(TaskInterface $task /* , ...$args */)
|
||||
{
|
||||
if (!$this->running) {
|
||||
throw new SynchronizationError('The worker pool has not been started.');
|
||||
}
|
||||
|
||||
$args = array_slice(func_get_args(), 1);
|
||||
|
||||
// Enqueue the task if we have an idle worker.
|
||||
if ($worker = $this->getIdleWorker()) {
|
||||
yield $this->enqueueToWorker($task, $worker);
|
||||
yield $this->enqueueToWorker($worker, $task, $args);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -172,6 +223,30 @@ class WorkerPool
|
||||
}
|
||||
|
||||
yield Promise\all($shutdowns);
|
||||
$this->running = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Kills all workers in the pool and halts the worker pool.
|
||||
*/
|
||||
public function kill()
|
||||
{
|
||||
foreach ($this->workers as $worker) {
|
||||
$worker->kill();
|
||||
}
|
||||
|
||||
$this->running = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shuts down the pool when it is destroyed.
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
if ($this->isRunning()) {
|
||||
$coroutine = new Coroutine($this->shutdown());
|
||||
$coroutine->done();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -216,17 +291,18 @@ class WorkerPool
|
||||
*
|
||||
* @coroutine
|
||||
*
|
||||
* @param TaskInterface $task The task to enqueue.
|
||||
* @param WorkerInterface $worker The worker to enqueue to.
|
||||
* @param TaskInterface $task The task to enqueue.
|
||||
* @param array $args An array of arguments to pass to the task.
|
||||
*
|
||||
* @return \Generator
|
||||
*
|
||||
* @resolve mixed The return value of the task.
|
||||
*/
|
||||
private function enqueueToWorker(TaskInterface $task, WorkerInterface $worker)
|
||||
private function enqueueToWorker(WorkerInterface $worker, TaskInterface $task, array $args = [])
|
||||
{
|
||||
$this->idleWorkers->detach($worker);
|
||||
yield $worker->enqueue($task);
|
||||
yield call_user_func_array([$worker, 'enqueue'], array_merge([$task], $args));
|
||||
$this->idleWorkers->attach($worker);
|
||||
|
||||
// Spawn a new coroutine to process the busy queue if not empty.
|
||||
@ -256,7 +332,7 @@ class WorkerPool
|
||||
$deferred = $this->deferredQueue->dequeue();
|
||||
|
||||
try {
|
||||
$returnValue = (yield $this->enqueueToWorker($task, $worker));
|
||||
$returnValue = (yield $this->enqueueToWorker($worker, $task));
|
||||
$deferred->resolve($returnValue);
|
||||
} catch (\Exception $exception) {
|
||||
$deferred->reject($exception);
|
@ -4,6 +4,7 @@ namespace Icicle\Concurrent\Worker;
|
||||
use Icicle\Concurrent\ContextInterface;
|
||||
use Icicle\Concurrent\Exception\SynchronizationError;
|
||||
use Icicle\Concurrent\Worker\Internal\TaskFailure;
|
||||
use Icicle\Coroutine\Coroutine;
|
||||
|
||||
class Worker implements WorkerInterface
|
||||
{
|
||||
@ -49,31 +50,13 @@ class Worker implements WorkerInterface
|
||||
$this->context->start();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function kill()
|
||||
{
|
||||
$this->context->kill();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function shutdown()
|
||||
{
|
||||
yield $this->context->send([null, []]);
|
||||
|
||||
yield $this->context->join();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function enqueue(TaskInterface $task /* , ...$args */)
|
||||
{
|
||||
if (!$this->context->isRunning()) {
|
||||
throw new SynchronizationError('Worker has not been started.');
|
||||
throw new SynchronizationError('The worker has not been started.');
|
||||
}
|
||||
|
||||
$args = array_slice(func_get_args(), 1);
|
||||
@ -92,4 +75,33 @@ class Worker implements WorkerInterface
|
||||
|
||||
yield $result;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function shutdown()
|
||||
{
|
||||
yield $this->context->send([null, []]);
|
||||
|
||||
yield $this->context->join();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function kill()
|
||||
{
|
||||
$this->context->kill();
|
||||
}
|
||||
|
||||
/**
|
||||
* Shuts down the worker when it is destroyed.
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
if ($this->isRunning()) {
|
||||
$coroutine = new Coroutine($this->shutdown());
|
||||
$coroutine->done();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7,9 +7,9 @@ namespace Icicle\Concurrent\Worker;
|
||||
interface WorkerInterface
|
||||
{
|
||||
/**
|
||||
* Checks if the context is running.
|
||||
* Checks if the worker is running.
|
||||
*
|
||||
* @return bool True if the context is running, otherwise false.
|
||||
* @return bool True if the worker is running, otherwise false.
|
||||
*/
|
||||
public function isRunning();
|
||||
|
||||
@ -25,20 +25,6 @@ interface WorkerInterface
|
||||
*/
|
||||
public function start();
|
||||
|
||||
/**
|
||||
* Immediately kills the context.
|
||||
*/
|
||||
public function kill();
|
||||
|
||||
/**
|
||||
* @coroutine
|
||||
*
|
||||
* @return \Generator
|
||||
*
|
||||
* @resolve int Exit code.
|
||||
*/
|
||||
public function shutdown();
|
||||
|
||||
/**
|
||||
* @coroutine
|
||||
*
|
||||
@ -51,4 +37,18 @@ interface WorkerInterface
|
||||
* @resolve mixed Task return value.
|
||||
*/
|
||||
public function enqueue(TaskInterface $task);
|
||||
|
||||
/**
|
||||
* @coroutine
|
||||
*
|
||||
* @return \Generator
|
||||
*
|
||||
* @resolve int Exit code.
|
||||
*/
|
||||
public function shutdown();
|
||||
|
||||
/**
|
||||
* Immediately kills the context.
|
||||
*/
|
||||
public function kill();
|
||||
}
|
||||
|
@ -1,9 +1,11 @@
|
||||
<?php
|
||||
namespace Icicle\Concurrent\Worker;
|
||||
|
||||
use Icicle\Coroutine\Coroutine;
|
||||
|
||||
if (!function_exists(__NAMESPACE__ . '\pool')) {
|
||||
/**
|
||||
* Returns the default worker pool for the current context.
|
||||
* Returns the global worker pool for the current context.
|
||||
*
|
||||
* If the pool has not been initialized, a minimum and maximum size can be given to create the pool with.
|
||||
*
|
||||
@ -11,18 +13,15 @@ if (!function_exists(__NAMESPACE__ . '\pool')) {
|
||||
* @param int|null $maxSize The maximum number of workers the pool should spawn.
|
||||
* @param WorkerFactoryInterface|null $factory A worker factory to be used to create new workers.
|
||||
*
|
||||
* @return WorkerPool
|
||||
* @return Pool The global worker pool instance.
|
||||
*/
|
||||
function pool($minSize = null, $maxSize = null, WorkerFactoryInterface $factory = null)
|
||||
{
|
||||
static $instance;
|
||||
|
||||
if (null === $instance) {
|
||||
if (null !== $minSize) {
|
||||
$instance = new WorkerPool($minSize, $maxSize, $factory);
|
||||
} else {
|
||||
$instance = new WorkerPool(8, 32);
|
||||
}
|
||||
$instance = new Pool($minSize, $maxSize, $factory);
|
||||
$instance->start();
|
||||
}
|
||||
|
||||
return $instance;
|
||||
@ -31,16 +30,14 @@ if (!function_exists(__NAMESPACE__ . '\pool')) {
|
||||
/**
|
||||
* Enqueues a task to be executed by the worker pool.
|
||||
*
|
||||
* @coroutine
|
||||
*
|
||||
* @param TaskInterface $task The task to enqueue.
|
||||
*
|
||||
* @return \Generator
|
||||
* @return \Icicle\Promise\PromiseInterface
|
||||
*
|
||||
* @resolve mixed The return value of the task.
|
||||
*/
|
||||
function enqueue(TaskInterface $task)
|
||||
{
|
||||
return pool()->enqueue($task);
|
||||
return new Coroutine(pool()->enqueue($task));
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user