2015-07-13 17:30:59 -05:00
|
|
|
<?php
|
|
|
|
namespace Icicle\Concurrent\Threading;
|
|
|
|
|
2015-08-05 02:48:43 -05:00
|
|
|
use Icicle\Concurrent\Sync\Channel;
|
2015-07-26 17:53:00 -05:00
|
|
|
use Icicle\Coroutine\Coroutine;
|
|
|
|
use Icicle\Loop;
|
|
|
|
|
2015-07-15 12:36:32 -05:00
|
|
|
/**
|
2015-07-26 17:53:00 -05:00
|
|
|
* An internal thread that executes a given function concurrently.
|
2015-07-15 12:36:32 -05:00
|
|
|
*/
|
2015-07-13 17:30:59 -05:00
|
|
|
class Thread extends \Thread
|
|
|
|
{
|
2015-07-26 17:53:00 -05:00
|
|
|
/**
|
|
|
|
* @var ThreadContext An instance of the context local to this thread.
|
|
|
|
*/
|
|
|
|
public $context;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @var string|null Path to an autoloader to include.
|
|
|
|
*/
|
|
|
|
public $autoloaderPath;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @var callable The function to execute in the thread.
|
|
|
|
*/
|
|
|
|
private $function;
|
|
|
|
|
2015-08-05 02:48:43 -05:00
|
|
|
public $prepared = false;
|
|
|
|
public $initialized = false;
|
|
|
|
|
|
|
|
private $channel;
|
|
|
|
private $socket;
|
|
|
|
|
2015-07-26 17:53:00 -05:00
|
|
|
/**
|
|
|
|
* Creates a new thread object.
|
|
|
|
*
|
|
|
|
* @param callable $function The function to execute in the thread.
|
|
|
|
*/
|
|
|
|
public function __construct(callable $function)
|
2015-07-14 17:15:10 -05:00
|
|
|
{
|
2015-08-05 13:30:05 -05:00
|
|
|
$this->context = new ThreadContext($this);
|
2015-08-05 02:48:43 -05:00
|
|
|
$this->function = $function;
|
2015-07-14 17:15:10 -05:00
|
|
|
}
|
2015-07-13 17:30:59 -05:00
|
|
|
|
2015-08-05 02:48:43 -05:00
|
|
|
/**
|
|
|
|
* Initializes the thread by injecting values from the parent into threaded memory.
|
|
|
|
*
|
|
|
|
* @param resource $socket The channel socket to communicate to the parent with.
|
|
|
|
*/
|
|
|
|
public function init($socket)
|
2015-07-13 17:30:59 -05:00
|
|
|
{
|
|
|
|
$this->socket = $socket;
|
2015-08-05 02:48:43 -05:00
|
|
|
$this->initialized = true;
|
2015-07-13 17:30:59 -05:00
|
|
|
}
|
|
|
|
|
2015-08-05 02:48:43 -05:00
|
|
|
/**
|
|
|
|
* Runs the thread code and the initialized function.
|
|
|
|
*/
|
2015-07-13 17:30:59 -05:00
|
|
|
public function run()
|
|
|
|
{
|
2015-08-05 02:48:43 -05:00
|
|
|
// First thing we need to do is prepare the thread environment to make
|
|
|
|
// it usable, so lock the thread while we do it. Hopefully we get the
|
|
|
|
// lock first, but if we don't the parent will release and give us a
|
|
|
|
// chance before continuing.
|
|
|
|
$this->lock();
|
|
|
|
|
|
|
|
// First thing we need to do is initialize the class autoloader. If we
|
|
|
|
// don't do this first, objects we receive from other threads will just
|
|
|
|
// be garbage data and unserializable values (like resources) will be
|
|
|
|
// lost. This happens even with thread-safe objects.
|
|
|
|
if (file_exists($this->autoloaderPath)) {
|
|
|
|
require $this->autoloaderPath;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Initialize the thread-local global event loop.
|
|
|
|
Loop\loop();
|
|
|
|
|
2015-08-05 13:30:05 -05:00
|
|
|
// Register a shutdown handler to deal with errors smoothly.
|
|
|
|
//register_shutdown_function([$this, 'handleShutdown']);
|
|
|
|
|
2015-08-05 02:48:43 -05:00
|
|
|
// Now let the parent thread know that we are done preparing the
|
|
|
|
// thread environment and are ready to accept data.
|
|
|
|
$this->prepared = true;
|
|
|
|
$this->notify();
|
|
|
|
$this->unlock();
|
|
|
|
|
|
|
|
// Wait for objects to be injected by the context wrapper object.
|
|
|
|
$this->lock();
|
|
|
|
if (!$this->initialized) {
|
|
|
|
$this->wait();
|
|
|
|
}
|
|
|
|
$this->unlock();
|
|
|
|
|
|
|
|
// At this point, the thread environment has been prepared, and the
|
2015-08-05 13:30:05 -05:00
|
|
|
// parent has finished injecting values into our memory, so begin using
|
|
|
|
// the channel.
|
2015-08-05 02:48:43 -05:00
|
|
|
$this->channel = new LocalObject(new Channel($this->socket));
|
|
|
|
|
2015-08-05 13:30:05 -05:00
|
|
|
// Now that everything is finally ready, invoke the function, closure,
|
|
|
|
// or coroutine passed in from the user.
|
2015-07-26 17:53:00 -05:00
|
|
|
try {
|
2015-08-05 02:48:43 -05:00
|
|
|
if ($this->function instanceof \Closure) {
|
|
|
|
$generator = $this->function->bindTo($this->context)->__invoke();
|
|
|
|
} else {
|
|
|
|
$generator = call_user_func($this->function);
|
2015-07-26 17:53:00 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
if ($generator instanceof \Generator) {
|
|
|
|
$coroutine = new Coroutine($generator);
|
2015-08-05 02:48:43 -05:00
|
|
|
} else {
|
|
|
|
$returnValue = $generator;
|
2015-07-26 17:53:00 -05:00
|
|
|
}
|
|
|
|
|
2015-08-05 13:30:05 -05:00
|
|
|
// Send the return value back to the parent thread.
|
|
|
|
$response = [
|
|
|
|
'ok' => true,
|
|
|
|
'value' => $returnValue,
|
|
|
|
];
|
|
|
|
new Coroutine($this->channel->deref()->send($response));
|
|
|
|
|
|
|
|
Loop\run();
|
|
|
|
} catch (\Exception $exception) {
|
|
|
|
// If normal execution failed and caused an error, catch it and send
|
|
|
|
// it to the parent context so the error can bubble up.
|
|
|
|
$response = [
|
|
|
|
'ok' => false,
|
2015-08-05 02:48:43 -05:00
|
|
|
'panic' => [
|
|
|
|
'message' => $exception->getMessage(),
|
|
|
|
'code' => $exception->getCode(),
|
|
|
|
'trace' => array_map([$this, 'removeTraceArgs'], $exception->getTrace()),
|
|
|
|
],
|
|
|
|
];
|
|
|
|
|
2015-08-05 13:30:05 -05:00
|
|
|
new Coroutine($this->channel->deref()->send($response));
|
2015-08-05 02:48:43 -05:00
|
|
|
} finally {
|
|
|
|
$this->channel->deref()->close();
|
|
|
|
}
|
|
|
|
|
2015-08-05 13:30:05 -05:00
|
|
|
// We don't really need to do this, but let's be explicit about freeing
|
|
|
|
// our resources.
|
2015-08-05 02:48:43 -05:00
|
|
|
$this->channel->free();
|
|
|
|
}
|
|
|
|
|
|
|
|
public function handleShutdown()
|
|
|
|
{
|
|
|
|
if ($error = error_get_last()) {
|
|
|
|
$panic = [
|
|
|
|
'message' => $error['message'],
|
|
|
|
'code' => 0,
|
|
|
|
'trace' => array_map([$this, 'removeTraceArgs'], debug_backtrace()),
|
|
|
|
];
|
2015-07-13 17:30:59 -05:00
|
|
|
|
2015-07-26 17:53:00 -05:00
|
|
|
$this->sendMessage(self::MSG_ERROR);
|
2015-08-05 02:48:43 -05:00
|
|
|
$serialized = serialize($panic);
|
2015-07-26 17:53:00 -05:00
|
|
|
$length = strlen($serialized);
|
2015-08-05 02:48:43 -05:00
|
|
|
fwrite($this->socket, pack('S', $length).$serialized);
|
2015-07-26 17:53:00 -05:00
|
|
|
fclose($this->socket);
|
|
|
|
}
|
2015-07-13 17:30:59 -05:00
|
|
|
}
|
|
|
|
|
2015-08-05 02:48:43 -05:00
|
|
|
public function removeTraceArgs($trace)
|
|
|
|
{
|
|
|
|
unset($trace['args']);
|
|
|
|
return $trace;
|
|
|
|
}
|
2015-07-13 17:30:59 -05:00
|
|
|
}
|