Source code for plaso.cli.extraction_tool
# -*- coding: utf-8 -*-
"""The extraction CLI tool."""
from __future__ import unicode_literals
from dfvfs.resolver import context as dfvfs_context
# The following import makes sure the analyzers are registered.
from plaso import analyzers # pylint: disable=unused-import
# The following import makes sure the parsers are registered.
from plaso import parsers # pylint: disable=unused-import
from plaso.cli import logger
from plaso.cli import storage_media_tool
from plaso.cli import tool_options
from plaso.cli.helpers import manager as helpers_manager
from plaso.engine import configurations
from plaso.engine import engine
from plaso.lib import definitions
from plaso.lib import errors
from plaso.parsers import manager as parsers_manager
[docs]class ExtractionTool(
storage_media_tool.StorageMediaTool,
tool_options.HashersOptions,
tool_options.ParsersOptions,
tool_options.ProfilingOptions,
tool_options.StorageFileOptions):
"""Extraction CLI tool."""
# Approximately 250 MB of queued items per worker.
_DEFAULT_QUEUE_SIZE = 125000
_BYTES_IN_A_MIB = 1024 * 1024
def __init__(self, input_reader=None, output_writer=None):
  """Initializes the extraction CLI tool.

  Args:
    input_reader (Optional[InputReader]): input reader, where None indicates
        that the stdin input reader should be used.
    output_writer (Optional[OutputWriter]): output writer, where None
        indicates that the stdout output writer should be used.
  """
  super(ExtractionTool, self).__init__(
      input_reader=input_reader, output_writer=output_writer)

  # Source related state.
  self._artifacts_registry = None
  self._mount_path = None
  self._operating_system = None
  self._resolver_context = dfvfs_context.Context()

  # Extraction behavior.
  self._preferred_year = None
  self._process_archives = False
  self._process_compressed_streams = True
  self._text_prepend = None
  self._yara_rules_string = None

  # Performance and processing settings.
  self._buffer_size = 0
  self._process_memory_limit = None
  self._queue_size = self._DEFAULT_QUEUE_SIZE
  self._single_process_mode = False
  self._use_zeromq = True

  # Storage and temporary file locations.
  self._storage_file_path = None
  self._storage_format = definitions.STORAGE_FORMAT_SQLITE
  self._temporary_directory = None
def _CreateProcessingConfiguration(self, knowledge_base):
  """Builds the processing configuration from the tool settings.

  When no parser filter expression was set on the tool, the operating system
  values stored in the knowledge base are used to look up a preset parser
  filter expression.

  Args:
    knowledge_base (KnowledgeBase): contains information from the source
        data needed for parsing.

  Returns:
    ProcessingConfiguration: processing configuration.
  """
  # TODO: pass preferred_encoding.
  configuration = configurations.ProcessingConfiguration()

  # General settings.
  configuration.artifact_filters = self._artifact_filters
  configuration.credentials = self._credential_configurations
  configuration.debug_output = self._debug_mode
  configuration.filter_file = self._filter_file
  configuration.log_filename = self._log_file
  configuration.parser_filter_expression = self._parser_filter_expression
  configuration.preferred_year = self._preferred_year
  configuration.temporary_directory = self._temporary_directory

  # Event extraction and extraction settings.
  configuration.event_extraction.text_prepend = self._text_prepend
  configuration.extraction.hasher_file_size_limit = (
      self._hasher_file_size_limit)
  configuration.extraction.hasher_names_string = self._hasher_names_string
  configuration.extraction.process_archives = self._process_archives
  configuration.extraction.process_compressed_streams = (
      self._process_compressed_streams)
  configuration.extraction.yara_rules_string = self._yara_rules_string

  # Input source settings.
  configuration.input_source.mount_path = self._mount_path

  # Profiling settings.
  configuration.profiling.directory = self._profiling_directory
  configuration.profiling.sample_rate = self._profiling_sample_rate
  configuration.profiling.profilers = self._profilers

  # An explicitly set parser filter expression is never overridden.
  if configuration.parser_filter_expression:
    return configuration

  value_names = (
      'operating_system', 'operating_system_product',
      'operating_system_version')
  operating_system_values = [
      knowledge_base.GetValue(value_name) for value_name in value_names]

  preset_expression = (
      parsers_manager.ParsersManager.GetPresetForOperatingSystem(
          *operating_system_values))

  if preset_expression:
    logger.info('Parser filter expression changed to: {0:s}'.format(
        preset_expression))
    configuration.parser_filter_expression = preset_expression

  return configuration
def _ParsePerformanceOptions(self, options):
"""Parses the performance options.
Args:
options (argparse.Namespace): command line arguments.
Raises:
BadConfigOption: if the options are invalid.
"""
self._buffer_size = getattr(options, 'buffer_size', 0)
if self._buffer_size:
# TODO: turn this into a generic function that supports more size
# suffixes both MB and MiB and also that does not allow m as a valid
# indicator for MiB since m represents milli not Mega.
try:
if self._buffer_size[-1].lower() == 'm':
self._buffer_size = int(self._buffer_size[:-1], 10)
self._buffer_size *= self._BYTES_IN_A_MIB
else:
self._buffer_size = int(self._buffer_size, 10)
except ValueError:
raise errors.BadConfigOption(
'Invalid buffer size: {0:s}.'.format(self._buffer_size))
self._queue_size = self.ParseNumericOption(options, 'queue_size')
def _ParseProcessingOptions(self, options):
  """Parses the processing options.

  Reads the single process flag and hands the remaining processing options
  to their argument helpers.

  Args:
    options (argparse.Namespace): command line arguments.

  Raises:
    BadConfigOption: if the options are invalid.
  """
  self._single_process_mode = getattr(options, 'single_process', False)

  helpers_manager.ArgumentHelperManager.ParseOptions(
      options, self, names=[
          'process_resources', 'temporary_directory', 'workers', 'zeromq'])
def _PreprocessSources(self, extraction_engine):
  """Preprocesses the sources.

  An IOError raised while building the artifacts registry or while
  preprocessing is logged as an error and not re-raised.

  Args:
    extraction_engine (BaseEngine): extraction engine to preprocess
        the sources.
  """
  logger.debug('Starting preprocessing.')

  try:
    registry = engine.BaseEngine.BuildArtifactsRegistry(
        self._artifact_definitions_path, self._custom_artifacts_path)

    extraction_engine.PreprocessSources(
        registry, self._source_path_specs,
        resolver_context=self._resolver_context)

  except IOError as exception:
    error_message = 'Unable to preprocess with error: {0!s}'.format(
        exception)
    logger.error(error_message)

  logger.debug('Preprocessing done.')
def _SetExtractionParsersAndPlugins(self, configuration, session):
  """Sets the parsers and plugins before extraction.

  Stores the names of the parsers and plugins selected by the parser filter
  expression, and the expression itself, in the session.

  Args:
    configuration (ProcessingConfiguration): processing configuration.
    session (Session): session.
  """
  filter_expression = configuration.parser_filter_expression

  session.enabled_parser_names = list(
      parsers_manager.ParsersManager.GetParserAndPluginNames(
          parser_filter_expression=filter_expression))
  session.parser_filter_expression = filter_expression
def _SetExtractionPreferredTimeZone(self, knowledge_base):
"""Sets the preferred time zone before extraction.
Args:
knowledge_base (KnowledgeBase): contains information from the source
data needed for parsing.
"""
# Note session.preferred_time_zone will default to UTC but
# self._preferred_time_zone is None when not set.
if self._preferred_time_zone:
try:
knowledge_base.SetTimeZone(self._preferred_time_zone)
except ValueError:
# pylint: disable=protected-access
logger.warning(
'Unsupported time zone: {0:s}, defaulting to {1:s}'.format(
self._preferred_time_zone, knowledge_base._time_zone.zone))
def AddPerformanceOptions(self, argument_group):
  """Adds the performance options to the argument group.

  Registers the buffer size and the queue size options, both of which
  default to 0 when not specified on the command line.

  Args:
    argument_group (argparse._ArgumentGroup): argparse argument group.
  """
  buffer_size_help = 'The buffer size for the output (defaults to 196MiB).'
  argument_group.add_argument(
      '--buffer_size', '--buffer-size', '--bs', action='store', default=0,
      dest='buffer_size', help=buffer_size_help)

  queue_size_help = (
      'The maximum number of queued items per worker '
      '(defaults to {0:d})').format(self._DEFAULT_QUEUE_SIZE)
  argument_group.add_argument(
      '--queue_size', '--queue-size', action='store', default=0,
      dest='queue_size', help=queue_size_help)
def AddProcessingOptions(self, argument_group):
  """Adds the processing options to the argument group.

  The process resources options are only added when the tool can enforce
  a process memory limit.

  Args:
    argument_group (argparse._ArgumentGroup): argparse argument group.
  """
  single_process_help = (
      'Indicate that the tool should run in a single process.')
  argument_group.add_argument(
      '--single_process', '--single-process', action='store_true',
      default=False, dest='single_process', help=single_process_help)

  helper_names = ['temporary_directory', 'workers', 'zeromq']
  if self._CanEnforceProcessMemoryLimit():
    helper_names.append('process_resources')

  helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
      argument_group, names=helper_names)