diff --git a/examples/worker-pool.php b/examples/worker-pool.php index 6d236e2..e1a1779 100755 --- a/examples/worker-pool.php +++ b/examples/worker-pool.php @@ -10,13 +10,12 @@ use Icicle\Promise; Coroutine\create(function () { $returnValues = (yield Promise\all([ - new Coroutine\Coroutine(Worker\enqueue(new HelloTask())), - new Coroutine\Coroutine(Worker\enqueue(new HelloTask())), - new Coroutine\Coroutine(Worker\enqueue(new HelloTask())), + Worker\enqueue(new HelloTask()), + Worker\enqueue(new HelloTask()), + Worker\enqueue(new HelloTask()), ])); - var_dump($returnValues); - yield Worker\pool()->shutdown(); + var_dump($returnValues); })->done(); Loop\run(); diff --git a/src/Worker/WorkerPool.php b/src/Worker/Pool.php similarity index 65% rename from src/Worker/WorkerPool.php rename to src/Worker/Pool.php index 0d79ba3..e80ad10 100644 --- a/src/Worker/WorkerPool.php +++ b/src/Worker/Pool.php @@ -2,6 +2,7 @@ namespace Icicle\Concurrent\Worker; use Icicle\Concurrent\Exception\InvalidArgumentError; +use Icicle\Concurrent\Exception\SynchronizationError; use Icicle\Coroutine\Coroutine; use Icicle\Promise; use Icicle\Promise\Deferred; @@ -14,8 +15,23 @@ use Icicle\Promise\PromiseInterface; * tasks simultaneously. The load on each worker is balanced such that tasks * are completed as soon as possible and workers are used efficiently. */ -class WorkerPool +class Pool implements WorkerInterface { + /** + * @var int The default minimum pool size. + */ + const DEFAULT_MIN_SIZE = 8; + + /** + * @var int The default maximum pool size. + */ + const DEFAULT_MAX_SIZE = 32; + + /** + * @var bool Indicates if the pool is currently running. + */ + private $running = false; + /** * @var int The minimum number of workers the pool should spawn. */ @@ -46,49 +62,61 @@ class WorkerPool */ private $busyQueue; - /** - * @var \SplQueue A queue of deferred to be fulfilled for waiting tasks. - */ - private $deferredQueue; - /** * Creates a new worker pool. * - * @param int $minSize The minimum number of workers the pool should spawn. - * @param int $maxSize The maximum number of workers the pool should spawn. - * @param WorkerFactoryInterface $factory A worker factory to be used to create new workers. + * @param int|null $minSize The minimum number of workers the pool should spawn. Defaults to + * `Pool::DEFAULT_MIN_SIZE`. + * @param int|null $maxSize The maximum number of workers the pool should spawn. Defaults to + * `Pool::DEFAULT_MAX_SIZE`. + * @param WorkerFactoryInterface|null $factory A worker factory to be used to create new workers. */ - public function __construct($minSize, $maxSize = null, WorkerFactoryInterface $factory = null) + public function __construct($minSize = null, $maxSize = null, WorkerFactoryInterface $factory = null) { + $minSize = $minSize ?: static::DEFAULT_MIN_SIZE; + $maxSize = $minSize ?: static::DEFAULT_MAX_SIZE; + if (!is_int($minSize) || $minSize < 0) { throw new InvalidArgumentError('Minimum size must be a non-negative integer.'); } - $this->minSize = $minSize; - if ($maxSize === null) { - $this->maxSize = $minSize; - } elseif (!is_int($maxSize) || $maxSize < 0) { - throw new InvalidArgumentError('Maximum size must be a non-negative integer.'); - } else { - $this->maxSize = $maxSize; + if (!is_int($maxSize) || $maxSize < 0 || $maxSize < $minSize) { + throw new InvalidArgumentError('Maximum size must be a non-negative integer at least '.$minSize.'.'); } + $this->maxSize = $maxSize; + $this->minSize = $minSize; + // Create the default factory if none is given. $this->factory = $factory ?: new WorkerFactory(); $this->workers = new \SplObjectStorage(); $this->idleWorkers = new \SplObjectStorage(); $this->busyQueue = new \SplQueue(); - $this->deferredQueue = new \SplQueue(); - - // Start up the pool with the minimum number of workers. - while (--$minSize >= 0) { - $this->createWorker(); - } } /** - * Gets the minimum number of workers the worker pool may have idle. + * Checks if the pool is running. + * + * @return bool True if the pool is running, otherwise false. + */ + public function isRunning() + { + return $this->running; + } + + /** + * Checks if the pool has any idle workers. + * + * @return bool True if the pool has at least one idle worker, otherwise false. + */ + public function isIdle() + { + return $this->idleWorkers->count() > 0; + } + + /** + * Gets the minimum number of workers the pool may have idle. * * @return int The minimum number of workers. */ @@ -98,7 +126,7 @@ class WorkerPool } /** - * Gets the maximum number of workers the worker pool may spawn to handle concurrent tasks. + * Gets the maximum number of workers the pool may spawn to handle concurrent tasks. * * @return int The maximum number of workers. */ @@ -127,6 +155,23 @@ class WorkerPool return $this->idleWorkers->count(); } + /** + * Starts the worker pool execution. + * + * When the worker pool starts up, the minimum number of workers will be created. This adds some overhead to + * starting the pool, but allows for greater performance during runtime. + */ + public function start() + { + // Start up the pool with the minimum number of workers. + $count = $this->minSize; + while (--$count >= 0) { + $this->createWorker(); + } + + $this->running = true; + } + /** * Enqueues a task to be executed by the worker pool. * @@ -138,11 +183,17 @@ class WorkerPool * * @resolve mixed The return value of the task. */ - public function enqueue(TaskInterface $task) + public function enqueue(TaskInterface $task /* , ...$args */) { + if (!$this->running) { + throw new SynchronizationError('The worker pool has not been started.'); + } + + $args = array_slice(func_get_args(), 1); + // Enqueue the task if we have an idle worker. if ($worker = $this->getIdleWorker()) { - yield $this->enqueueToWorker($task, $worker); + yield $this->enqueueToWorker($worker, $task, $args); return; } @@ -172,6 +223,30 @@ class WorkerPool } yield Promise\all($shutdowns); + $this->running = false; + } + + /** + * Kills all workers in the pool and halts the worker pool. + */ + public function kill() + { + foreach ($this->workers as $worker) { + $worker->kill(); + } + + $this->running = false; + } + + /** + * Shuts down the pool when it is destroyed. + */ + public function __destruct() + { + if ($this->isRunning()) { + $coroutine = new Coroutine($this->shutdown()); + $coroutine->done(); + } } /** @@ -216,17 +291,18 @@ class WorkerPool * * @coroutine * - * @param TaskInterface $task The task to enqueue. * @param WorkerInterface $worker The worker to enqueue to. + * @param TaskInterface $task The task to enqueue. + * @param array $args An array of arguments to pass to the task. * * @return \Generator * * @resolve mixed The return value of the task. */ - private function enqueueToWorker(TaskInterface $task, WorkerInterface $worker) + private function enqueueToWorker(WorkerInterface $worker, TaskInterface $task, array $args = []) { $this->idleWorkers->detach($worker); - yield $worker->enqueue($task); + yield call_user_func_array([$worker, 'enqueue'], array_merge([$task], $args)); $this->idleWorkers->attach($worker); // Spawn a new coroutine to process the busy queue if not empty. @@ -256,7 +332,7 @@ class WorkerPool $deferred = $this->deferredQueue->dequeue(); try { - $returnValue = (yield $this->enqueueToWorker($task, $worker)); + $returnValue = (yield $this->enqueueToWorker($worker, $task)); $deferred->resolve($returnValue); } catch (\Exception $exception) { $deferred->reject($exception); diff --git a/src/Worker/Worker.php b/src/Worker/Worker.php index 04771bc..5ef9805 100644 --- a/src/Worker/Worker.php +++ b/src/Worker/Worker.php @@ -4,6 +4,7 @@ namespace Icicle\Concurrent\Worker; use Icicle\Concurrent\ContextInterface; use Icicle\Concurrent\Exception\SynchronizationError; use Icicle\Concurrent\Worker\Internal\TaskFailure; +use Icicle\Coroutine\Coroutine; class Worker implements WorkerInterface { @@ -49,31 +50,13 @@ class Worker implements WorkerInterface $this->context->start(); } - /** - * {@inheritdoc} - */ - public function kill() - { - $this->context->kill(); - } - - /** - * {@inheritdoc} - */ - public function shutdown() - { - yield $this->context->send([null, []]); - - yield $this->context->join(); - } - /** * {@inheritdoc} */ public function enqueue(TaskInterface $task /* , ...$args */) { if (!$this->context->isRunning()) { - throw new SynchronizationError('Worker has not been started.'); + throw new SynchronizationError('The worker has not been started.'); } $args = array_slice(func_get_args(), 1); @@ -92,4 +75,33 @@ class Worker implements WorkerInterface yield $result; } -} \ No newline at end of file + + /** + * {@inheritdoc} + */ + public function shutdown() + { + yield $this->context->send([null, []]); + + yield $this->context->join(); + } + + /** + * {@inheritdoc} + */ + public function kill() + { + $this->context->kill(); + } + + /** + * Shuts down the worker when it is destroyed. + */ + public function __destruct() + { + if ($this->isRunning()) { + $coroutine = new Coroutine($this->shutdown()); + $coroutine->done(); + } + } +} diff --git a/src/Worker/WorkerInterface.php b/src/Worker/WorkerInterface.php index c4b018f..7e5c90c 100644 --- a/src/Worker/WorkerInterface.php +++ b/src/Worker/WorkerInterface.php @@ -7,9 +7,9 @@ namespace Icicle\Concurrent\Worker; interface WorkerInterface { /** - * Checks if the context is running. + * Checks if the worker is running. * - * @return bool True if the context is running, otherwise false. + * @return bool True if the worker is running, otherwise false. */ public function isRunning(); @@ -25,20 +25,6 @@ interface WorkerInterface */ public function start(); - /** - * Immediately kills the context. - */ - public function kill(); - - /** - * @coroutine - * - * @return \Generator - * - * @resolve int Exit code. - */ - public function shutdown(); - /** * @coroutine * @@ -51,4 +37,18 @@ interface WorkerInterface * @resolve mixed Task return value. */ public function enqueue(TaskInterface $task); + + /** + * @coroutine + * + * @return \Generator + * + * @resolve int Exit code. + */ + public function shutdown(); + + /** + * Immediately kills the context. + */ + public function kill(); } diff --git a/src/Worker/functions.php b/src/Worker/functions.php index f5d4439..7a2ef7b 100644 --- a/src/Worker/functions.php +++ b/src/Worker/functions.php @@ -1,9 +1,11 @@ start(); } return $instance; @@ -31,16 +30,14 @@ if (!function_exists(__NAMESPACE__ . '\pool')) { /** * Enqueues a task to be executed by the worker pool. * - * @coroutine - * * @param TaskInterface $task The task to enqueue. * - * @return \Generator + * @return \Icicle\Promise\PromiseInterface * * @resolve mixed The return value of the task. */ function enqueue(TaskInterface $task) { - return pool()->enqueue($task); + return new Coroutine(pool()->enqueue($task)); } }