Implement blocking http php server for master

This commit is contained in:
Anton Medvedev 2024-10-22 22:19:09 +02:00
parent 67f1c0d200
commit e9d99ca7da
No known key found for this signature in database
13 changed files with 326 additions and 185 deletions

View File

@ -1,5 +1,8 @@
# Upgrade a major version
## Upgrade from 7.x to 8.x
## Upgrade from 6.x to 7.x
### Step 1: Update deploy.php

View File

@ -11,6 +11,7 @@ parameters:
ignoreErrors:
- "#^Constant DEPLOYER_VERSION not found\\.$#"
- "#^Constant DEPLOYER_BIN not found\\.$#"
- "#^Constant MASTER_ENDPOINT not found\\.$#"
- "#CpanelPhp#"
- "#AMQPMessage#"

View File

@ -130,8 +130,6 @@ class MainCommand extends SelectCommand
if (!$plan) {
$this->checkUpdates();
$this->deployer->server->start();
if (!empty($skippedTasks)) {
foreach ($skippedTasks as $taskName) {
$output->writeln("<fg=yellow;options=bold>skip</> $taskName");

View File

@ -45,7 +45,8 @@ class WorkerCommand extends MainCommand
if (!$output->isDecorated() && !defined('NO_ANSI')) {
define('NO_ANSI', 'true');
}
$this->deployer->config->set('master_url', 'http://localhost:' . $input->getOption('port'));
define('MASTER_ENDPOINT', 'http://localhost:' . $input->getOption('port'));
$task = $this->deployer->tasks->get($input->getOption('task'));
$host = $this->deployer->hosts->get($input->getOption('host'));

View File

@ -10,6 +10,7 @@ declare(strict_types=1);
namespace Deployer\Configuration;
use Deployer\Deployer;
use Deployer\Exception\ConfigurationException;
use Deployer\Utility\Httpie;
@ -204,11 +205,13 @@ class Configuration implements \ArrayAccess
public function load(): void
{
if (!$this->has('master_url')) {
if (!Deployer::isWorker()) {
return;
}
$values = Httpie::get($this->get('master_url') . '/load')
$values = Httpie::get(MASTER_ENDPOINT . '/load')
->setopt(CURLOPT_CONNECTTIMEOUT, 0)
->setopt(CURLOPT_TIMEOUT, 0)
->jsonBody([
'host' => $this->get('alias'),
])
@ -218,11 +221,13 @@ class Configuration implements \ArrayAccess
public function save(): void
{
if (!$this->has('master_url')) {
if (!Deployer::isWorker()) {
return;
}
Httpie::get($this->get('master_url') . '/save')
Httpie::get(MASTER_ENDPOINT . '/save')
->setopt(CURLOPT_CONNECTTIMEOUT, 0)
->setopt(CURLOPT_TIMEOUT, 0)
->jsonBody([
'host' => $this->get('alias'),
'config' => $this->persist(),

View File

@ -28,7 +28,6 @@ use Deployer\Component\Ssh\Client;
use Deployer\Configuration\Configuration;
use Deployer\Executor\Master;
use Deployer\Executor\Messenger;
use Deployer\Executor\Server;
use Deployer\Host\Host;
use Deployer\Host\HostCollection;
use Deployer\Host\Localhost;
@ -37,7 +36,6 @@ use Deployer\Logger\Handler\FileHandler;
use Deployer\Logger\Handler\NullHandler;
use Deployer\Logger\Logger;
use Deployer\Selector\Selector;
use Deployer\Task;
use Deployer\Task\ScriptManager;
use Deployer\Task\TaskCollection;
use Deployer\Utility\Httpie;
@ -66,7 +64,6 @@ use Throwable;
* @property ProcessRunner $processRunner
* @property Task\ScriptManager $scriptManager
* @property Selector $selector
* @property Server $server
* @property Master $master
* @property Messenger $messenger
* @property Messenger $logger
@ -79,9 +76,8 @@ class Deployer extends Container
{
/**
* Global instance of deployer. It's can be accessed only after constructor call.
* @var Deployer
*/
private static $instance;
private static Deployer $instance;
public function __construct(Application $console)
{
@ -163,17 +159,11 @@ class Deployer extends Container
$this['messenger'] = function ($c) {
return new Messenger($c['input'], $c['output'], $c['logger']);
};
$this['server'] = function ($c) {
return new Server(
$c['output'],
$this,
);
};
$this['master'] = function ($c) {
return new Master(
$c['hosts'],
$c['input'],
$c['output'],
$c['server'],
$c['messenger'],
);
};
@ -351,7 +341,7 @@ class Deployer extends Container
public static function isWorker(): bool
{
return Deployer::get()->config->has('master_url');
return defined('MASTER_ENDPOINT');
}
/**
@ -359,13 +349,14 @@ class Deployer extends Container
* @return array|bool|string
* @throws \Exception
*/
public static function proxyCallToMaster(Host $host, string $func, ...$arguments)
public static function masterCall(Host $host, string $func, ...$arguments)
{
// As request to master will stop master permanently,
// wait a little bit in order for periodic timer of
// master gather worker outputs and print it to user.
usleep(100000); // Sleep 100ms.
return Httpie::get(get('master_url') . '/proxy')
// As request to master will stop master permanently, wait a little bit
// in order for ticker gather worker outputs and print it to user.
usleep(100_000); // Sleep 100ms.
return Httpie::get(MASTER_ENDPOINT . '/proxy')
->setopt(CURLOPT_CONNECTTIMEOUT, 0) // no timeout
->setopt(CURLOPT_TIMEOUT, 0) // no timeout
->jsonBody([
'host' => $host->getAlias(),

View File

@ -13,9 +13,12 @@ namespace Deployer\Executor;
use Deployer\Component\Ssh\Client;
use Deployer\Component\Ssh\IOArguments;
use Deployer\Deployer;
use Deployer\Exception\Exception;
use Deployer\Host\Host;
use Deployer\Host\HostCollection;
use Deployer\Host\Localhost;
use Deployer\Selector\Selector;
use Deployer\Task\Context;
use Deployer\Task\Task;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
@ -32,40 +35,21 @@ function spinner(string $message = ''): string
class Master
{
/**
* @var InputInterface
*/
private $input;
/**
* @var OutputInterface
*/
private $output;
/**
* @var Server
*/
private $server;
/**
* @var Messenger
*/
private $messenger;
/**
* @var false|string
*/
private $phpBin;
private HostCollection $hosts;
private InputInterface $input;
private OutputInterface $output;
private Messenger $messenger;
private string|false $phpBin;
public function __construct(
HostCollection $hosts,
InputInterface $input,
OutputInterface $output,
Server $server,
Messenger $messenger,
) {
$this->hosts = $hosts;
$this->input = $input;
$this->output = $output;
$this->server = $server;
$this->messenger = $messenger;
$this->phpBin = (new PhpExecutableFinder())->find();
}
@ -182,19 +166,14 @@ class Master
return 0;
}
$callback = function (string $output) {
$output = preg_replace('/\n$/', '', $output);
if (strlen($output) !== 0) {
$this->output->writeln($output);
}
};
$server = new Server('127.0.0.1', 0, $this->output);
/** @var Process[] $processes */
$processes = [];
$this->server->loop->futureTick(function () use (&$processes, $hosts, $task) {
$server->afterRun(function (int $port) use (&$processes, $hosts, $task) {
foreach ($hosts as $host) {
$processes[] = $this->createProcess($host, $task);
$processes[] = $this->createProcess($host, $task, $port);
}
foreach ($processes as $process) {
@ -202,23 +181,61 @@ class Master
}
});
$this->server->loop->addPeriodicTimer(0.03, function ($timer) use (&$processes, $callback) {
$this->gatherOutput($processes, $callback);
$echoCallback = function (string $output) {
$output = preg_replace('/\n$/', '', $output);
if (strlen($output) !== 0) {
$this->output->writeln($output);
}
};
$server->ticker(function () use (&$processes, $server, $echoCallback) {
$this->gatherOutput($processes, $echoCallback);
if ($this->output->isDecorated() && !getenv('CI')) {
$this->output->write(spinner());
}
if ($this->allFinished($processes)) {
$this->server->loop->stop();
$this->server->loop->cancelTimer($timer);
$server->stop();
}
});
$this->server->loop->run();
$server->router(function (string $path, array $payload) {
switch ($path) {
case '/load':
['host' => $host] = $payload;
$host = $this->hosts->get($host);
$config = $host->config()->persist();
return new Response(200, $config);
case '/save':
['host' => $host, 'config' => $config] = $payload;
$host = $this->hosts->get($host);
$host->config()->update($config);
return new Response(200, true);
case '/proxy':
['host' => $host, 'func' => $func, 'arguments' => $arguments] = $payload;
Context::push(new Context($this->hosts->get($host)));
$answer = call_user_func($func, ...$arguments);
Context::pop();
return new Response(200, $answer);
default:
return new Response(404, null);
}
});
$server->run();
if ($this->output->isDecorated() && !getenv('CI')) {
$this->output->write(" \r"); // clear spinner
}
$this->gatherOutput($processes, $callback);
$this->gatherOutput($processes, $echoCallback);
if ($this->cumulativeExitCode($processes) !== 0) {
$this->messenger->endTask($task, true);
@ -227,11 +244,11 @@ class Master
return $this->cumulativeExitCode($processes);
}
protected function createProcess(Host $host, Task $task): Process
protected function createProcess(Host $host, Task $task, int $port): Process
{
$command = [
$this->phpBin, DEPLOYER_BIN,
'worker', '--port', $this->server->getPort(),
'worker', '--port', $port,
'--task', $task,
'--host', $host->getAlias(),
];

33
src/Executor/Response.php Normal file
View File

@ -0,0 +1,33 @@
<?php
declare(strict_types=1);
/* (c) Anton Medvedev <anton@medv.io>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Deployer\Executor;
class Response
{
private int $status;
private mixed $body;
public function __construct(int $status, mixed $body)
{
$this->status = $status;
$this->body = $body;
}
public function getStatus(): int
{
return $this->status;
}
public function getBody(): mixed
{
return $this->body;
}
}

View File

@ -10,106 +10,221 @@ declare(strict_types=1);
namespace Deployer\Executor;
use Deployer\Deployer;
use Closure;
use Deployer\Exception\Exception;
use Deployer\Task\Context;
use Psr\Http\Message\ServerRequestInterface;
use React;
use React\EventLoop\Loop;
use React\Http\HttpServer;
use React\Http\Message\Response;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Throwable;
class Server
{
/**
* @var OutputInterface
*/
private $output;
private string $host;
private int $port;
private OutputInterface $output;
private bool $stop = false;
/**
* @var Deployer
* @var ?resource
*/
private $deployer;
private $socket;
/**
* @var React\EventLoop\LoopInterface
* @var resource[]
*/
public $loop;
private array $clientSockets = [];
/**
* @var int
*/
private $port;
private Closure $afterCallback;
private Closure $tickerCallback;
private Closure $routerCallback;
public function __construct(
OutputInterface $output,
Deployer $deployer,
) {
public function __construct($host, $port, OutputInterface $output)
{
self::checkRequiredExtensionsExists();
$this->host = $host;
$this->port = $port;
$this->output = $output;
$this->deployer = $deployer;
}
public function start()
public static function checkRequiredExtensionsExists(): void
{
$this->loop = Loop::get();
$server = new HttpServer(
$this->loop,
new React\Http\Middleware\StreamingRequestMiddleware(),
new React\Http\Middleware\RequestBodyBufferMiddleware(16 * 1024 * 1024), // 16 MiB
function (ServerRequestInterface $request) {
try {
return $this->router($request);
} catch (Throwable $exception) {
Deployer::printException($this->output, $exception);
return new React\Http\Message\Response(500, ['Content-Type' => 'text/plain'], 'Master error: ' . $exception->getMessage());
}
},
);
$socket = new React\Socket\Server(0, $this->loop);
$server->listen($socket);
$address = $socket->getAddress();
$this->port = parse_url($address, PHP_URL_PORT);
}
private function router(ServerRequestInterface $request): Response
{
$path = $request->getUri()->getPath();
switch ($path) {
case '/load':
['host' => $host] = json_decode((string) $request->getBody(), true);
$host = $this->deployer->hosts->get($host);
$config = json_encode($host->config()->persist());
return new Response(200, ['Content-Type' => 'application/json'], $config);
case '/save':
['host' => $host, 'config' => $config] = json_decode((string) $request->getBody(), true);
$host = $this->deployer->hosts->get($host);
$host->config()->update($config);
return new Response(200, ['Content-Type' => 'application/json'], 'true');
case '/proxy':
['host' => $host, 'func' => $func, 'arguments' => $arguments] = json_decode((string) $request->getBody(), true);
Context::push(new Context($this->deployer->hosts->get($host)));
$answer = call_user_func($func, ...$arguments);
Context::pop();
return new Response(200, ['Content-Type' => 'application/json'], json_encode($answer));
default:
throw new Exception('Server path not found: ' . $request->getUri()->getPath());
if (!function_exists('socket_import_stream')) {
throw new Exception('Required PHP extension "sockets" is not loaded');
}
if (!function_exists('stream_set_blocking')) {
throw new Exception('Required PHP extension "stream" is not loaded');
}
}
public function getPort(): int
public function run(): void
{
return $this->port;
try {
$this->socket = $this->createServerSocket();
$this->updatePort();
if ($this->output->isDebug()) {
$this->output->writeln("[master] Starting server at http://{$this->host}:{$this->port}");
}
($this->afterCallback)($this->port);
while (true) {
$this->acceptNewConnections();
$this->handleClientRequests();
// Prevent CPU exhaustion and 60fps ticker.
usleep(16_000); // 16ms
($this->tickerCallback)();
if ($this->stop) {
break;
}
}
if ($this->output->isDebug()) {
$this->output->writeln("[master] Stopping server at http://{$this->host}:{$this->port}");
}
} finally {
if (isset($this->socket)) {
fclose($this->socket);
}
}
}
/**
* @return resource
* @throws Exception
*/
private function createServerSocket()
{
$server = stream_socket_server("tcp://{$this->host}:{$this->port}", $errno, $errstr);
if (!$server) {
throw new Exception("Socket creation failed: $errstr ($errno)");
}
if (!stream_set_blocking($server, false)) {
throw new Exception("Failed to set server socket to non-blocking mode");
}
return $server;
}
private function updatePort(): void
{
$name = stream_socket_get_name($this->socket, false);
if ($name) {
list(, $port) = explode(':', $name);
$this->port = (int) $port;
} else {
throw new Exception("Failed to get the assigned port");
}
}
private function acceptNewConnections(): void
{
$newClientSocket = @stream_socket_accept($this->socket, 0);
if ($newClientSocket) {
if (!stream_set_blocking($newClientSocket, false)) {
throw new Exception("Failed to set client socket to non-blocking mode");
}
$this->clientSockets[] = $newClientSocket;
}
}
private function handleClientRequests(): void
{
foreach ($this->clientSockets as $key => $clientSocket) {
if (feof($clientSocket)) {
$this->closeClientSocket($clientSocket, $key);
continue;
}
$request = $this->readClientRequest($clientSocket);
list($path, $payload) = $this->parseRequest($request);
$response = ($this->routerCallback)($path, $payload);
$this->sendResponse($clientSocket, $response);
$this->closeClientSocket($clientSocket, $key);
}
}
private function readClientRequest($clientSocket)
{
$request = '';
while (($chunk = @fread($clientSocket, 1024)) !== false) {
$request .= $chunk;
if (strpos($request, "\r\n\r\n") !== false) {
break;
}
}
if ($chunk === false && !feof($clientSocket)) {
throw new Exception("Socket read failed");
}
return $request;
}
private function parseRequest($request)
{
$lines = explode("\r\n", $request);
$requestLine = $lines[0];
$parts = explode(' ', $requestLine);
if (count($parts) !== 3) {
throw new Exception("Malformed request line: $requestLine");
}
$path = $parts[1];
$headers = [];
for ($i = 1; $i < count($lines); $i++) {
$line = $lines[$i];
if (empty($line)) {
break;
}
[$key, $value] = explode(':', $line, 2);
$headers[$key] = trim($value);
}
if (empty($headers['Content-Type']) || $headers['Content-Type'] !== 'application/json') {
throw new Exception("Malformed request: invalid Content-Type");
}
$payload = json_decode(implode("\n", array_slice($lines, $i + 1)), true, flags: JSON_THROW_ON_ERROR);
return [$path, $payload];
}
private function sendResponse($clientSocket, Response $response)
{
$code = $response->getStatus();
$content = json_encode($response->getBody(), flags: JSON_PRETTY_PRINT);
$headers = "HTTP/1.1 $code OK\r\n" .
"Content-Type: application/json\r\n" .
"Content-Length: " . strlen($content) . "\r\n" .
"Connection: close\r\n\r\n";
fwrite($clientSocket, $headers . $content);
}
private function closeClientSocket($clientSocket, $key): void
{
fclose($clientSocket);
unset($this->clientSockets[$key]);
}
public function afterRun(Closure $param): void
{
$this->afterCallback = $param;
}
public function ticker(Closure $param): void
{
$this->tickerCallback = $param;
}
public function router(Closure $param)
{
$this->routerCallback = $param;
}
public function stop(): void
{
$this->stop = true;
}
}

View File

@ -21,10 +21,7 @@ use Throwable;
class Worker
{
/**
* @var Deployer
*/
private $deployer;
private Deployer $deployer;
public function __construct(Deployer $deployer)
{

View File

@ -13,6 +13,7 @@ namespace Deployer\Importer;
use Deployer\Exception\ConfigurationException;
use Deployer\Exception\Exception;
use Symfony\Component\Yaml\Yaml;
use function array_filter;
use function array_keys;
use function Deployer\after;
@ -27,6 +28,7 @@ use function Deployer\set;
use function Deployer\Support\find_line_number;
use function Deployer\task;
use function Deployer\upload;
use const ARRAY_FILTER_USE_KEY;
class Importer

View File

@ -14,30 +14,12 @@ use Deployer\Exception\HttpieException;
class Httpie
{
/**
* @var string
*/
private $method = 'GET';
/**
* @var string
*/
private $url = '';
/**
* @var array
*/
private $headers = [];
/**
* @var string
*/
private $body = '';
/**
* @var array
*/
private $curlopts = [];
/**
* @var bool
*/
private $nothrow = false;
private string $method = 'GET';
private string $url = '';
private array $headers = [];
private string $body = '';
private array $curlopts = [];
private bool $nothrow = false;
public function __construct()
{
@ -109,7 +91,6 @@ class Httpie
$http = clone $this;
$http->body = $body;
$http->headers = array_merge($http->headers, [
'Content-Type' => 'application/json',
'Content-Length' => strlen($http->body),
]);
return $http;
@ -192,10 +173,7 @@ class Httpie
return $result;
}
/**
* @return mixed
*/
public function getJson()
public function getJson(): mixed
{
$result = $this->send();
$response = json_decode($result, true);

View File

@ -749,7 +749,7 @@ function ask(string $message, ?string $default = null, ?array $autocomplete = nu
}
if (Deployer::isWorker()) {
return Deployer::proxyCallToMaster(currentHost(), __FUNCTION__, ...func_get_args());
return Deployer::masterCall(currentHost(), __FUNCTION__, ...func_get_args());
}
/** @var QuestionHelper */
@ -795,7 +795,7 @@ function askChoice(string $message, array $availableChoices, $default = null, bo
}
if (Deployer::isWorker()) {
return Deployer::proxyCallToMaster(currentHost(), __FUNCTION__, ...func_get_args());
return Deployer::masterCall(currentHost(), __FUNCTION__, ...func_get_args());
}
/** @var QuestionHelper */
@ -823,7 +823,7 @@ function askConfirmation(string $message, bool $default = false): bool
}
if (Deployer::isWorker()) {
return Deployer::proxyCallToMaster(currentHost(), __FUNCTION__, ...func_get_args());
return Deployer::masterCall(currentHost(), __FUNCTION__, ...func_get_args());
}
/** @var QuestionHelper */
@ -851,7 +851,7 @@ function askHiddenResponse(string $message): string
}
if (Deployer::isWorker()) {
return (string) Deployer::proxyCallToMaster(currentHost(), __FUNCTION__, ...func_get_args());
return (string) Deployer::masterCall(currentHost(), __FUNCTION__, ...func_get_args());
}
/** @var QuestionHelper */