Add working worker pool with busy queue

This commit is contained in:
coderstephen 2015-08-27 22:51:50 -05:00
parent 8326d955c5
commit 6e317abe1b
4 changed files with 241 additions and 10 deletions

24
examples/worker-pool.php Executable file
View File

@ -0,0 +1,24 @@
#!/usr/bin/env php
<?php
require dirname(__DIR__).'/vendor/autoload.php';
use Icicle\Concurrent\Worker\HelloTask;
use Icicle\Concurrent\Worker\WorkerPool;
use Icicle\Coroutine;
use Icicle\Loop;
use Icicle\Promise;
Coroutine\create(function () {
$pool = new WorkerPool(1);
$returnValues = (yield Promise\all([
new Coroutine\Coroutine($pool->enqueue(new HelloTask())),
new Coroutine\Coroutine($pool->enqueue(new HelloTask())),
new Coroutine\Coroutine($pool->enqueue(new HelloTask())),
]));
var_dump($returnValues);
yield $pool->shutdown();
})->done();
Loop\run();

View File

@ -1,8 +1,18 @@
<?php
namespace Icicle\Concurrent\Worker;
class WorkerFactory
/**
* The built-in worker factory type.
*/
class WorkerFactory implements WorkerFactoryInterface
{
/**
* {@inheritdoc}
*
* The type of worker created depends on the extensions available. If multi-threading is enabled, a WorkerThread
* will be created. If threads are not available, a WorkerFork will be created if forking is available, otherwise
* a WorkerProcess will be created.
*/
public function create()
{
if (extension_loaded('pthreads')) {

View File

@ -0,0 +1,15 @@
<?php
namespace Icicle\Concurrent\Worker;
/**
* Interface for factories used to create new workers.
*/
interface WorkerFactoryInterface
{
/**
* Creates a new worker instance.
*
* @return WorkerInterface The newly created worker.
*/
public function create();
}

View File

@ -2,6 +2,10 @@
namespace Icicle\Concurrent\Worker;
use Icicle\Concurrent\Exception\InvalidArgumentError;
use Icicle\Coroutine\Coroutine;
use Icicle\Promise;
use Icicle\Promise\Deferred;
use Icicle\Promise\PromiseInterface;
/**
* Provides a pool of workers that can be used to execute multiple tasks asynchronously.
@ -22,13 +26,39 @@ class WorkerPool
*/
private $maxSize;
/**
* @var WorkerFactoryInterface A worker factory to be used to create new workers.
*/
private $factory;
/**
* @var \SplObjectStorage A collection of all workers in the pool.
*/
private $workers;
/**
* @var \SplObjectStorage A collection of idle workers.
*/
private $idleWorkers;
/**
* @var \SplQueue A queue of tasks waiting to be submitted.
*/
private $busyQueue;
/**
* @var \SplQueue A queue of deferred to be fulfilled for waiting tasks.
*/
private $deferredQueue;
/**
* Creates a new worker pool.
*
* @param int $minSize The minimum number of workers the pool should spawn.
* @param int $maxSize The maximum number of workers the pool should spawn.
* @param int $minSize The minimum number of workers the pool should spawn.
* @param int $maxSize The maximum number of workers the pool should spawn.
* @param WorkerFactoryInterface $factory A worker factory to be used to create new workers.
*/
public function __construct(WorkerFactory $factory, $minSize, $maxSize = null)
public function __construct($minSize, $maxSize = null, WorkerFactoryInterface $factory = null)
{
if (!is_int($minSize) || $minSize < 0) {
throw new InvalidArgumentError('Minimum size must be a non-negative integer.');
@ -42,44 +72,196 @@ class WorkerPool
} else {
$this->maxSize = $maxSize;
}
// Create the default factory if none is given.
$this->factory = $factory ?: new WorkerFactory();
$this->workers = new \SplObjectStorage();
$this->idleWorkers = new \SplObjectStorage();
$this->busyQueue = new \SplQueue();
$this->deferredQueue = new \SplQueue();
// Start up the pool with the minimum number of workers.
while (--$minSize >= 0) {
$this->createWorker();
}
}
/**
* Gets the minimum number of workers the worker pool may have idle.
*
* @return int The minimum number of workers.
*/
public function getMinSize()
{
return $this->minSize;
}
/**
* Gets the maximum number of workers the worker pool may spawn to handle concurrent tasks.
*
* @return int The maximum number of workers.
*/
public function getMaxSize()
{
return $this->maxSize;
}
/**
* Gets the number of workers that have been spawned.
* Gets the number of workers currently running in the pool.
*
* @return int
* @return int The number of workers.
*/
public function getWorkerCount()
{
return $this->workers->count();
}
/**
* Gets the number of workers that are currently idle.
*
* @return int
* @return int The number of idle workers.
*/
public function getIdleWorkerCount()
{
return $this->idleWorkers->count();
}
/**
* Enqueues a task to be executed in the worker pool.
* Enqueues a task to be executed by the worker pool.
*
* @param TaskInterface $task The task to execute.
* @coroutine
*
* @return \Icicle\Promise\PromiseInterface
* @param TaskInterface $task The task to enqueue.
*
* @return \Generator
*
* @resolve mixed The return value of the task.
*/
public function enqueue(TaskInterface $task)
{
$worker = $this->getIdleWorker();
// Enqueue the task if we have an idle worker.
if ($worker) {
yield $this->enqueueToWorker($task, $worker);
return;
}
// If we're at our limit of busy workers, add the task to the waiting list to be enqueued later when a new
// worker becomes available.
$deferred = new Deferred();
$this->busyQueue->enqueue($task);
$this->deferredQueue->enqueue($deferred);
// Yield a promise that will be resolved when the task gets processed later.
yield $deferred->getPromise();
}
/**
* Shuts down the pool and all workers in it.
*
* @coroutine
*
* @return \Generator
*/
public function shutdown()
{
$shutdowns = [];
foreach ($this->workers as $worker) {
$shutdowns[] = new Coroutine($worker->shutdown());
}
yield Promise\all($shutdowns);
}
/**
* Creates a worker and adds them to the pool.
*
* @return WorkerInterface The worker created.
*/
private function createWorker()
{
$worker = $this->factory->create();
$worker->start();
$this->workers->attach($worker);
$this->idleWorkers->attach($worker);
return $worker;
}
/**
* Gets the first available idle worker, or spawns a new worker if able.
*
* @return WorkerInterface|null An idle worker, or null if none could be found.
*/
private function getIdleWorker()
{
// If there are idle workers, select the first one and return it.
if ($this->idleWorkers->count() > 0) {
$this->idleWorkers->rewind();
return $this->idleWorkers->current();
}
// If there are no idle workers and we are allowed to spawn more, do so now.
if ($this->getWorkerCount() < $this->maxSize) {
return $this->createWorker();
}
}
/**
* Enqueues a task to a given worker.
*
* Waits for the task to finish, and resolves with the task's result. When the assigned worker becomes idle again,
* a new coroutine is started to process the busy task queue if needed.
*
* @coroutine
*
* @param TaskInterface $task The task to enqueue.
* @param WorkerInterface $worker The worker to enqueue to.
*
* @return \Generator
*
* @resolve mixed The return value of the task.
*/
private function enqueueToWorker(TaskInterface $task, WorkerInterface $worker)
{
$this->idleWorkers->detach($worker);
yield $worker->enqueue($task);
$this->idleWorkers->attach($worker);
// Spawn a new coroutine to process the busy queue if not empty.
if (!$this->busyQueue->isEmpty()) {
new Coroutine($this->processBusyQueue());
}
}
/**
* Processes the busy queue until it is empty.
*
* @coroutine
*
* @return \Generator
*/
private function processBusyQueue()
{
while (!$this->busyQueue->isEmpty()) {
// If we cannot find an idle worker, give up like a wimp. (Don't worry, some other coroutine will pick up
// the slack).
if (!($worker = $this->getIdleWorker())) {
break;
}
$task = $this->busyQueue->dequeue();
$deferred = $this->deferredQueue->dequeue();
try {
$returnValue = (yield $this->enqueueToWorker($task, $worker));
$deferred->resolve($returnValue);
} catch (\Exception $exception) {
$deferred->reject($exception);
}
}
}
}