# -*- coding: utf-8 -*-
"""Shared code for Elasticsearch based output modules."""
from __future__ import unicode_literals
import logging
from dfvfs.serializer.json_serializer import JsonPathSpecSerializer
try:
import elasticsearch
except ImportError:
elasticsearch = None
try:
import elasticsearch5
except ImportError:
elasticsearch5 = None
from plaso.lib import errors
from plaso.lib import timelib
from plaso.output import interface
from plaso.output import logger
# Only let warnings and more severe messages of the Elasticsearch client
# trace logger through.
if elasticsearch is not None:
  elastic_logger = logging.getLogger('elasticsearch.trace')
  elastic_logger.setLevel(logging.WARNING)

# Same for the Elasticsearch 5 client.
# NOTE(review): this configures the same 'elasticsearch.trace' logger as
# above; presumably the elasticsearch5 package reuses that logger name -
# confirm against the installed package.
if elasticsearch5 is not None:
  elastic5_logger = logging.getLogger('elasticsearch.trace')
  elastic5_logger.setLevel(logging.WARNING)
class SharedElasticsearchOutputModule(interface.OutputModule):
  """Shared functionality for an Elasticsearch output module.

  Events are converted into documents, buffered in memory and inserted in
  bulk once more than the flush interval number of events have been buffered
  or when the output module is closed.
  """

  # pylint: disable=abstract-method

  NAME = 'elastic_shared'

  _DEFAULT_DOCUMENT_TYPE = 'plaso_event'

  # Default number of events to buffer before a bulk insert is triggered.
  _DEFAULT_FLUSH_INTERVAL = 1000

  # Number of seconds to wait before a request to Elasticsearch is timed out.
  _DEFAULT_REQUEST_TIMEOUT = 300

  def __init__(self, output_mediator):
    """Initializes an Elasticsearch output module.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfvfs.
    """
    super(SharedElasticsearchOutputModule, self).__init__(output_mediator)
    self._client = None
    self._document_type = self._DEFAULT_DOCUMENT_TYPE
    self._event_documents = []
    self._flush_interval = self._DEFAULT_FLUSH_INTERVAL
    self._host = None
    self._index_name = None
    self._number_of_buffered_events = 0
    self._password = None
    self._port = None
    self._username = None

  def _Connect(self):
    """Connects to an Elasticsearch server.

    HTTP authentication is only used when a username has been set.
    """
    elastic_hosts = [{'host': self._host, 'port': self._port}]

    elastic_http_auth = None
    if self._username is not None:
      elastic_http_auth = (self._username, self._password)

    self._client = elasticsearch.Elasticsearch(
        elastic_hosts, http_auth=elastic_http_auth)

    logger.debug('Connected to Elasticsearch server: {0:s} port: {1:d}.'.format(
        self._host, self._port))

  def _CreateIndexIfNotExists(self, index_name, mappings):
    """Creates an Elasticsearch index if it does not already exist.

    Args:
      index_name (str): name of the index.
      mappings (dict[str, object]): mappings of the index.

    Raises:
      RuntimeError: if the Elasticsearch index cannot be created.
    """
    try:
      if not self._client.indices.exists(index_name):
        self._client.indices.create(
            body={'mappings': mappings}, index=index_name)

    except elasticsearch.exceptions.ConnectionError as exception:
      raise RuntimeError(
          'Unable to create Elasticsearch index with error: {0!s}'.format(
              exception))

  def _FlushEvents(self):
    """Inserts the buffered event documents into Elasticsearch.

    The buffer is emptied afterwards. If the bulk insert fails with a
    ValueError a warning is logged and the buffered events are dropped.
    """
    if not self._event_documents:
      # Nothing is buffered, for example when Close() is called before any
      # event was written, hence there is no need for a bulk request.
      return

    try:
      # pylint: disable=unexpected-keyword-arg
      # pylint does not recognize request_timeout as a valid kwarg. According
      # to http://elasticsearch-py.readthedocs.io/en/master/api.html#timeout
      # it should be supported.
      self._client.bulk(
          body=self._event_documents, doc_type=self._document_type,
          index=self._index_name, request_timeout=self._DEFAULT_REQUEST_TIMEOUT)

    except ValueError as exception:
      # Ignore problematic events.
      logger.warning('Unable to bulk insert with error: {0!s}'.format(
          exception))

    else:
      # Only report the events as inserted when the bulk request succeeded.
      logger.debug('Inserted {0:d} events into Elasticsearch'.format(
          self._number_of_buffered_events))

    self._event_documents = []
    self._number_of_buffered_events = 0

  def _GetSanitizedEventValues(self, event):
    """Sanitizes the event for use in Elasticsearch.

    The event values need to be sanitized to prevent certain values from
    causing problems when indexing with Elasticsearch. For example the path
    specification is a nested dictionary which will cause problems for
    Elasticsearch automatic indexing.

    Args:
      event (EventObject): event.

    Returns:
      dict[str, object]: sanitized event values.

    Raises:
      NoFormatterFound: if no event formatter can be found to match the data
          type in the event.
    """
    event_values = {}
    for attribute_name, attribute_value in event.GetAttributes():
      # Ignore the regvalue attribute as it causes issues when indexing.
      if attribute_name == 'regvalue':
        continue

      if attribute_name == 'pathspec':
        try:
          attribute_value = JsonPathSpecSerializer.WriteSerialized(
              attribute_value)
        except TypeError:
          # Skip a path specification that cannot be serialized.
          continue

      event_values[attribute_name] = attribute_value

    # Add a string representation of the timestamp.
    try:
      attribute_value = timelib.Timestamp.RoundToSeconds(event.timestamp)
    except TypeError as exception:
      logger.warning((
          'Unable to round timestamp {0!s}. error: {1!s}. '
          'Defaulting to 0').format(event.timestamp, exception))
      attribute_value = 0

    attribute_value = timelib.Timestamp.CopyToIsoFormat(
        attribute_value, timezone=self._output_mediator.timezone)
    event_values['datetime'] = attribute_value

    message, _ = self._output_mediator.GetFormattedMessages(event)
    if message is None:
      data_type = getattr(event, 'data_type', 'UNKNOWN')
      raise errors.NoFormatterFound(
          'Unable to find event formatter for: {0:s}.'.format(data_type))

    event_values['message'] = message

    # Tags needs to be a list for Elasticsearch to index correctly.
    try:
      labels = list(event_values['tag'].labels)
    except (KeyError, AttributeError):
      # The event has no tag or the tag has no labels.
      labels = []

    event_values['tag'] = labels

    source_short, source = self._output_mediator.GetFormattedSources(event)
    if source is None or source_short is None:
      data_type = getattr(event, 'data_type', 'UNKNOWN')
      raise errors.NoFormatterFound(
          'Unable to find event formatter for: {0:s}.'.format(data_type))

    event_values['source_short'] = source_short
    event_values['source_long'] = source

    return event_values

  def _InsertEvent(self, event, force_flush=False):
    """Inserts an event.

    Events are buffered in the form of documents and inserted to Elasticsearch
    when either forced to flush or when the number of buffered events exceeds
    the flush interval (threshold).

    Args:
      event (EventObject): event, or None to only flush the buffer.
      force_flush (bool): True if buffered event documents should be inserted
          into Elasticsearch.
    """
    if event:
      event_document = {'index': {
          '_index': self._index_name, '_type': self._document_type}}
      # The event values are determined before anything is buffered, so that
      # a failure to sanitize the event does not leave a dangling bulk action.
      event_values = self._GetSanitizedEventValues(event)

      self._event_documents.append(event_document)
      self._event_documents.append(event_values)
      self._number_of_buffered_events += 1

    if force_flush or self._number_of_buffered_events > self._flush_interval:
      self._FlushEvents()

  def Close(self):
    """Closes connection to Elasticsearch.

    Inserts any remaining buffered event documents.
    """
    self._InsertEvent(None, force_flush=True)

    self._client = None

  def SetDocumentType(self, document_type):
    """Sets the document type.

    Args:
      document_type (str): document type.
    """
    self._document_type = document_type
    logger.debug('Elasticsearch document type: {0:s}'.format(document_type))

  def SetFlushInterval(self, flush_interval):
    """Sets the flush interval.

    Args:
      flush_interval (int): number of events to buffer before doing a bulk
          insert.
    """
    self._flush_interval = flush_interval
    logger.debug('Elasticsearch flush interval: {0:d}'.format(flush_interval))

  def SetIndexName(self, index_name):
    """Sets the index name.

    Args:
      index_name (str): name of the index.
    """
    self._index_name = index_name
    logger.debug('Elasticsearch index name: {0:s}'.format(index_name))

  def SetPassword(self, password):
    """Sets the password.

    Args:
      password (str): password to authenticate with.
    """
    self._password = password
    # The password itself is deliberately not written to the log.
    logger.debug('Elastic password: ********')

  def SetServerInformation(self, server, port):
    """Sets the server information.

    Args:
      server (str): IP address or hostname of the server.
      port (int): port number of the server.
    """
    self._host = server
    self._port = port
    logger.debug('Elasticsearch server: {0!s} port: {1:d}'.format(server, port))

  def SetUsername(self, username):
    """Sets the username.

    Args:
      username (str): username to authenticate with.
    """
    self._username = username
    logger.debug('Elasticsearch username: {0!s}'.format(username))

  def WriteEventBody(self, event):
    """Writes an event to the output.

    Args:
      event (EventObject): event.
    """
    self._InsertEvent(event)
class SharedElasticsearch5OutputModule(SharedElasticsearchOutputModule):
  """Shared output module for Elasticsearch 5.

  Uses the elasticsearch5 Python module instead of the elasticsearch Python
  module to connect to the server and to create the index.
  """

  def _Connect(self):
    """Connects to an Elasticsearch server.

    HTTP authentication is only used when a username has been set.
    """
    elastic_hosts = [{'host': self._host, 'port': self._port}]

    elastic_http_auth = None
    if self._username is not None:
      elastic_http_auth = (self._username, self._password)

    self._client = elasticsearch5.Elasticsearch(
        elastic_hosts, http_auth=elastic_http_auth)

    logger.debug('Connected to Elasticsearch server: {0:s} port: {1:d}.'.format(
        self._host, self._port))

  def _CreateIndexIfNotExists(self, index_name, mappings):
    """Creates an Elasticsearch index if it does not already exist.

    Args:
      index_name (str): name of the index.
      mappings (dict[str, object]): mappings of the index.

    Raises:
      RuntimeError: if the Elasticsearch index cannot be created.
    """
    # The parent class implementation is deliberately not used here, since
    # its except clause references the elasticsearch module, which is None
    # when only elasticsearch5 is installed. A connection error would then
    # surface as an AttributeError instead of a RuntimeError.
    try:
      if not self._client.indices.exists(index_name):
        self._client.indices.create(
            body={'mappings': mappings}, index=index_name)

    except elasticsearch5.exceptions.ConnectionError as exception:
      raise RuntimeError(
          'Unable to create Elasticsearch index with error: {0!s}'.format(
              exception))