From 33a5b89ff7b75dd64a4da5d667e70f5bc3d302cd Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Wed, 29 Nov 2017 14:40:07 -0600 Subject: [PATCH] Refactor Parcels --- lib/Sync/Parcel.php | 19 +--- lib/Sync/SharedMemoryParcel.php | 157 ++++++++++++++------------- lib/Thread/Internal/Storage.php | 32 ------ lib/Thread/Parcel.php | 98 ----------------- lib/Thread/ThreadedParcel.php | 83 ++++++++++++++ lib/Worker/AbstractWorker.php | 4 + test/Sync/AbstractParcelTest.php | 31 +----- test/Sync/SharedMemoryParcelTest.php | 71 ++++-------- test/Thread/ParcelTest.php | 15 --- test/Thread/ThreadedParcelTest.php | 34 ++++++ 10 files changed, 229 insertions(+), 315 deletions(-) delete mode 100644 lib/Thread/Internal/Storage.php delete mode 100644 lib/Thread/Parcel.php create mode 100644 lib/Thread/ThreadedParcel.php delete mode 100644 test/Thread/ParcelTest.php create mode 100644 test/Thread/ThreadedParcelTest.php diff --git a/lib/Sync/Parcel.php b/lib/Sync/Parcel.php index 487207e..2085137 100644 --- a/lib/Sync/Parcel.php +++ b/lib/Sync/Parcel.php @@ -22,11 +22,8 @@ interface Parcel { * Asynchronously invokes a callback while maintaining an exclusive lock on the parcel. The current value of the * parcel is provided as the first argument to the callback function. * - * The arguments passed to the callback depend on the implementing object. If the callback throws an exception, - * the lock on the object will be immediately released. - * - * @param callable $callback The synchronized callback to invoke. - * The callback may be a regular function or a coroutine. + * @param callable $callback The synchronized callback to invoke. The parcel value is given as the single argument + * to the callback function. The callback may be a regular function or a coroutine. * * @return \Amp\Promise Resolves with the return value of $callback or fails if $callback * throws an exception. @@ -34,17 +31,7 @@ interface Parcel { public function synchronized(callable $callback): Promise; /** - * Unwraps the parcel and returns the value inside the parcel. - * - * @return mixed The value inside the parcel. + * @return \Amp\Promise A promise for the value inside the parcel. */ public function unwrap(): Promise; - - /** - * Clones the parcel object, resulting in a new, independent parcel. - * - * When a parcel is cloned, a new parcel is created and the original - * parcel's value is duplicated and copied to the new parcel. - */ - public function __clone(); } diff --git a/lib/Sync/SharedMemoryParcel.php b/lib/Sync/SharedMemoryParcel.php index 8b8f6da..9af9c70 100644 --- a/lib/Sync/SharedMemoryParcel.php +++ b/lib/Sync/SharedMemoryParcel.php @@ -7,6 +7,7 @@ use Amp\Failure; use Amp\Promise; use Amp\Success; use Amp\Sync\PosixSemaphore; +use function Amp\call; /** * A container object for sharing a value across contexts. @@ -30,7 +31,7 @@ use Amp\Sync\PosixSemaphore; * @see http://man7.org/linux/man-pages/man2/shmctl.2.html How shared memory works on Linux. * @see https://msdn.microsoft.com/en-us/library/ms810613.aspx How shared memory works on Windows. */ -class SharedMemoryParcel implements Parcel, \Serializable { +class SharedMemoryParcel implements Parcel { /** @var int The byte offset to the start of the object data in memory. */ const MEM_DATA_OFFSET = 7; @@ -40,6 +41,9 @@ class SharedMemoryParcel implements Parcel, \Serializable { const STATE_MOVED = 2; const STATE_FREED = 3; + /** @var string */ + private $id; + /** @var int The shared memory segment key. */ private $key; @@ -49,6 +53,35 @@ class SharedMemoryParcel implements Parcel, \Serializable { /** @var int An open handle to the shared memory segment. */ private $handle; + /** @var int */ + private $initializer = 0; + + /** + * @param string $id + * @param mixed $value + * @param int $size The initial size in bytes of the shared memory segment. It will automatically be expanded as + * necessary. + * @param int $permissions Permissions to access the semaphore. Use file permission format specified as 0xxx. + * + * @return \Amp\Parallel\Sync\SharedMemoryParcel + */ + public static function create(string $id, $value, int $size = 8192, int $permissions = 0600): self { + $parcel = new self($id); + $parcel->init($value, $size, $permissions); + return $parcel; + } + + /** + * @param string $id + * + * @return \Amp\Parallel\Sync\SharedMemoryParcel + */ + public static function use(string $id): self { + $parcel = new self($id); + $parcel->open(); + return $parcel; + } + /** * Creates a new local object container. * @@ -61,12 +94,13 @@ class SharedMemoryParcel implements Parcel, \Serializable { * @param int $permissions The access permissions to set for the object. * If not specified defaults to 0600. */ - public function __construct($value, int $size = 16384, int $permissions = 0600) { + private function __construct(string $id) { if (!\extension_loaded("shmop")) { throw new \Error(__CLASS__ . " requires the shmop extension."); } - $this->init($value, $size, $permissions); + $this->id = $id; + $this->key = self::makeKey($this->id); } /** @@ -74,13 +108,18 @@ class SharedMemoryParcel implements Parcel, \Serializable { * @param int $size * @param int $permissions */ - private function init($value, int $size = 16384, int $permissions = 0600) { - $this->key = \abs(\crc32(\spl_object_hash($this))); + private function init($value, int $size = 8192, int $permissions = 0600) { + $this->semaphore = PosixSemaphore::create($this->id, 1); + $this->initializer = \getmypid(); + $this->memOpen($this->key, 'n', $permissions, $size + self::MEM_DATA_OFFSET); $this->setHeader(self::STATE_ALLOCATED, 0, $permissions); $this->wrap($value); + } - $this->semaphore = new PosixSemaphore(1); + private function open() { + $this->semaphore = PosixSemaphore::use($this->id); + $this->memOpen($this->key, 'w', 0, 0); } /** @@ -91,7 +130,7 @@ class SharedMemoryParcel implements Parcel, \Serializable { * * @return bool True if the object is freed, otherwise false. */ - public function isFreed(): bool { + private function isFreed(): bool { // If we are no longer connected to the memory segment, check if it has // been invalidated. if ($this->handle !== null) { @@ -177,22 +216,16 @@ class SharedMemoryParcel implements Parcel, \Serializable { * @return \Generator */ private function doSynchronized(callable $callback): \Generator { - /** @var \Amp\Parallel\Sync\Lock $lock */ + /** @var \Amp\Sync\Lock $lock */ $lock = yield $this->semaphore->acquire(); try { $value = yield $this->unwrap(); - $result = $callback($value); + $result = yield call($callback, $value); - if ($result instanceof \Generator) { - $result = new Coroutine($result); + if ($result !== null) { + $this->wrap($result); } - - if ($result instanceof Promise) { - $result = yield $result; - } - - $this->wrap(null === $result ? $value : $result); } finally { $lock->release(); } @@ -207,73 +240,37 @@ class SharedMemoryParcel implements Parcel, \Serializable { * The memory containing the shared value will be invalidated. When all * process disconnect from the object, the shared memory block will be * destroyed by the OS. - * - * Calling `free()` on an object already freed will have no effect. */ - public function free() { - if (!$this->isFreed()) { - // Invalidate the memory block by setting its state to FREED. - $this->setHeader(static::STATE_FREED, 0, 0); - - // Request the block to be deleted, then close our local handle. - $this->memDelete(); - \shmop_close($this->handle); - $this->handle = null; - - $this->semaphore->free(); + public function __destruct() { + if ($this->initializer === 0 || $this->initializer !== \getmypid()) { + return; } - } - /** - * Serializes the local object handle. - * - * Note that this does not serialize the object that is referenced, just the - * object handle. - * - * @return string The serialized object handle. - */ - public function serialize(): string { - return \serialize([$this->key, $this->semaphore]); - } - - /** - * Unserializes the local object handle. - * - * @param string $serialized The serialized object handle. - */ - public function unserialize($serialized) { - list($this->key, $this->semaphore) = \unserialize($serialized); - $this->memOpen($this->key, 'w', 0, 0); - } - - /** - * {@inheritdoc} - */ - public function __clone() { - $value = $this->unwrap(); - $header = $this->getHeader(); - $this->init($value, $header['size'], $header['permissions']); - } - - /** - * Gets information about the object for debugging purposes. - * - * @return array An array of debugging information. - */ - public function __debugInfo() { if ($this->isFreed()) { - return [ - 'id' => $this->key, - 'object' => null, - 'freed' => true, - ]; + return; } - return [ - 'id' => $this->key, - 'object' => $this->unwrap(), - 'freed' => false, - ]; + // Invalidate the memory block by setting its state to FREED. + $this->setHeader(static::STATE_FREED, 0, 0); + + // Request the block to be deleted, then close our local handle. + $this->memDelete(); + \shmop_close($this->handle); + $this->handle = null; + + $this->semaphore = null; + } + + /** + * Private method to prevent cloning. + */ + private function __clone() { + } + + /** + * Private method to prevent serialization. + */ + private function __sleep() { } /** @@ -371,4 +368,8 @@ class SharedMemoryParcel implements Parcel, \Serializable { throw new SharedMemoryException('Failed to discard shared memory block.'); } } + + private static function makeKey(string $id): int { + return \abs(\unpack("l", \md5($id, true))[1]); + } } diff --git a/lib/Thread/Internal/Storage.php b/lib/Thread/Internal/Storage.php deleted file mode 100644 index a2df245..0000000 --- a/lib/Thread/Internal/Storage.php +++ /dev/null @@ -1,32 +0,0 @@ -value = $value; - } - - /** - * @return mixed - */ - public function get() { - return $this->value; - } - - /** - * @param mixed $value - */ - public function set($value) { - $this->value = $value; - } -} diff --git a/lib/Thread/Parcel.php b/lib/Thread/Parcel.php deleted file mode 100644 index e27f081..0000000 --- a/lib/Thread/Parcel.php +++ /dev/null @@ -1,98 +0,0 @@ -init($value); - } - - /** - * @param mixed $value - */ - private function init($value) { - $this->mutex = new Mutex; - $this->storage = new Internal\Storage($value); - } - - /** - * {@inheritdoc} - */ - public function unwrap(): Promise { - return new Success($this->storage->get()); - } - - /** - * {@inheritdoc} - */ - protected function wrap($value) { - $this->storage->set($value); - } - - /** - * @return \Amp\Promise - */ - public function synchronized(callable $callback): Promise { - return new Coroutine($this->doSynchronized($callback)); - } - - /** - * @coroutine - * - * Asynchronously invokes a callable while maintaining an exclusive lock on the container. - * - * @param callable $callback The function to invoke. The value in the container will be passed as the first - * argument. - * - * @return \Generator - */ - private function doSynchronized(callable $callback): \Generator { - /** @var \Amp\Parallel\Sync\Lock $lock */ - $lock = yield $this->mutex->acquire(); - - try { - $value = yield $this->unwrap(); - $result = $callback($value); - - if ($result instanceof \Generator) { - $result = new Coroutine($result); - } - - if ($result instanceof Promise) { - $result = yield $result; - } - - $this->wrap(null === $result ? $value : $result); - } finally { - $lock->release(); - } - - return $result; - } - - /** - * {@inheritdoc} - */ - public function __clone() { - $this->init($this->unwrap()); - } -} diff --git a/lib/Thread/ThreadedParcel.php b/lib/Thread/ThreadedParcel.php new file mode 100644 index 0000000..baa0cb8 --- /dev/null +++ b/lib/Thread/ThreadedParcel.php @@ -0,0 +1,83 @@ +mutex = new ThreadedMutex; + $this->storage = new class($value) extends \Threaded { + /** @var mixed */ + private $value; + + /** + * @param mixed $value + */ + public function __construct($value) { + $this->value = $value; + } + + /** + * @return mixed + */ + public function get() { + return $this->value; + } + + /** + * @param mixed $value + */ + public function set($value) { + $this->value = $value; + } + }; + } + + /** + * {@inheritdoc} + */ + public function unwrap(): Promise { + return new Success($this->storage->get()); + } + + /** + * @return \Amp\Promise + */ + public function synchronized(callable $callback): Promise { + return call(function () use ($callback) { + /** @var \Amp\Sync\Lock $lock */ + $lock = yield $this->mutex->acquire(); + + try { + $result = yield call($callback, $this->storage->get()); + + if ($result !== null) { + $this->storage->set($result); + } + } finally { + $lock->release(); + } + + return $result; + }); + } +} diff --git a/lib/Worker/AbstractWorker.php b/lib/Worker/AbstractWorker.php index 60c3b81..b76bbd7 100644 --- a/lib/Worker/AbstractWorker.php +++ b/lib/Worker/AbstractWorker.php @@ -29,6 +29,10 @@ abstract class AbstractWorker implements Worker { * @param \Amp\Parallel\Context $context */ public function __construct(Context $context) { + if ($context->isRunning()) { + throw new \Error("The context was already running"); + } + $this->context = $context; $this->onResolve = function ($exception, $data) { diff --git a/test/Sync/AbstractParcelTest.php b/test/Sync/AbstractParcelTest.php index 22e8d25..fa9904f 100644 --- a/test/Sync/AbstractParcelTest.php +++ b/test/Sync/AbstractParcelTest.php @@ -2,8 +2,8 @@ namespace Amp\Parallel\Test\Sync; -use Amp\Loop; use Amp\PHPUnit\TestCase; +use Amp\Promise; abstract class AbstractParcelTest extends TestCase { /** @@ -11,20 +11,15 @@ abstract class AbstractParcelTest extends TestCase { */ abstract protected function createParcel($value); - public function testConstructor() { - $object = $this->createParcel(new \stdClass()); - $this->assertInternalType('object', $object->unwrap()); - } - public function testUnwrapIsOfCorrectType() { - $object = $this->createParcel(new \stdClass()); - $this->assertInstanceOf('stdClass', $object->unwrap()); + $object = $this->createParcel(new \stdClass); + $this->assertInstanceOf('stdClass', Promise\wait($object->unwrap())); } public function testUnwrapIsEqual() { $object = new \stdClass; $shared = $this->createParcel($object); - $this->assertEquals($object, $shared->unwrap()); + $this->assertEquals($object, Promise\wait($shared->unwrap())); } /** @@ -57,22 +52,4 @@ abstract class AbstractParcelTest extends TestCase { $awaitable->onResolve($callback); } - - /** - * @depends testSynchronized - */ - public function testCloneIsNewParcel() { - Loop::run(function () { - $original = $this->createParcel(1); - - $clone = clone $original; - - $this->assertSame(2, yield $clone->synchronized(function () { - return 2; - })); - - $this->assertSame(1, yield $original->unwrap()); - $this->assertSame(2, $clone->unwrap()); - }); - } } diff --git a/test/Sync/SharedMemoryParcelTest.php b/test/Sync/SharedMemoryParcelTest.php index baca715..bb38f7f 100644 --- a/test/Sync/SharedMemoryParcelTest.php +++ b/test/Sync/SharedMemoryParcelTest.php @@ -2,6 +2,7 @@ namespace Amp\Parallel\Test\Sync; +use Amp\Loop; use Amp\Parallel\Sync\SharedMemoryParcel; use Amp\Promise; @@ -10,62 +11,27 @@ use Amp\Promise; * @requires extension sysvmsg */ class SharedMemoryParcelTest extends AbstractParcelTest { + const ID = __CLASS__; + private $parcel; protected function createParcel($value) { - $this->parcel = new SharedMemoryParcel($value); + $this->parcel = SharedMemoryParcel::create(self::ID, $value); return $this->parcel; } public function tearDown() { - if ($this->parcel !== null) { - $this->parcel->free(); - } - } - - public function testNewObjectIsNotFreed() { - $object = new SharedMemoryParcel(new \stdClass()); - $this->assertFalse($object->isFreed()); - $object->free(); - } - - public function testFreeReleasesObject() { - $object = new SharedMemoryParcel(new \stdClass()); - $object->free(); - $this->assertTrue($object->isFreed()); - } - - /** - * @expectedException \Amp\Parallel\Sync\SharedMemoryException - */ - public function testUnwrapThrowsErrorIfFreed() { - $object = new SharedMemoryParcel(new \stdClass()); - $object->free(); - Promise\wait($object->unwrap()); - } - - public function testCloneIsNewObject() { - $object = new \stdClass; - $shared = new SharedMemoryParcel($object); - $clone = clone $shared; - - $this->assertNotSame($shared, $clone); - $this->assertNotSame($object, Promise\wait($clone->unwrap())); - $this->assertNotEquals($shared->__debugInfo()['id'], $clone->__debugInfo()['id']); - - $clone->free(); - $shared->free(); + $this->parcel = null; } public function testObjectOverflowMoved() { - $object = new SharedMemoryParcel('hi', 14); + $object = SharedMemoryParcel::create(self::ID, 'hi', 2); $awaitable = $object->synchronized(function () { return 'hello world'; }); Promise\wait($awaitable); $this->assertEquals('hello world', Promise\wait($object->unwrap())); - $object->free(); } /** @@ -73,30 +39,37 @@ class SharedMemoryParcelTest extends AbstractParcelTest { * @requires extension pcntl */ public function testSetInSeparateProcess() { - $object = new SharedMemoryParcel(42); + $object = SharedMemoryParcel::create(self::ID, 42); $this->doInFork(function () use ($object) { - $awaitable = $object->synchronized(function () { - return 43; + $awaitable = $object->synchronized(function ($value) { + return $value + 1; }); Promise\wait($awaitable); }); $this->assertEquals(43, Promise\wait($object->unwrap())); - $object->free(); } /** * @group posix * @requires extension pcntl */ - public function testFreeInSeparateProcess() { - $object = new SharedMemoryParcel(42); + public function testInSeparateProcess() { + $parcel = SharedMemoryParcel::create(self::ID, 42); - $this->doInFork(function () use ($object) { - $object->free(); + $this->doInFork(function () { + Loop::run(function () { + $parcel = SharedMemoryParcel::use(self::ID); + $this->assertSame(43, yield $parcel->synchronized(function ($value) { + $this->assertSame(42, $value); + return $value + 1; + })); + }); }); - $this->assertTrue($object->isFreed()); + Loop::run(function () use ($parcel) { + $this->assertSame(43, yield $parcel->unwrap()); + }); } } diff --git a/test/Thread/ParcelTest.php b/test/Thread/ParcelTest.php deleted file mode 100644 index 895dcc4..0000000 --- a/test/Thread/ParcelTest.php +++ /dev/null @@ -1,15 +0,0 @@ -synchronized(function (int $value) { + return $value + 1; + }); + return 0; + }, $parcel); + + $this->assertSame(0, yield $thread->join()); + $this->assertSame($value + 1, yield $parcel->unwrap()); + }); + } +}