# -*- coding: utf-8 -*-
"""The Elastic Search output module CLI arguments helper."""
from __future__ import unicode_literals
import getpass
from uuid import uuid4
from plaso.lib import errors
from plaso.cli.helpers import interface
from plaso.cli.helpers import manager
from plaso.cli.helpers import server_config
from plaso.output import elastic
class ElasticSearchServerArgumentsHelper(server_config.ServerArgumentsHelper):
  """Elastic Search server CLI arguments helper.

  Overrides the default server address and port of the generic server
  arguments helper; all argument handling is inherited from the base class.
  """

  # Loopback address used as the default server value.
  _DEFAULT_SERVER = '127.0.0.1'

  # Default port value; presumably the Elasticsearch HTTP port, confirm
  # against the deployment documentation.
  _DEFAULT_PORT = 9200
class ElasticSearchOutputArgumentsHelper(interface.ArgumentsHelper):
  """Elastic Search output module CLI arguments helper.

  Adds the Elasticsearch specific command line arguments to an argument
  group and applies the parsed values to an Elasticsearch output module.
  """

  NAME = 'elastic'
  CATEGORY = 'output'
  DESCRIPTION = 'Argument helper for the Elastic Search output modules.'

  # Evaluated once, when the class body is executed: every use of the default
  # within the same process therefore shares one random index name.
  _DEFAULT_INDEX_NAME = uuid4().hex
  _DEFAULT_DOCUMENT_TYPE = 'plaso_event'
  _DEFAULT_FLUSH_INTERVAL = 1000
  _DEFAULT_RAW_FIELDS = False
  _DEFAULT_ELASTIC_USER = None

  @classmethod
  def AddArguments(cls, argument_group):
    """Adds command line arguments the helper supports to an argument group.

    This function takes an argument parser or an argument group object and adds
    to it all the command line arguments this helper supports. The server
    related arguments are added by ElasticSearchServerArgumentsHelper.

    Args:
      argument_group (argparse._ArgumentGroup|argparse.ArgumentParser):
          argparse group.
    """
    argument_group.add_argument(
        '--index_name', dest='index_name', type=str, action='store',
        default=cls._DEFAULT_INDEX_NAME, help=(
            'Name of the index in ElasticSearch.'))
    argument_group.add_argument(
        '--doc_type', dest='document_type', type=str,
        action='store', default=cls._DEFAULT_DOCUMENT_TYPE, help=(
            'Name of the document type that will be used in ElasticSearch.'))
    argument_group.add_argument(
        '--flush_interval', dest='flush_interval', type=int,
        action='store', default=cls._DEFAULT_FLUSH_INTERVAL, help=(
            'Events to queue up before bulk insert to ElasticSearch.'))
    argument_group.add_argument(
        '--raw_fields', dest='raw_fields', action='store_true',
        default=cls._DEFAULT_RAW_FIELDS, help=(
            'Export string fields that will not be analyzed by Lucene.'))
    argument_group.add_argument(
        '--elastic_user', dest='elastic_user', action='store',
        default=cls._DEFAULT_ELASTIC_USER, help=(
            'Username to use for Elasticsearch authentication.'))

    ElasticSearchServerArgumentsHelper.AddArguments(argument_group)

  # pylint: disable=arguments-differ
  @classmethod
  def ParseOptions(cls, options, output_module):
    """Parses and validates options.

    When an Elasticsearch user name is provided, the password is requested
    interactively with getpass; otherwise the password is set to None.

    Args:
      options (argparse.Namespace): parser options.
      output_module (OutputModule): output module to configure.

    Raises:
      BadConfigObject: when the output module object is of the wrong type.
      BadConfigOption: when a configuration parameter fails validation.
    """
    elastic_output_modules = (
        elastic.ElasticsearchOutputModule, elastic.Elasticsearch5OutputModule)
    if not isinstance(output_module, elastic_output_modules):
      raise errors.BadConfigObject(
          'Output module is not an instance of ElasticsearchOutputModule')

    # NOTE(review): _ParseStringOption and _ParseNumericOption are not defined
    # in this class; presumably inherited from interface.ArgumentsHelper.
    index_name = cls._ParseStringOption(
        options, 'index_name', default_value=cls._DEFAULT_INDEX_NAME)
    document_type = cls._ParseStringOption(
        options, 'document_type', default_value=cls._DEFAULT_DOCUMENT_TYPE)
    flush_interval = cls._ParseNumericOption(
        options, 'flush_interval', default_value=cls._DEFAULT_FLUSH_INTERVAL)
    raw_fields = getattr(
        options, 'raw_fields', cls._DEFAULT_RAW_FIELDS)
    elastic_user = cls._ParseStringOption(
        options, 'elastic_user', default_value=cls._DEFAULT_ELASTIC_USER)

    # The password is never read from the command line; it is only prompted
    # for when a user name was given.
    if elastic_user is not None:
      elastic_password = getpass.getpass(
          'Enter your Elasticsearch password: ')
    else:
      elastic_password = None

    # Server address and port are handled by the server arguments helper.
    ElasticSearchServerArgumentsHelper.ParseOptions(options, output_module)

    output_module.SetIndexName(index_name)
    output_module.SetDocumentType(document_type)
    output_module.SetFlushInterval(flush_interval)
    output_module.SetRawFields(raw_fields)
    output_module.SetUsername(elastic_user)
    output_module.SetPassword(elastic_password)
# Runs when this module is imported: registers the Elasticsearch output
# arguments helper class with the argument helper manager.
manager.ArgumentHelperManager.RegisterHelper(ElasticSearchOutputArgumentsHelper)