Remove Channel::createSocketPair(); refactor Thread

This commit is contained in:
Aaron Piotrowski 2015-09-04 16:22:41 -05:00
parent 2e6ecab842
commit ff44afde6d
4 changed files with 45 additions and 49 deletions

View File

@ -13,6 +13,7 @@ use Icicle\Concurrent\Sync\Internal\ExitStatusInterface;
use Icicle\Concurrent\Sync\Internal\ExitSuccess;
use Icicle\Coroutine\Coroutine;
use Icicle\Loop;
use Icicle\Socket;
use Icicle\Socket\Stream\DuplexStream;
/**
@ -138,6 +139,9 @@ class Fork implements ContextInterface
/**
* Starts the context execution.
*
* @throws \Icicle\Concurrent\Exception\ForkException If forking fails.
* @throws \Icicle\Socket\Exception\FailureException If creating a socket pair fails.
*/
public function start()
{
@ -145,7 +149,7 @@ class Fork implements ContextInterface
throw new StatusError('The context has already been started.');
}
list($parent, $child) = Channel::createSocketPair();
list($parent, $child) = Socket\pair();
switch ($pid = pcntl_fork()) {
case -1: // Failure

View File

@ -59,27 +59,6 @@ class Channel implements ChannelInterface
};
}
/**
* Returns a pair of connected stream socket resources.
*
* Creates a new channel connection and returns two connections to the
* channel. Each connection is a peer and interacts with the other, even
* across threads or processes.
*
* @return resource[] Pair of socket resources.
*
* @throws \Icicle\Concurrent\Exception\ChannelException If creating the sockets fails.
*/
public static function createSocketPair()
{
// Create a socket pair.
if (($sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP)) === false) {
throw new ChannelException('Failed to create channel sockets.');
}
return $sockets;
}
/**
* {@inheritdoc}
*/

View File

@ -9,6 +9,7 @@ use Icicle\Concurrent\Exception\ThreadException;
use Icicle\Concurrent\Sync\Channel;
use Icicle\Concurrent\Sync\Internal\ExitStatusInterface;
use Icicle\Coroutine;
use Icicle\Socket;
use Icicle\Socket\Stream\DuplexStream;
/**
@ -38,9 +39,14 @@ class Thread implements ContextInterface
private $socket;
/**
* @var bool
* @var callable
*/
private $started = false;
private $function;
/**
* @var mixed[]
*/
private $args;
/**
* Spawns a new thread and runs it.
@ -76,10 +82,19 @@ class Thread implements ContextInterface
}
}
list($channel, $this->socket) = Channel::createSocketPair();
$this->function = $function;
$this->args = $args;
}
$this->thread = new Internal\Thread($this->socket, $function, $args);
$this->channel = new Channel(new DuplexStream($channel));
/**
* Returns the thread to the condition before starting. The new thread can be started and run independently of the
* first thread.
*/
public function __clone()
{
$this->thread = null;
$this->socket = null;
$this->channel = null;
}
/**
@ -89,26 +104,31 @@ class Thread implements ContextInterface
*/
public function isRunning()
{
return $this->started && $this->thread->isRunning() && $this->channel->isOpen();
return null !== $this->thread && $this->thread->isRunning() && $this->channel->isOpen();
}
/**
* Spawns the thread and begins the thread's execution.
*
* @throws StatusError If the thread has already been started.
* @throws ThreadException If starting the thread was unsuccessful.
* @throws \Icicle\Concurrent\Exception\StatusError If the thread has already been started.
* @throws \Icicle\Concurrent\Exception\ThreadException If starting the thread was unsuccessful.
* @throws \Icicle\Socket\Exception\FailureException If creating a socket pair fails.
*/
public function start()
{
if ($this->started) {
if (null !== $this->thread) {
throw new StatusError('The thread has already been started.');
}
list($channel, $this->socket) = Socket\pair();
$this->thread = new Internal\Thread($this->socket, $this->function, $this->args);
if (!$this->thread->start(PTHREADS_INHERIT_INI | PTHREADS_INHERIT_FUNCTIONS | PTHREADS_INHERIT_CLASSES)) {
throw new ThreadException('Failed to start the thread.');
}
$this->started = true;
$this->channel = new Channel(new DuplexStream($channel));
}
/**
@ -130,7 +150,7 @@ class Thread implements ContextInterface
*/
private function close()
{
if ($this->started && $this->channel->isOpen()) {
if (null !== $this->channel && $this->channel->isOpen()) {
$this->channel->close();
}
@ -154,7 +174,7 @@ class Thread implements ContextInterface
*/
public function join()
{
if (!$this->started) {
if (null === $this->thread) {
throw new StatusError('The thread has not been started.');
}
@ -181,7 +201,7 @@ class Thread implements ContextInterface
*/
public function receive()
{
if (!$this->started) {
if (null === $this->thread) {
throw new StatusError('The thread has not been started.');
}
@ -204,7 +224,7 @@ class Thread implements ContextInterface
*/
public function send($data)
{
if (!$this->started) {
if (null === $this->thread) {
throw new StatusError('The thread has not been started.');
}

View File

@ -4,6 +4,7 @@ namespace Icicle\Tests\Concurrent\Sync;
use Icicle\Concurrent\Sync\Channel;
use Icicle\Coroutine;
use Icicle\Loop;
use Icicle\Socket;
use Icicle\Socket\Stream\DuplexStream;
use Icicle\Stream\DuplexStreamInterface;
use Icicle\Stream\ReadableStreamInterface;
@ -12,14 +13,6 @@ use Icicle\Tests\Concurrent\TestCase;
class ChannelTest extends TestCase
{
public function testCreateSocketPair()
{
list($a, $b) = Channel::createSocketPair();
$this->assertInternalType('resource', $a);
$this->assertInternalType('resource', $b);
}
public function testIsOpen()
{
$mock = $this->getMock(DuplexStreamInterface::class);
@ -82,7 +75,7 @@ class ChannelTest extends TestCase
public function testSendReceive()
{
Coroutine\create(function () {
list($a, $b) = Channel::createSocketPair();
list($a, $b) = Socket\pair();
$a = new Channel(new DuplexStream($a));
$b = new Channel(new DuplexStream($b));
@ -104,7 +97,7 @@ class ChannelTest extends TestCase
public function testInvalidDataReceived()
{
Coroutine\create(function () {
list($a, $b) = Channel::createSocketPair();
list($a, $b) = Socket\pair();
$a = new Channel($stream = new DuplexStream($a));
$b = new Channel(new DuplexStream($b));
@ -123,7 +116,7 @@ class ChannelTest extends TestCase
public function testSendUnserializableData()
{
Coroutine\create(function () {
list($a, $b) = Channel::createSocketPair();
list($a, $b) = Socket\pair();
$a = new Channel(new DuplexStream($a));
$b = new Channel(new DuplexStream($b));
@ -142,7 +135,7 @@ class ChannelTest extends TestCase
public function testSendAfterClose()
{
Coroutine\create(function () {
list($a, $b) = Channel::createSocketPair();
list($a, $b) = Socket\pair();
$a = new Channel(new DuplexStream($a));
$b = new Channel(new DuplexStream($b));
@ -162,7 +155,7 @@ class ChannelTest extends TestCase
public function testReceiveAfterClose()
{
Coroutine\create(function () {
list($a, $b) = Channel::createSocketPair();
list($a, $b) = Socket\pair();
$a = new Channel(new DuplexStream($a));
$b = new Channel(new DuplexStream($b));