Refactor Parcels

This commit is contained in:
Aaron Piotrowski 2017-11-29 14:40:07 -06:00
parent 996654920d
commit 33a5b89ff7
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
10 changed files with 229 additions and 315 deletions

View File

@ -22,11 +22,8 @@ interface Parcel {
* Asynchronously invokes a callback while maintaining an exclusive lock on the parcel. The current value of the
* parcel is provided as the first argument to the callback function.
*
* The arguments passed to the callback depend on the implementing object. If the callback throws an exception,
* the lock on the object will be immediately released.
*
* @param callable $callback The synchronized callback to invoke.
* The callback may be a regular function or a coroutine.
* @param callable $callback The synchronized callback to invoke. The parcel value is given as the single argument
* to the callback function. The callback may be a regular function or a coroutine.
*
* @return \Amp\Promise<mixed> Resolves with the return value of $callback or fails if $callback
* throws an exception.
@ -34,17 +31,7 @@ interface Parcel {
public function synchronized(callable $callback): Promise;
/**
* Unwraps the parcel and returns the value inside the parcel.
*
* @return mixed The value inside the parcel.
* @return \Amp\Promise<mixed> A promise for the value inside the parcel.
*/
public function unwrap(): Promise;
/**
* Clones the parcel object, resulting in a new, independent parcel.
*
* When a parcel is cloned, a new parcel is created and the original
* parcel's value is duplicated and copied to the new parcel.
*/
public function __clone();
}

View File

@ -7,6 +7,7 @@ use Amp\Failure;
use Amp\Promise;
use Amp\Success;
use Amp\Sync\PosixSemaphore;
use function Amp\call;
/**
* A container object for sharing a value across contexts.
@ -30,7 +31,7 @@ use Amp\Sync\PosixSemaphore;
* @see http://man7.org/linux/man-pages/man2/shmctl.2.html How shared memory works on Linux.
* @see https://msdn.microsoft.com/en-us/library/ms810613.aspx How shared memory works on Windows.
*/
class SharedMemoryParcel implements Parcel, \Serializable {
class SharedMemoryParcel implements Parcel {
/** @var int The byte offset to the start of the object data in memory. */
const MEM_DATA_OFFSET = 7;
@ -40,6 +41,9 @@ class SharedMemoryParcel implements Parcel, \Serializable {
const STATE_MOVED = 2;
const STATE_FREED = 3;
/** @var string */
private $id;
/** @var int The shared memory segment key. */
private $key;
@ -49,6 +53,35 @@ class SharedMemoryParcel implements Parcel, \Serializable {
/** @var int An open handle to the shared memory segment. */
private $handle;
/** @var int */
private $initializer = 0;
/**
* @param string $id
* @param mixed $value
* @param int $size The initial size in bytes of the shared memory segment. It will automatically be expanded as
* necessary.
* @param int $permissions Permissions to access the semaphore. Use file permission format specified as 0xxx.
*
* @return \Amp\Parallel\Sync\SharedMemoryParcel
*/
public static function create(string $id, $value, int $size = 8192, int $permissions = 0600): self {
$parcel = new self($id);
$parcel->init($value, $size, $permissions);
return $parcel;
}
/**
* @param string $id
*
* @return \Amp\Parallel\Sync\SharedMemoryParcel
*/
public static function use(string $id): self {
$parcel = new self($id);
$parcel->open();
return $parcel;
}
/**
* Creates a new local object container.
*
@ -61,12 +94,13 @@ class SharedMemoryParcel implements Parcel, \Serializable {
* @param int $permissions The access permissions to set for the object.
* If not specified defaults to 0600.
*/
public function __construct($value, int $size = 16384, int $permissions = 0600) {
private function __construct(string $id) {
if (!\extension_loaded("shmop")) {
throw new \Error(__CLASS__ . " requires the shmop extension.");
}
$this->init($value, $size, $permissions);
$this->id = $id;
$this->key = self::makeKey($this->id);
}
/**
@ -74,13 +108,18 @@ class SharedMemoryParcel implements Parcel, \Serializable {
* @param int $size
* @param int $permissions
*/
private function init($value, int $size = 16384, int $permissions = 0600) {
$this->key = \abs(\crc32(\spl_object_hash($this)));
private function init($value, int $size = 8192, int $permissions = 0600) {
$this->semaphore = PosixSemaphore::create($this->id, 1);
$this->initializer = \getmypid();
$this->memOpen($this->key, 'n', $permissions, $size + self::MEM_DATA_OFFSET);
$this->setHeader(self::STATE_ALLOCATED, 0, $permissions);
$this->wrap($value);
}
$this->semaphore = new PosixSemaphore(1);
private function open() {
$this->semaphore = PosixSemaphore::use($this->id);
$this->memOpen($this->key, 'w', 0, 0);
}
/**
@ -91,7 +130,7 @@ class SharedMemoryParcel implements Parcel, \Serializable {
*
* @return bool True if the object is freed, otherwise false.
*/
public function isFreed(): bool {
private function isFreed(): bool {
// If we are no longer connected to the memory segment, check if it has
// been invalidated.
if ($this->handle !== null) {
@ -177,22 +216,16 @@ class SharedMemoryParcel implements Parcel, \Serializable {
* @return \Generator
*/
private function doSynchronized(callable $callback): \Generator {
/** @var \Amp\Parallel\Sync\Lock $lock */
/** @var \Amp\Sync\Lock $lock */
$lock = yield $this->semaphore->acquire();
try {
$value = yield $this->unwrap();
$result = $callback($value);
$result = yield call($callback, $value);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
if ($result !== null) {
$this->wrap($result);
}
if ($result instanceof Promise) {
$result = yield $result;
}
$this->wrap(null === $result ? $value : $result);
} finally {
$lock->release();
}
@ -207,73 +240,37 @@ class SharedMemoryParcel implements Parcel, \Serializable {
* The memory containing the shared value will be invalidated. When all
* process disconnect from the object, the shared memory block will be
* destroyed by the OS.
*
* Calling `free()` on an object already freed will have no effect.
*/
public function free() {
if (!$this->isFreed()) {
// Invalidate the memory block by setting its state to FREED.
$this->setHeader(static::STATE_FREED, 0, 0);
// Request the block to be deleted, then close our local handle.
$this->memDelete();
\shmop_close($this->handle);
$this->handle = null;
$this->semaphore->free();
public function __destruct() {
if ($this->initializer === 0 || $this->initializer !== \getmypid()) {
return;
}
}
/**
* Serializes the local object handle.
*
* Note that this does not serialize the object that is referenced, just the
* object handle.
*
* @return string The serialized object handle.
*/
public function serialize(): string {
return \serialize([$this->key, $this->semaphore]);
}
/**
* Unserializes the local object handle.
*
* @param string $serialized The serialized object handle.
*/
public function unserialize($serialized) {
list($this->key, $this->semaphore) = \unserialize($serialized);
$this->memOpen($this->key, 'w', 0, 0);
}
/**
* {@inheritdoc}
*/
public function __clone() {
$value = $this->unwrap();
$header = $this->getHeader();
$this->init($value, $header['size'], $header['permissions']);
}
/**
* Gets information about the object for debugging purposes.
*
* @return array An array of debugging information.
*/
public function __debugInfo() {
if ($this->isFreed()) {
return [
'id' => $this->key,
'object' => null,
'freed' => true,
];
return;
}
return [
'id' => $this->key,
'object' => $this->unwrap(),
'freed' => false,
];
// Invalidate the memory block by setting its state to FREED.
$this->setHeader(static::STATE_FREED, 0, 0);
// Request the block to be deleted, then close our local handle.
$this->memDelete();
\shmop_close($this->handle);
$this->handle = null;
$this->semaphore = null;
}
/**
* Private method to prevent cloning.
*/
private function __clone() {
}
/**
* Private method to prevent serialization.
*/
private function __sleep() {
}
/**
@ -371,4 +368,8 @@ class SharedMemoryParcel implements Parcel, \Serializable {
throw new SharedMemoryException('Failed to discard shared memory block.');
}
}
private static function makeKey(string $id): int {
return \abs(\unpack("l", \md5($id, true))[1]);
}
}

View File

@ -1,32 +0,0 @@
<?php
namespace Amp\Parallel\Thread\Internal;
/**
* @internal
*/
class Storage extends \Threaded {
/** @var mixed */
private $value;
/**
* @param mixed $value
*/
public function __construct($value) {
$this->value = $value;
}
/**
* @return mixed
*/
public function get() {
return $this->value;
}
/**
* @param mixed $value
*/
public function set($value) {
$this->value = $value;
}
}

View File

@ -1,98 +0,0 @@
<?php
namespace Amp\Parallel\Thread;
use Amp\Coroutine;
use Amp\Parallel\Sync\Parcel as SyncParcel;
use Amp\Promise;
use Amp\Success;
/**
* A thread-safe container that shares a value between multiple threads.
*/
class Parcel implements SyncParcel {
/** @var \Amp\Parallel\Thread\Mutex */
private $mutex;
/** @var \Amp\Parallel\Thread\Internal\Storage */
private $storage;
/**
* Creates a new shared object container.
*
* @param mixed $value The value to store in the container.
*/
public function __construct($value) {
$this->init($value);
}
/**
* @param mixed $value
*/
private function init($value) {
$this->mutex = new Mutex;
$this->storage = new Internal\Storage($value);
}
/**
* {@inheritdoc}
*/
public function unwrap(): Promise {
return new Success($this->storage->get());
}
/**
* {@inheritdoc}
*/
protected function wrap($value) {
$this->storage->set($value);
}
/**
* @return \Amp\Promise
*/
public function synchronized(callable $callback): Promise {
return new Coroutine($this->doSynchronized($callback));
}
/**
* @coroutine
*
* Asynchronously invokes a callable while maintaining an exclusive lock on the container.
*
* @param callable<mixed> $callback The function to invoke. The value in the container will be passed as the first
* argument.
*
* @return \Generator
*/
private function doSynchronized(callable $callback): \Generator {
/** @var \Amp\Parallel\Sync\Lock $lock */
$lock = yield $this->mutex->acquire();
try {
$value = yield $this->unwrap();
$result = $callback($value);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
$result = yield $result;
}
$this->wrap(null === $result ? $value : $result);
} finally {
$lock->release();
}
return $result;
}
/**
* {@inheritdoc}
*/
public function __clone() {
$this->init($this->unwrap());
}
}

View File

@ -0,0 +1,83 @@
<?php
namespace Amp\Parallel\Thread;
use Amp\Parallel\Sync\Parcel;
use Amp\Promise;
use Amp\Success;
use Amp\Sync\ThreadedMutex;
use function Amp\call;
/**
* A thread-safe container that shares a value between multiple threads.
*/
class ThreadedParcel implements Parcel {
/** @var \Amp\Sync\ThreadedMutex */
private $mutex;
/** @var \Threaded */
private $storage;
/**
* Creates a new shared object container.
*
* @param mixed $value The value to store in the container.
*/
public function __construct($value) {
$this->mutex = new ThreadedMutex;
$this->storage = new class($value) extends \Threaded {
/** @var mixed */
private $value;
/**
* @param mixed $value
*/
public function __construct($value) {
$this->value = $value;
}
/**
* @return mixed
*/
public function get() {
return $this->value;
}
/**
* @param mixed $value
*/
public function set($value) {
$this->value = $value;
}
};
}
/**
* {@inheritdoc}
*/
public function unwrap(): Promise {
return new Success($this->storage->get());
}
/**
* @return \Amp\Promise
*/
public function synchronized(callable $callback): Promise {
return call(function () use ($callback) {
/** @var \Amp\Sync\Lock $lock */
$lock = yield $this->mutex->acquire();
try {
$result = yield call($callback, $this->storage->get());
if ($result !== null) {
$this->storage->set($result);
}
} finally {
$lock->release();
}
return $result;
});
}
}

View File

@ -29,6 +29,10 @@ abstract class AbstractWorker implements Worker {
* @param \Amp\Parallel\Context $context
*/
public function __construct(Context $context) {
if ($context->isRunning()) {
throw new \Error("The context was already running");
}
$this->context = $context;
$this->onResolve = function ($exception, $data) {

View File

@ -2,8 +2,8 @@
namespace Amp\Parallel\Test\Sync;
use Amp\Loop;
use Amp\PHPUnit\TestCase;
use Amp\Promise;
abstract class AbstractParcelTest extends TestCase {
/**
@ -11,20 +11,15 @@ abstract class AbstractParcelTest extends TestCase {
*/
abstract protected function createParcel($value);
public function testConstructor() {
$object = $this->createParcel(new \stdClass());
$this->assertInternalType('object', $object->unwrap());
}
public function testUnwrapIsOfCorrectType() {
$object = $this->createParcel(new \stdClass());
$this->assertInstanceOf('stdClass', $object->unwrap());
$object = $this->createParcel(new \stdClass);
$this->assertInstanceOf('stdClass', Promise\wait($object->unwrap()));
}
public function testUnwrapIsEqual() {
$object = new \stdClass;
$shared = $this->createParcel($object);
$this->assertEquals($object, $shared->unwrap());
$this->assertEquals($object, Promise\wait($shared->unwrap()));
}
/**
@ -57,22 +52,4 @@ abstract class AbstractParcelTest extends TestCase {
$awaitable->onResolve($callback);
}
/**
* @depends testSynchronized
*/
public function testCloneIsNewParcel() {
Loop::run(function () {
$original = $this->createParcel(1);
$clone = clone $original;
$this->assertSame(2, yield $clone->synchronized(function () {
return 2;
}));
$this->assertSame(1, yield $original->unwrap());
$this->assertSame(2, $clone->unwrap());
});
}
}

View File

@ -2,6 +2,7 @@
namespace Amp\Parallel\Test\Sync;
use Amp\Loop;
use Amp\Parallel\Sync\SharedMemoryParcel;
use Amp\Promise;
@ -10,62 +11,27 @@ use Amp\Promise;
* @requires extension sysvmsg
*/
class SharedMemoryParcelTest extends AbstractParcelTest {
const ID = __CLASS__;
private $parcel;
protected function createParcel($value) {
$this->parcel = new SharedMemoryParcel($value);
$this->parcel = SharedMemoryParcel::create(self::ID, $value);
return $this->parcel;
}
public function tearDown() {
if ($this->parcel !== null) {
$this->parcel->free();
}
}
public function testNewObjectIsNotFreed() {
$object = new SharedMemoryParcel(new \stdClass());
$this->assertFalse($object->isFreed());
$object->free();
}
public function testFreeReleasesObject() {
$object = new SharedMemoryParcel(new \stdClass());
$object->free();
$this->assertTrue($object->isFreed());
}
/**
* @expectedException \Amp\Parallel\Sync\SharedMemoryException
*/
public function testUnwrapThrowsErrorIfFreed() {
$object = new SharedMemoryParcel(new \stdClass());
$object->free();
Promise\wait($object->unwrap());
}
public function testCloneIsNewObject() {
$object = new \stdClass;
$shared = new SharedMemoryParcel($object);
$clone = clone $shared;
$this->assertNotSame($shared, $clone);
$this->assertNotSame($object, Promise\wait($clone->unwrap()));
$this->assertNotEquals($shared->__debugInfo()['id'], $clone->__debugInfo()['id']);
$clone->free();
$shared->free();
$this->parcel = null;
}
public function testObjectOverflowMoved() {
$object = new SharedMemoryParcel('hi', 14);
$object = SharedMemoryParcel::create(self::ID, 'hi', 2);
$awaitable = $object->synchronized(function () {
return 'hello world';
});
Promise\wait($awaitable);
$this->assertEquals('hello world', Promise\wait($object->unwrap()));
$object->free();
}
/**
@ -73,30 +39,37 @@ class SharedMemoryParcelTest extends AbstractParcelTest {
* @requires extension pcntl
*/
public function testSetInSeparateProcess() {
$object = new SharedMemoryParcel(42);
$object = SharedMemoryParcel::create(self::ID, 42);
$this->doInFork(function () use ($object) {
$awaitable = $object->synchronized(function () {
return 43;
$awaitable = $object->synchronized(function ($value) {
return $value + 1;
});
Promise\wait($awaitable);
});
$this->assertEquals(43, Promise\wait($object->unwrap()));
$object->free();
}
/**
* @group posix
* @requires extension pcntl
*/
public function testFreeInSeparateProcess() {
$object = new SharedMemoryParcel(42);
public function testInSeparateProcess() {
$parcel = SharedMemoryParcel::create(self::ID, 42);
$this->doInFork(function () use ($object) {
$object->free();
$this->doInFork(function () {
Loop::run(function () {
$parcel = SharedMemoryParcel::use(self::ID);
$this->assertSame(43, yield $parcel->synchronized(function ($value) {
$this->assertSame(42, $value);
return $value + 1;
}));
});
});
$this->assertTrue($object->isFreed());
Loop::run(function () use ($parcel) {
$this->assertSame(43, yield $parcel->unwrap());
});
}
}

View File

@ -1,15 +0,0 @@
<?php
namespace Amp\Parallel\Test\Thread;
use Amp\Parallel\Test\Sync\AbstractParcelTest;
use Amp\Parallel\Thread\Parcel;
/**
* @requires extension pthreads
*/
class ParcelTest extends AbstractParcelTest {
protected function createParcel($value) {
return new Parcel($value);
}
}

View File

@ -0,0 +1,34 @@
<?php
namespace Amp\Parallel\Test\Thread;
use Amp\Loop;
use Amp\Parallel\Test\Sync\AbstractParcelTest;
use Amp\Parallel\Thread\Thread;
use Amp\Parallel\Thread\ThreadedParcel;
/**
* @requires extension pthreads
*/
class ThreadedParcelTest extends AbstractParcelTest {
protected function createParcel($value) {
return new ThreadedParcel($value);
}
public function testWithinThread() {
Loop::run(function () {
$value = 1;
$parcel = new ThreadedParcel($value);
$thread = Thread::spawn(function (ThreadedParcel $parcel) {
$parcel->synchronized(function (int $value) {
return $value + 1;
});
return 0;
}, $parcel);
$this->assertSame(0, yield $thread->join());
$this->assertSame($value + 1, yield $parcel->unwrap());
});
}
}