# -*- coding: utf-8 -*-
"""An output module that saves events to Elasticsearch."""
from __future__ import unicode_literals
from collections import Counter
import logging
from dfvfs.serializer.json_serializer import JsonPathSpecSerializer
try:
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConnectionError as ElasticConnectionError
except ImportError:
Elasticsearch = None
from plaso.lib import errors
from plaso.lib import timelib
from plaso.output import interface
from plaso.output import manager
# Configure the Elasticsearch trace logger: suppress the per-request
# trace output emitted by the elasticsearch client below WARNING level.
elastic_logger = logging.getLogger('elasticsearch.trace')
elastic_logger.setLevel(logging.WARNING)
class ElasticSearchHelper(object):
  """Elasticsearch helper class.

  Buffers events and indexes them into Elasticsearch in bulk once the
  configured flush interval is reached.
  """

  def __init__(
      self, output_mediator, host, port, flush_interval, index_name, mapping,
      doc_type, elastic_password=None, elastic_user=None):
    """Create a Elasticsearch helper.

    Args:
      output_mediator (OutputMediator): The output mediator object.
      host (str): IP address or hostname for the server.
      port (int): Port number for the server.
      flush_interval (int): How many events to queue before being indexed.
      index_name (str): Name of the Elasticsearch index.
      mapping (dict): Elasticsearch index configuration.
      doc_type (str): Elasticsearch document type name.
      elastic_password (Optional[str]): Elasticsearch password to authenticate
          with.
      elastic_user (Optional[str]): Elasticsearch username to authenticate
          with.
    """
    super(ElasticSearchHelper, self).__init__()
    elastic_hosts = [{'host': host, 'port': port}]
    if elastic_user is None:
      self.client = Elasticsearch(elastic_hosts)
    else:
      self.client = Elasticsearch(
          elastic_hosts, http_auth=(elastic_user, elastic_password))

    self._output_mediator = output_mediator
    self._index = self._EnsureIndexExists(index_name, mapping)
    self._doc_type = doc_type
    self._flush_interval = flush_interval
    self._events = []
    self._counter = Counter()
    self._elastic_user = elastic_user
    self._elastic_password = elastic_password

  def AddEvent(self, event_object, force_flush=False):
    """Index event in Elasticsearch.

    Args:
      event_object (EventObject): the event object, or None to only trigger
          a flush of any queued events.
      force_flush (bool): Force bulk insert of events in the queue.
    """
    if event_object:
      # The bulk API expects pairs of lines: an action line followed by the
      # document source.
      self._events.append(
          {'index': {'_index': self._index, '_type': self._doc_type}})
      self._events.append(self._GetSanitizedEventValues(event_object))
      self._counter['events'] += 1

    # Check if we need to flush the queued events.
    if force_flush or self._counter['events'] % self._flush_interval == 0:
      self._FlushEventsToElasticSearch()

  def _EnsureIndexExists(self, index_name, mapping):
    """Create Elasticsearch index if it does not already exist.

    Args:
      index_name (str): Name of the index.
      mapping (dict): Mapping for the index.

    Returns:
      str: Name of the index.

    Raises:
      RuntimeError: if the Elasticsearch backend cannot be reached.
    """
    try:
      if not self.client.indices.exists(index_name):
        self.client.indices.create(
            index=index_name, body={'mappings': mapping})
    except ElasticConnectionError as exception:
      raise RuntimeError(
          'Unable to connect to Elasticsearch backend with error: {0!s}'.format(
              exception))
    return index_name

  def _GetSanitizedEventValues(self, event_object):
    """Builds a dictionary from an event_object.

    The event object need to be sanitized to prevent certain values from
    causing problems when indexing with Elasticsearch. For example the value
    of the pathspec attribute is a nested dictionary which will cause problems
    for Elasticsearch automatic indexing.

    Args:
      event_object (EventObject): event object.

    Returns:
      dict[str, object]: sanitized event object values.

    Raises:
      errors.NoFormatterFound: if no event formatter can be found for the
          event data type.
    """
    event_values = {}
    for attribute_name, attribute_value in event_object.GetAttributes():
      # Ignore the regvalue attribute as it cause issues when indexing.
      if attribute_name == 'regvalue':
        continue

      if attribute_name == 'pathspec':
        try:
          attribute_value = JsonPathSpecSerializer.WriteSerialized(
              attribute_value)
        except TypeError:
          continue
      event_values[attribute_name] = attribute_value

    # Add string representation of the timestamp.
    try:
      attribute_value = timelib.Timestamp.RoundToSeconds(
          event_object.timestamp)
    except TypeError as exception:
      logging.warning(
          ('Unable to round timestamp {0!s}. error: {1!s}. '
           'Defaulting to 0').format(
               event_object.timestamp, exception))
      attribute_value = 0

    attribute_value = timelib.Timestamp.CopyToIsoFormat(
        attribute_value, timezone=self._output_mediator.timezone)
    event_values['datetime'] = attribute_value

    message, _ = self._output_mediator.GetFormattedMessages(event_object)
    if message is None:
      raise errors.NoFormatterFound(
          'Unable to find event formatter for: {0:s}.'.format(
              getattr(event_object, 'data_type', 'UNKNOWN')))
    event_values['message'] = message

    # Tags needs to be a list for Elasticsearch to index correctly.
    try:
      labels = list(event_values['tag'].labels)
    except (KeyError, AttributeError):
      labels = []
    event_values['tag'] = labels

    source_short, source = self._output_mediator.GetFormattedSources(
        event_object)
    if source is None or source_short is None:
      raise errors.NoFormatterFound(
          'Unable to find event formatter for: {0:s}.'.format(
              getattr(event_object, 'data_type', 'UNKNOWN')))
    event_values['source_short'] = source_short
    event_values['source_long'] = source

    return event_values

  def _FlushEventsToElasticSearch(self):
    """Insert events in bulk to Elasticsearch."""
    if not self._events:
      # An empty bulk request is rejected by the Elasticsearch client, which
      # can happen when Close() forces a flush before any event was queued.
      return

    try:
      self.client.bulk(
          index=self._index, doc_type=self._doc_type, body=self._events)
    except ValueError as exception:
      # Ignore problematic events.
      logging.warning('{0:s}'.format(exception))

    # Clear the events list.
    self._events = []
    logging.info('{0:d} events added'.format(self._counter['events']))
class ElasticSearchOutputModule(interface.OutputModule):
  """Output module for Elasticsearch."""

  NAME = 'elastic'
  DESCRIPTION = 'Saves the events into an Elasticsearch database.'

  # Strings longer than this will not be analyzed by elasticsearch.
  _ELASTIC_ANALYZER_STRING_LIMIT = 10922

  def __init__(self, output_mediator):
    """Initializes the output module object.

    Args:
      output_mediator (OutputMediator): The output mediator object.
    """
    super(ElasticSearchOutputModule, self).__init__(output_mediator)
    self._doc_type = None
    self._elastic = None
    self._elastic_password = None
    self._elastic_user = None
    self._flush_interval = None
    self._host = None
    self._index_name = None
    self._mapping = None
    self._output_mediator = output_mediator
    self._port = None
    self._raw_fields = False

  def Close(self):
    """Close connection to the Elasticsearch database.

    Sends any remaining buffered events for indexing.
    """
    self._elastic.AddEvent(event_object=None, force_flush=True)

  def SetFlushInterval(self, flush_interval):
    """Set the flush interval.

    Args:
      flush_interval (int): Number of events to buffer before bulk insert.
    """
    self._flush_interval = flush_interval
    logging.info('Flush interval: {0:d}'.format(self._flush_interval))

  def SetIndexName(self, index_name):
    """Set the index name.

    Args:
      index_name (str): the index name.
    """
    self._index_name = index_name
    logging.info('Index name: {0:s}'.format(self._index_name))

  def SetDocType(self, doc_type):
    """Set the document type.

    Args:
      doc_type (str): The document type to use when indexing.
    """
    self._doc_type = doc_type
    logging.info('Document type: {0:s}'.format(self._doc_type))

  def SetRawFields(self, raw_fields):
    """Set raw (not analyzed) fields.

    This is used for sorting and aggregations in Elasticsearch.
    https://www.elastic.co/guide/en/elasticsearch/guide/current/
    multi-fields.html

    Args:
      raw_fields (bool): Add not-analyzed index for string fields.
    """
    self._raw_fields = raw_fields
    logging.info('Add non analyzed string fields: {0!s}'.format(
        self._raw_fields))

  def SetElasticUser(self, elastic_user):
    """Set the Elastic username.

    Args:
      elastic_user (str): Elastic user to authenticate with.
    """
    self._elastic_user = elastic_user
    logging.info('Elastic user: {0:s}'.format(self._elastic_user))

  def SetElasticPassword(self, elastic_password):
    """Set the Elastic password.

    Args:
      elastic_password (str): Elastic password to authenticate with.
    """
    self._elastic_password = elastic_password
    # Deliberately log a placeholder so the password never reaches the logs.
    logging.info('Elastic password: {0:s}'.format('****'))

  def WriteEventBody(self, event):
    """Writes the body of an event to the output.

    Args:
      event (EventObject): event.
    """
    self._elastic.AddEvent(event)
# Register the output module; it is marked disabled when the elasticsearch
# client library could not be imported.
manager.OutputManager.RegisterOutput(
    ElasticSearchOutputModule, disabled=Elasticsearch is None)