mirror of
https://github.com/amphp/parallel.git
synced 2025-01-16 20:28:21 +01:00
Add IPC docs
This commit is contained in:
parent
f013263aff
commit
7b53bad38e
135
README.md
135
README.md
@ -13,7 +13,7 @@ To be as flexible as possible, this library comes with a collection of non-block
|
||||
- PHP 8.1+
|
||||
|
||||
#### Optional requirements to use threads instead of processes
|
||||
- PHP 8.2+ ZTS build
|
||||
- PHP 8.2+ ZTS
|
||||
- [`ext-parallel`](https://github.com/krakjoe/parallel)
|
||||
|
||||
## Installation
|
||||
@ -187,7 +187,7 @@ The return value of the child callable is available using the `Context::join()`
|
||||
#### Child Process or Thread
|
||||
|
||||
```php
|
||||
# child.php
|
||||
// child.php
|
||||
|
||||
use Amp\Sync\Channel;
|
||||
|
||||
@ -205,7 +205,7 @@ return function (Channel $channel): mixed {
|
||||
#### Parent Process
|
||||
|
||||
```php
|
||||
# parent.php
|
||||
// parent.php
|
||||
|
||||
use Amp\Parallel\Context\ProcessContext;
|
||||
|
||||
@ -228,6 +228,15 @@ Child processes or threads are also great for CPU-intensive operations such as i
|
||||
|
||||
An execution context can be created using the function `Amp\Parallel\Context\startContext()`, which uses the global `ContextFactory`. The global factory is an instance of `DefaultContextFactory` by default, but this instance can be overridden using the function `Amp\Parallel\Context\contextFactory()`.
|
||||
|
||||
```php
|
||||
// Using the global context factory from Amp\Parallel\Context\contextFactory()
|
||||
$context = Amp\Parallel\Context\startContext(__DIR__ . '/child.php');
|
||||
|
||||
// Creating a specific context factory and using it to create a context.
|
||||
$contextFactory = new Amp\Parallel\Context\ProcessContextFactory();
|
||||
$context = $contextFactory->start(__DIR__ . '/child.php');
|
||||
```
|
||||
|
||||
Context factories are used by worker pools to create the context which executes tasks. Providing a custom `ContextFactory` to a worker pool allows custom bootstrapping or other behavior within pool workers.
|
||||
|
||||
An execution context can be created by a `ContextFactory`. The worker pool uses context factories to create workers.
|
||||
@ -236,6 +245,126 @@ A global worker pool is available and can be set using the function `Amp\Paralle
|
||||
Passing an instance of `WorkerPool` will set the global pool to the given instance.
|
||||
Invoking the function without an instance will return the current global instance.
|
||||
|
||||
### IPC
|
||||
|
||||
A context is created with a single `Channel` which may be used to bidirectionally send data between the parent and child. Channels are a high-level data exchange, allowing serializable data to be sent over a channel. The `Channel` implementation handles serializing and unserializing data, message framing, and chunking over the underlying socket between the parent and child.
|
||||
|
||||
> **Note**
|
||||
> Channels should be used to send only _data_ between the parent and child. Attempting to send resources such as database connections or file handles on a channel will not work. Such resources should be opened in each child process.
|
||||
> One notable exception to this rule: server and client network sockets may be sent between parent and child using tools provided by [`amphp/cluster`](https://github.com/amphp/cluster).
|
||||
|
||||
The example code below defines a class, `AppMessage`, containing a message type enum and the associated message data which is dependent upon the enum case. All messages sent over the channel between the parent and child use an instance of `AppMessage` to define message intent. Alternatively, the child could use a different class for replies, but that was not done here for the sake of brevity. Any messaging strategy may be employed which is best suited your application, the only requirement is that any structure sent over a channel must be serializable.
|
||||
|
||||
The example below sends a message to the child to process an image after receiving a path from STDIN, then waits for the reply from the child. When an empty path is provided, the parent sends `null` to the child to break the child out of the message loop and waits for the child to exit before exiting itself.
|
||||
|
||||
```php
|
||||
// AppMessage.php
|
||||
|
||||
class AppMessage {
|
||||
public function __construct(
|
||||
public readonly AppMessageType $type,
|
||||
public readonly mixed $data,
|
||||
) {
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
```php
|
||||
// AppMessageType.php
|
||||
|
||||
enum AppMessageType {
|
||||
case ProcessedImage;
|
||||
case ProcessImageFromPath;
|
||||
// Other enum cases for further message types...
|
||||
}
|
||||
```
|
||||
|
||||
```php
|
||||
// parent.php
|
||||
|
||||
use Amp\Parallel\Context\ProcessContextFactory;
|
||||
|
||||
$contextFactory = new ProcessContextFactory();
|
||||
$context = $contextFactory->start(__DIR__ . '/child.php');
|
||||
|
||||
$stdin = Amp\ByteStream\getStdin();
|
||||
|
||||
while ($path = $stdin->read()) {
|
||||
$message = new AppMessage(AppMessageType::ProcessImageFromPath, $path);
|
||||
$context->send($message);
|
||||
|
||||
$reply = $context->receive(); // Wait for reply from child context with processed image data.
|
||||
}
|
||||
|
||||
$context->send(null); // End loop in child process.
|
||||
$context->join();
|
||||
```
|
||||
|
||||
```php
|
||||
// child.php
|
||||
|
||||
use Amp\Sync\Channel;
|
||||
|
||||
return function (Channel $channel): void {
|
||||
/** @var AppMessage|null $message */
|
||||
while ($message = $channel->receive()) {
|
||||
$reply = match ($message->type) {
|
||||
AppMessageType::ProcessImageFromPath => new AppMessage(
|
||||
AppMessageType::ProcessedImage,
|
||||
ImageProcessor::process($message->data),
|
||||
),
|
||||
// Handle other message types...
|
||||
}
|
||||
|
||||
$channel->send($reply);
|
||||
}
|
||||
};
|
||||
```
|
||||
|
||||
#### Creating an IPC socket
|
||||
|
||||
Sometimes it is necessary to create another socket for specialized IPC between a parent and child context. One such example is sending sockets between a parent and child process using `ClientSocketReceivePipe` and `ClientSocketSendPipe`, which are found in [`amphp/cluster`](https://github.com/amphp/cluster). An instance of `IpcHub` in the parent and the `Amp\Parallel\Ipc\connect()` function in the child.
|
||||
|
||||
The example below creates a separate IPC socket between a parent and child, then uses [`amphp/cluster`](https://github.com/amphp/cluster) to create instances of `ClientSocketReceivePipe` and `ClientSocketSendPipe` in the parent and child, respectively.
|
||||
|
||||
```php
|
||||
// parent.php
|
||||
use Amp\Cluster\ClientSocketSendPipe;
|
||||
use Amp\Parallel\Context\ProcessContextFactory;
|
||||
use Amp\Parallel\Ipc\LocalIpcHub;
|
||||
|
||||
$ipcHub = new LocalIpcHub();
|
||||
|
||||
// Sharing the IpcHub instance with the context factory isn't required,
|
||||
// but reduces the number of opened sockets.
|
||||
$contextFactory = new ProcessContextFactory(ipcHub: $ipcHub);
|
||||
|
||||
$context = $contextFactory->start(__DIR__ . '/child.php');
|
||||
|
||||
$connectionKey = $ipcHub->generateKey();
|
||||
$context->send(['uri' => $ipcHub->getUri(), 'key' => $connectionKey]);
|
||||
|
||||
// $socket will be a bidirectional socket to the child.
|
||||
$socket = $ipcHub->accept($connectionKey);
|
||||
|
||||
$socketPipe = new ClientSocketSendPipe($socket);
|
||||
```
|
||||
|
||||
```php
|
||||
// child.php
|
||||
use Amp\Cluster\ClientSocketReceivePipe;
|
||||
use Amp\Sync\Channel;
|
||||
|
||||
return function (Channel $channel): void {
|
||||
['uri' => $uri, 'key' => $connectionKey] = $channel->receive();
|
||||
|
||||
// $socket will be a bidirectional socket to the parent.
|
||||
$socket = Amp\Parallel\Ipc\connect($uri, $connectionKey);
|
||||
|
||||
$socketPipe = new ClientSocketReceivePipe($socket);
|
||||
};
|
||||
```
|
||||
|
||||
### Debugging
|
||||
|
||||
Step debugging may be used in child processes with PhpStorm and Xdebug by listening for debug connections in the IDE.
|
||||
|
Loading…
x
Reference in New Issue
Block a user