diff --git a/src/Monolog/Handler/ElasticSearchHandler.php b/src/Monolog/Handler/ElasticSearchHandler.php new file mode 100644 index 00000000..d465438a --- /dev/null +++ b/src/Monolog/Handler/ElasticSearchHandler.php @@ -0,0 +1,158 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Monolog\Handler; + +use Elasticsearch\Client; +use Elasticsearch\Common\Exceptions\RuntimeException as ElasticSearchRuntimeException; +use Exception; +use InvalidArgumentException; +use Monolog\Formatter\ElasticSearchFormatter; +use Monolog\Formatter\FormatterInterface; +use Monolog\Logger; +use RuntimeException; + +/** + * Elastic Search handler + * + * @link https://www.elastic.co/guide/en/elasticsearch/client/php-api/current/index.html + * + * Simple usage example: + * + * $client = \ElasticSearch\ClientBuilder::create() + * ->setHosts($hosts) + * ->build(); + * + * $options = array( + * 'index' => 'elastic_index_name', + * 'type' => 'elastic_doc_type', + * ); + * $handler = new ElasticSearchHandler($client, $options); + * $log = new Logger('application'); + * $log->pushHandler($handler); + * + * @author Avtandil Kikabidze + */ +class ElasticSearchHandler extends AbstractProcessingHandler +{ + /** + * @var \Elasticsearch\Client + */ + protected $client; + + /** + * @var array Handler config options + */ + protected $options = []; + + /** + * @param \Elasticsearch\Client $client ElasticSearch Client object + * @param array $options Handler configuration + * @param int $level The minimum logging level at which this handler will be triggered + * @param Boolean $bubble Whether the messages that are handled can bubble up the stack or not + */ + public function __construct(Client $client, array $options = [], $level = Logger::DEBUG, $bubble = true) + { + parent::__construct($level, $bubble); + $this->client = $client; + $this->options = array_merge( + [ + 'index' => 'monolog', // Elastic index name + 'type' => 'record', // Elastic document type + 'ignore_error' => false, // Suppress ElasticSearch exceptions + ], + $options + ); + } + + /** + * {@inheritDoc} + */ + protected function write(array $record) + { + $this->bulkSend([$record['formatted']]); + } + + /** + * {@inheritdoc} + */ + public function setFormatter(FormatterInterface $formatter): HandlerInterface + { + if ($formatter instanceof ElasticSearchFormatter) { + return parent::setFormatter($formatter); + } + throw new InvalidArgumentException('ElasticSearchHandler is only compatible with ElasticSearchFormatter'); + } + + /** + * Getter options + * + * @return array + */ + public function getOptions(): array + { + return $this->options; + } + + /** + * {@inheritDoc} + */ + protected function getDefaultFormatter(): FormatterInterface + { + return new ElasticSearchFormatter($this->options['index'], $this->options['type']); + } + + /** + * {@inheritdoc} + */ + public function handleBatch(array $records) + { + $documents = $this->getFormatter()->formatBatch($records); + $this->bulkSend($documents); + } + + /** + * Use ElasticSearch bulk API to send list of documents + * + * @param array $records + * @throws \RuntimeException + */ + protected function bulkSend(array $records) + { + try { + $params = [ + 'body' => [], + ]; + + foreach ($records as $record) { + $params['body'][] = [ + 'index' => [ + '_index' => $record['_index'], + '_type' => $record['_type'], + ], + ]; + unset($record['_index'], $record['_type']); + + $params['body'][] = $record; + } + + $responses = $this->client->bulk($params); + + if ($responses['errors'] === true) { + throw new ElasticSearchRuntimeException('ElasticSearch returned error for one of the records'); + } + } catch (Exception $e) { + if (! $this->options['ignore_error']) { + throw new RuntimeException('Error sending messages to ElasticSearch', 0, $e); + } + } + } +} diff --git a/tests/Monolog/Handler/ElasticSearchHandlerTest.php b/tests/Monolog/Handler/ElasticSearchHandlerTest.php new file mode 100644 index 00000000..1045e034 --- /dev/null +++ b/tests/Monolog/Handler/ElasticSearchHandlerTest.php @@ -0,0 +1,266 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Monolog\Handler; + +use Elasticsearch\ClientBuilder; +use Monolog\Formatter\ElasticSearchFormatter; +use Monolog\Formatter\NormalizerFormatter; +use Monolog\Test\TestCase; +use Monolog\Logger; +use Elasticsearch\Client; + +class ElasticSearchHandlerTest extends TestCase +{ + /** + * @var Client mock + */ + protected $client; + + /** + * @var array Default handler options + */ + protected $options = [ + 'index' => 'my_index', + 'type' => 'doc_type', + ]; + + public function setUp() + { + // ElasticSearch lib required + if (!class_exists('Elasticsearch\Client')) { + $this->markTestSkipped('elasticsearch/elasticsearch not installed'); + } + + // base mock ElasticSearch Client object + $this->client = $this->getMockBuilder('Elasticsearch\Client') + ->setMethods(['bulk']) + ->disableOriginalConstructor() + ->getMock(); + } + + /** + * @covers Monolog\Handler\ElasticSearchHandler::write + * @covers Monolog\Handler\ElasticSearchHandler::handleBatch + * @covers Monolog\Handler\ElasticSearchHandler::bulkSend + * @covers Monolog\Handler\ElasticSearchHandler::getDefaultFormatter + */ + public function testHandle() + { + // log message + $msg = [ + 'level' => Logger::ERROR, + 'level_name' => 'ERROR', + 'channel' => 'meh', + 'context' => ['foo' => 7, 'bar', 'class' => new \stdClass], + 'datetime' => new \DateTimeImmutable("@0"), + 'extra' => [], + 'message' => 'log', + ]; + + // format expected result + $formatter = new ElasticSearchFormatter($this->options['index'], $this->options['type']); + $data = $formatter->format($msg); + unset($data['_index'], $data['_type']); + + $expected = [ + 'body' => [ + [ + 'index' => [ + '_index' => $this->options['index'], + '_type' => $this->options['type'], + ], + ], + $data, + ] + ]; + + // setup ES client mock + $this->client->expects($this->any()) + ->method('bulk') + ->with($expected); + + // perform tests + $handler = new ElasticSearchHandler($this->client, $this->options); + $handler->handle($msg); + $handler->handleBatch([$msg]); + } + + /** + * @covers Monolog\Handler\ElasticSearchHandler::setFormatter + */ + public function testSetFormatter() + { + $handler = new ElasticSearchHandler($this->client); + $formatter = new ElasticSearchFormatter('index_new', 'type_new'); + $handler->setFormatter($formatter); + $this->assertInstanceOf('Monolog\Formatter\ElasticSearchFormatter', $handler->getFormatter()); + $this->assertEquals('index_new', $handler->getFormatter()->getIndex()); + $this->assertEquals('type_new', $handler->getFormatter()->getType()); + } + + /** + * @covers Monolog\Handler\ElasticSearchHandler::setFormatter + * @expectedException InvalidArgumentException + * @expectedExceptionMessage ElasticSearchHandler is only compatible with ElasticSearchFormatter + */ + public function testSetFormatterInvalid() + { + $handler = new ElasticSearchHandler($this->client); + $formatter = new NormalizerFormatter(); + $handler->setFormatter($formatter); + } + + /** + * @covers Monolog\Handler\ElasticSearchHandler::__construct + * @covers Monolog\Handler\ElasticSearchHandler::getOptions + */ + public function testOptions() + { + $expected = [ + 'index' => $this->options['index'], + 'type' => $this->options['type'], + 'ignore_error' => false, + ]; + $handler = new ElasticSearchHandler($this->client, $this->options); + $this->assertEquals($expected, $handler->getOptions()); + } + + /** + * @covers Monolog\Handler\ElasticSearchHandler::bulkSend + * @dataProvider providerTestConnectionErrors + */ + public function testConnectionErrors($ignore, $expectedError) + { + $hosts = [['host' => '127.0.0.1', 'port' => 1]]; + $client = ClientBuilder::create() + ->setHosts($hosts) + ->build(); + + $handlerOpts = ['ignore_error' => $ignore]; + $handler = new ElasticSearchHandler($client, $handlerOpts); + + if ($expectedError) { + $this->expectException($expectedError[0]); + $this->expectExceptionMessage($expectedError[1]); + $handler->handle($this->getRecord()); + } else { + $this->assertFalse($handler->handle($this->getRecord())); + } + } + + /** + * @return array + */ + public function providerTestConnectionErrors() + { + return [ + [false, ['RuntimeException', 'Error sending messages to ElasticSearch']], + [true, false], + ]; + } + + /** + * Integration test using localhost Elastic Search server + * + * @covers Monolog\Handler\ElasticSearchHandler::__construct + * @covers Monolog\Handler\ElasticSearchHandler::handleBatch + * @covers Monolog\Handler\ElasticSearchHandler::bulkSend + * @covers Monolog\Handler\ElasticSearchHandler::getDefaultFormatter + */ + public function testHandleIntegration() + { + $msg = [ + 'level' => Logger::ERROR, + 'level_name' => 'ERROR', + 'channel' => 'meh', + 'context' => ['foo' => 7, 'bar', 'class' => new \stdClass], + 'datetime' => new \DateTimeImmutable("@0"), + 'extra' => [], + 'message' => 'log', + ]; + + $expected = $msg; + $expected['datetime'] = $msg['datetime']->format(\DateTime::ISO8601); + $expected['context'] = [ + 'class' => ["stdClass" => []], + 'foo' => 7, + 0 => 'bar', + ]; + + $client = ClientBuilder::create() + ->build(); + $handler = new ElasticSearchHandler($client, $this->options); + try { + $handler->handleBatch([$msg]); + } catch (\RuntimeException $e) { + $this->markTestSkipped('Cannot connect to Elastic Search server on localhost'); + } + + // check document id from ES server response + $documentId = $this->getCreatedDocId($client->transport->getLastConnection()->getLastRequestInfo()); + $this->assertNotEmpty($documentId, 'No elastic document id received'); + + // retrieve document source from ES and validate + $document = $this->getDocSourceFromElastic( + $client, + $this->options['index'], + $this->options['type'], + $documentId + ); + + $this->assertEquals($expected, $document); + + // remove test index from ES + $client->indices()->delete(['index' => $this->options['index']]); + } + + /** + * Return last created document id from ES response + * + * @param array $info ElasticSearch last request info + * @return string|null + */ + protected function getCreatedDocId(array $info) + { + $data = json_decode($info['response']['body'], true); + + if (!empty($data['items'][0]['index']['_id'])) { + return $data['items'][0]['index']['_id']; + } + } + + /** + * Retrieve document by id from Elasticsearch + * + * @param Client $client ElasticSearch client + * @param string $index + * @param string $type + * @param string $documentId + * @return array + */ + protected function getDocSourceFromElastic(Client $client, $index, $type, $documentId) + { + $params = [ + 'index' => $index, + 'type' => $type, + 'id' => $documentId + ]; + + $data = $client->get($params); + + if (!empty($data['_source'])) { + return $data['_source']; + } + + return []; + } +}