Big parallel execution refactoring 💎

This commit is contained in:
elfet 2015-02-20 17:14:37 +03:00
parent 9a3287ba89
commit a4a0b334a9
4 changed files with 317 additions and 204 deletions

View File

@ -47,20 +47,12 @@ class Informer
/**
* Print task was ok.
*/
public function endTask($success = true)
public function endTask()
{
if ($success) {
$message = "<info>✔</info>";
$append = 'OK';
} else {
$message = "<fg=red>✘</fg=red>";
$append = 'Error';
}
if ($this->output->getVerbosity() == OutputInterface::VERBOSITY_NORMAL && !$this->output->getWasWritten()) {
$this->output->write("\033[k\033[1A{$message}\n");
$this->output->write("\033[k\033[1A<info>✔</info>\n");
} else {
$this->output->writeln("{$message} {$append}");
$this->output->writeln("<info>✔</info> Ok");
}
}
@ -86,17 +78,33 @@ class Informer
/**
* Print error.
*
* @param \Excetion|null $exception
*
* @param bool $nonFatal
*/
public function taskError(\Exception $exception = null)
public function taskError($nonFatal = true)
{
if ($exception instanceof NonFatalException) {
$message = sprintf('Error: %s', $exception->getMessage());
if ($nonFatal) {
$this->output->writeln("<fg=yellow>✘</fg=yellow> Some errors occurred!");
} else {
$message = 'Some errors occurred!';
$this->output->writeln("<fg=red>✘</fg=red> <options=underscore>Some errors occurred!</options=underscore>");
}
}
$this->output->writeln("<fg=red>✘</fg=red> <options=underscore>{$message}</options=underscore>");
/**
* @param string $serverName
* @param string $exceptionClass
* @param string $message
*/
public function taskException($serverName, $exceptionClass, $message)
{
$message = " $message ";
$this->output->writeln([
"",
"<error>Exception [$exceptionClass] on [$serverName] server</error>",
"<error>" . str_repeat(' ', strlen($message)) . "</error>",
"<error>$message</error>",
"<error>" . str_repeat(' ', strlen($message)) . "</error>",
""
]);
}
}

View File

@ -20,207 +20,301 @@ use Symfony\Component\Process\Process;
class ParallelExecutor implements ExecutorInterface
{
/**
* Try to start server on this port.
*/
const START_PORT = 3333;
/**
* If fails on start port, try until stop port.
*/
const STOP_PORT = 3340;
/**
* @var \Deployer\Task\Task[]
*/
private $tasks;
/**
* @var \Deployer\Server\ServerInterface[]
*/
private $servers;
/**
* @var \Deployer\Server\Environment[]
*/
private $environments;
/**
* @var \Symfony\Component\Console\Input\InputInterface
*/
private $input;
/**
* @var \Symfony\Component\Console\Output\OutputInterface
*/
private $output;
/**
* @var Informer
*/
private $informer;
/**
* @var int
*/
private $port;
/**
* @var Server
*/
private $pure;
/**
* @var \React\EventLoop\LoopInterface
*/
private $loop;
/**
* Wait until all workers finish they tasks. When set this variable to true and send new tasks to workers.
*
* @var bool
*/
private $wait = false;
/**
* @var ArrayStorage
*/
private $outputStorage;
/**
* @var ArrayStorage
*/
private $exceptionStorage;
/**
* Array will contain tasks list what workers has to before moving to next task.
*
* @var array
*/
private $tasksToDo = [];
/**
* Check if current task was successfully finished on all server (no exception was triggered).
*
* @var bool
*/
private $isSuccessfullyFinished = true;
/**
* Check if current task triggered a non-fatal exception.
*
* @var bool
*/
private $hasNonFatalException = false;
/**
* {@inheritdoc}
*/
public function run($tasks, $servers, $environments, $input, $output)
{
$output = new OutputWatcher($output);
$informer = new Informer($output);
$port = 3333;
$this->tasks = $tasks;
$this->servers = $servers;
$this->environments = $environments;
$this->input = $input;
$this->output = new OutputWatcher($output);
$this->informer = new Informer($this->output);
$this->port = self::START_PORT;
start:
$pure = new Server($port);
$loop = $pure->getLoop();
connect:
$outputStorage = $pure['output'] = new QueueStorage();
$exceptionStorage = $pure['exception'] = new QueueStorage();
// Wait until all workers finish they tasks. When set this variable to true and send new tasks to workers.
$wait = false;
// Array will contain tasks list what workers has to before moving to next task.
$tasksToDo = [];
// Check if current task triggered a fatal exception
$hasFatalException = false;
// Check if current task triggered a non-fatal exception
$hasNonFatalException = false;
// Get verbosity.
$verbosity = new VerbosityString($output);
// Get current deploy.php file.
$deployPhpFile = $input->getOption('file');
$this->pure = new Server($this->port);
$this->loop = $this->pure->getLoop();
// Start workers for each server.
$loop->addTimer(0, function () use (
$servers,
$port,
$verbosity,
$deployPhpFile
) {
foreach ($servers as $serverName => $server) {
$workerInput = new ArrayInput([
'--master' => "127.0.0.1:$port",
'--server' => $serverName,
]);
$process = new Process(
"php " . DEPLOYER_BIN .
(null === $deployPhpFile ? "" : " --file=$deployPhpFile") .
" worker $workerInput" .
" $verbosity" .
" &"
);
$process->disableOutput();
$process->run();
}
});
$this->loop->addTimer(0, [$this, 'startWorkers']);
// Wait for output
$loop->addPeriodicTimer(0, function () use ($output, $outputStorage) {
while (count($outputStorage) > 0) {
list($server, $messages, , $type) = $outputStorage->pop();
$format = function ($message) use ($server) {
$message = rtrim($message, "\n");
return implode("\n", array_map(function ($text) use ($server) {
return "[$server] $text";
}, explode("\n", $message)));
};
$output->writeln(array_map($format, (array)$messages), $type);
}
});
// Send workers tasks to do.
$loop->addPeriodicTimer(0, function () use (
&$wait,
&$tasks,
&$tasksToDo,
$servers,
$informer,
$input,
$output,
$loop,
$pure
) {
if (!$wait) {
if (count($tasks) > 0) {
$task = current($tasks);
$taskName = key($tasks);
array_shift($tasks);
$informer->startTask($taskName);
if ($task->isOnce()) {
$task->run(new Context(null, null, $input, $output));
$informer->endTask();
} else {
$tasksToDo = [];
foreach ($servers as $serverName => $server) {
if ($task->runOnServer($serverName)) {
$informer->onServer($serverName);
$tasksToDo[$serverName] = $taskName;
}
}
// Inform all workers what tasks they need to do.
$taskToDoStorage = new ArrayStorage();
$taskToDoStorage->push($tasksToDo);
$pure->setStorage('tasks_to_do', $taskToDoStorage);
$wait = true;
}
} else {
$loop->stop();
}
}
});
// Wait all workers finish they tasks.
$loop->addPeriodicTimer(0, function () use (
&$wait,
&$tasksToDo,
&$hasFatalException,
&$hasNonFatalException,
$pure,
$informer
) {
if ($wait) {
$taskToDoStorage = $pure->getStorage('tasks_to_do');
foreach ($tasksToDo as $serverName => $taskName) {
if (!$taskToDoStorage->has($serverName)) {
$informer->endOnServer($serverName);
unset($tasksToDo[$serverName]);
}
}
if (count($taskToDoStorage) === 0) {
if ($hasFatalException) {
$informer->endTask($hasFatalException === false);
} else {
$informer->taskError();
}
$wait = false;
}
}
});
$this->outputStorage = $this->pure['output'] = new QueueStorage();
$this->loop->addPeriodicTimer(0, [$this, 'catchOutput']);
// Lookup for exception
$loop->addPeriodicTimer(0, function () use (
&$tasks,
&$hasFatalException,
&$hasNonFatalException,
$pure,
$exceptionStorage,
$output,
$informer
) {
while (count($exceptionStorage) > 0) {
list($serverName, $exceptionClass, $message) = $exceptionStorage->pop();
$this->exceptionStorage = $this->pure['exception'] = new QueueStorage();
$this->loop->addPeriodicTimer(0, [$this, 'catchExceptions']);
if ($exceptionClass !== 'Deployer\Task\NonFatalException') {
$message = " $message ";
$output->writeln("");
$output->writeln("<error>Exception [$exceptionClass] on [$serverName] server</error>");
$output->writeln("<error>" . str_repeat(' ', strlen($message)) . "</error>");
$output->writeln("<error>$message</error>");
$output->writeln("<error>" . str_repeat(' ', strlen($message)) . "</error>");
$output->writeln("");
$hasFatalException = true;
} else {
$hasNonFatalException = true;
$informer->taskError(new NonFatalException($message));
}
// Do not run other task.
// Finish all current worker tasks and stop loop.
$tasks = [];
$taskToDoStorage = $pure->getStorage('tasks_to_do');
$taskToDoStorage->delete($serverName);
}
});
// Send workers tasks to do.
$this->loop->addPeriodicTimer(0, [$this, 'sendTasks']);
// Wait all workers finish they tasks.
$this->loop->addPeriodicTimer(0, [$this, 'idle']);
// Start loop
try {
$pure->run();
$this->pure->run();
} catch (ConnectionException $exception) {
// If port is already used, try with another one.
$output->writeln("<error>" . $exception->getMessage() . "</error>");
$port++;
goto start;
if (++$this->port <= self::STOP_PORT) {
goto connect;
}
}
}
/**
* Start workers, put master port, server name to run on, and options stuff.
*/
public function startWorkers()
{
// Get verbosity.
$verbosity = new VerbosityString($this->output);
// Get current deploy.php file.
$deployPhpFile = $this->input->getOption('file');
foreach ($this->servers as $serverName => $server) {
$workerInput = new ArrayInput([
'--master' => '127.0.0.1:' . $this->port,
'--server' => $serverName,
]);
$process = new Process(
"php " . DEPLOYER_BIN .
(null === $deployPhpFile ? "" : " --file=$deployPhpFile") .
" worker $workerInput" .
" $verbosity" .
" &"
);
$process->disableOutput();
$process->run();
}
}
/**
* Wait for output from workers.
*/
public function catchOutput()
{
while (count($this->outputStorage) > 0) {
list($server, $messages, , $type) = $this->outputStorage->pop();
$format = function ($message) use ($server) {
$message = rtrim($message, "\n");
return implode("\n", array_map(function ($text) use ($server) {
return "[$server] $text";
}, explode("\n", $message)));
};
$this->output->writeln(array_map($format, (array)$messages), $type);
}
}
/**
* Wait for exceptions from workers.
*/
public function catchExceptions()
{
while (count($this->exceptionStorage) > 0) {
list($serverName, $exceptionClass, $message) = $this->exceptionStorage->pop();
// Print exception message.
$this->informer->taskException($serverName, $exceptionClass, $message);
// We got some exception, so not.
$this->isSuccessfullyFinished = false;
if ($exceptionClass == 'Deployer\Task\NonFatalException') {
// If we got NonFatalException, continue other tasks.
$this->hasNonFatalException = true;
} else {
// Do not run other task.
// Finish all current worker tasks and stop loop.
$this->tasks = [];
// Worker will not mark this task as done (remove self server name from `tasks_to_do` list),
// so to finish current task execution we need to manually remove it from that list.
$taskToDoStorage = $this->pure->getStorage('tasks_to_do');
$taskToDoStorage->delete($serverName);
}
}
}
/**
* Action time for master! Send tasks `to-do` for workers and go to sleep.
* Also decide when to stop server/loop.
*/
public function sendTasks()
{
if (!$this->wait) {
if (count($this->tasks) > 0) {
// Get task name to do.
$task = current($this->tasks);
$taskName = key($this->tasks);
array_shift($this->tasks);
$this->informer->startTask($taskName);
if ($task->isOnce()) {
$task->run(new Context(null, null, $this->input, $this->output));
$this->informer->endTask();
} else {
$this->tasksToDo = [];
foreach ($this->servers as $serverName => $server) {
if ($task->runOnServer($serverName)) {
$this->informer->onServer($serverName);
$this->tasksToDo[$serverName] = $taskName;
}
}
// Inform all workers what tasks they need to do.
$taskToDoStorage = new ArrayStorage();
$taskToDoStorage->push($this->tasksToDo);
$this->pure->setStorage('tasks_to_do', $taskToDoStorage);
$this->wait = true;
}
} else {
$this->loop->stop();
}
}
}
/**
* While idle master, print information about finished tasks.
*/
public function idle()
{
if ($this->wait) {
$taskToDoStorage = $this->pure->getStorage('tasks_to_do');
foreach ($this->tasksToDo as $serverName => $taskName) {
if (!$taskToDoStorage->has($serverName)) {
$this->informer->endOnServer($serverName);
unset($this->tasksToDo[$serverName]);
}
}
if (count($taskToDoStorage) === 0) {
if ($this->isSuccessfullyFinished) {
$this->informer->endTask();
} else {
$this->informer->taskError($this->hasNonFatalException);
}
// We waited all workers to finish their tasks.
// Wait no more!
$this->wait = false;
// Reset to default for next tasks.
$this->isSuccessfullyFinished = true;
}
}
}
}

View File

@ -36,10 +36,12 @@ class SeriesExecutor implements ExecutorInterface
$informer->onServer($serverName);
try {
$task->run(new Context($server, $env, $input, $output));
} catch (NonFatalException $e) {
$informer->taskError($e);
} catch (NonFatalException $exception) {
$success = false;
$informer->taskException($serverName, 'Deployer\Task\NonFatalException', $exception->getMessage());
}
$informer->endOnServer($serverName);
@ -47,7 +49,11 @@ class SeriesExecutor implements ExecutorInterface
}
}
$informer->endTask($success);
if ($success) {
$informer->endTask();
} else {
$informer->taskError();
}
}
}
}

View File

@ -1,14 +1,19 @@
<?php
/* (c) Anton Medvedev <anton@elfet.ru>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Deployer\Task;
class NonFatalException extends \RuntimeException
{
/**
* Make message mandatory
* Make message mandatory.
*
* @param string $message
* @param integer $code defaults to 0
* @param string $message
* @param integer $code defaults to 0
* @param \Exception|null $previous
*/
public function __construct($message, $code = 0, \Exception $previous = null)