# Source code for plaso.cli.log2timeline_tool
# -*- coding: utf-8 -*-
"""The log2timeline CLI tool."""
from __future__ import unicode_literals
import argparse
import sys
import time
import textwrap
from dfvfs.lib import definitions as dfvfs_definitions
import plaso
# The following import makes sure the output modules are registered.
from plaso import output # pylint: disable=unused-import
from plaso.analyzers.hashers import manager as hashers_manager
from plaso.cli import extraction_tool
from plaso.cli import logger
from plaso.cli import status_view
from plaso.cli import tools
from plaso.cli import views
from plaso.cli.helpers import manager as helpers_manager
from plaso.engine import engine
from plaso.engine import single_process as single_process_engine
from plaso.lib import definitions
from plaso.lib import errors
from plaso.lib import loggers
from plaso.multi_processing import task_engine as multi_process_engine
from plaso.parsers import manager as parsers_manager
from plaso.storage import factory as storage_factory
class Log2TimelineTool(extraction_tool.ExtractionTool):
  """Log2timeline CLI tool.

  Attributes:
    dependencies_check (bool): True if the availability and versions of
        dependencies should be checked.
    list_hashers (bool): True if the hashers should be listed.
    list_parsers_and_plugins (bool): True if the parsers and plugins should
        be listed.
    list_profilers (bool): True if the profilers should be listed.
    show_info (bool): True if information about hashers, parsers, plugins,
        etc. should be shown.
  """

  NAME = 'log2timeline'

  # Shown at the top of the argparse help output.
  DESCRIPTION = textwrap.dedent('\n'.join([
      '',
      ('log2timeline is a command line tool to extract events from '
       'individual '),
      'files, recursing a directory (e.g. mount point) or storage media ',
      'image or device.',
      '',
      'More information can be gathered from here:',
      ' https://github.com/log2timeline/plaso/wiki/Using-log2timeline',
      '']))

  # Shown at the bottom of the argparse help output.
  EPILOG = textwrap.dedent('\n'.join([
      '',
      'Example usage:',
      '',
      'Run the tool against a storage media image (full kitchen sink)',
      ' log2timeline.py /cases/mycase/storage.plaso ímynd.dd',
      '',
      'Instead of answering questions, indicate some of the options on the',
      'command line (including data from particular VSS stores).',
      (' log2timeline.py -o 63 --vss_stores 1,2 /cases/plaso_vss.plaso '
       'image.E01'),
      '',
      'And that is how you build a timeline using log2timeline...',
      '']))

  # The window status-view mode has an annoying flicker on Windows,
  # hence we default to linear status-view mode instead.
  if sys.platform.startswith('win'):
    _DEFAULT_STATUS_VIEW_MODE = status_view.StatusView.MODE_LINEAR
  else:
    _DEFAULT_STATUS_VIEW_MODE = status_view.StatusView.MODE_WINDOW

  # Source types for which ExtractEventsFromSources runs pre-processing.
  _SOURCE_TYPES_TO_PREPROCESS = frozenset([
      dfvfs_definitions.SOURCE_TYPE_DIRECTORY,
      dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_DEVICE,
      dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE])

  def __init__(self, input_reader=None, output_writer=None):
    """Initializes a log2timeline CLI tool.

    Args:
      input_reader (Optional[InputReader]): input reader, where None indicates
          that the stdin input reader should be used.
      output_writer (Optional[OutputWriter]): output writer, where None
          indicates that the stdout output writer should be used.
    """
    super(Log2TimelineTool, self).__init__(
        input_reader=input_reader, output_writer=output_writer)
    self._command_line_arguments = None
    self._enable_sigsegv_handler = False
    self._number_of_extraction_workers = 0
    self._storage_serializer_format = definitions.SERIALIZER_FORMAT_JSON
    self._source_type = None
    self._status_view = status_view.StatusView(self._output_writer, self.NAME)
    self._status_view_mode = self._DEFAULT_STATUS_VIEW_MODE
    self._stdout_output_writer = isinstance(
        self._output_writer, tools.StdoutOutputWriter)
    self._worker_memory_limit = None

    self.dependencies_check = True
    self.list_hashers = False
    self.list_parsers_and_plugins = False
    self.list_profilers = False
    self.show_info = False

  def _GetPluginData(self):
    """Retrieves the version and various plugin information.

    Returns:
      dict[str, list[tuple[str, str]]]: section title mapped to the entries
          of that section; ShowInfo unpacks every entry as a
          (name, description) pair.
    """
    return_dict = {}

    return_dict['Versions'] = [
        ('plaso engine', plaso.__version__),
        ('python', sys.version)]

    hashers_information = hashers_manager.HashersManager.GetHashersInformation()
    parsers_information = parsers_manager.ParsersManager.GetParsersInformation()
    plugins_information = (
        parsers_manager.ParsersManager.GetParserPluginsInformation())
    presets_information = self._GetParserPresetsInformation()

    return_dict['Hashers'] = hashers_information
    return_dict['Parsers'] = parsers_information
    return_dict['Parser Plugins'] = plugins_information
    return_dict['Parser Presets'] = presets_information

    return return_dict

  def ParseArguments(self):
    """Parses the command line arguments.

    Builds the argument parser, parses the command line and hands the result
    to ParseOptions. Logging is configured once with defaults before parsing
    and once more afterwards with the parsed debug, log file and quiet
    settings.

    Returns:
      bool: True if the arguments were successfully parsed, False if the
          help could not be printed due to a UnicodeEncodeError or if
          ParseOptions raised BadConfigOption.
    """
    loggers.ConfigureLogging()

    argument_parser = argparse.ArgumentParser(
        description=self.DESCRIPTION, epilog=self.EPILOG, add_help=False,
        formatter_class=argparse.RawDescriptionHelpFormatter)

    self.AddBasicOptions(argument_parser)

    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        argument_parser, names=['storage_file'])

    data_location_group = argument_parser.add_argument_group(
        'data location arguments')

    argument_helper_names = ['artifact_definitions', 'data_location']
    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        data_location_group, names=argument_helper_names)

    extraction_group = argument_parser.add_argument_group(
        'extraction arguments')

    argument_helper_names = [
        'artifact_filters', 'extraction', 'filter_file', 'hashers',
        'parsers', 'yara_rules']
    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        extraction_group, names=argument_helper_names)

    self.AddStorageMediaImageOptions(extraction_group)
    self.AddTimeZoneOption(extraction_group)
    self.AddVSSProcessingOptions(extraction_group)
    self.AddCredentialOptions(extraction_group)

    info_group = argument_parser.add_argument_group('informational arguments')

    self.AddInformationalOptions(info_group)

    info_group.add_argument(
        '--info', dest='show_info', action='store_true', default=False,
        help='Print out information about supported plugins and parsers.')

    info_group.add_argument(
        '--use_markdown', '--use-markdown', dest='use_markdown',
        action='store_true', default=False, help=(
            'Output lists in Markdown format use in combination with '
            '"--hashers list", "--parsers list" or "--timezone list"'))

    info_group.add_argument(
        '--no_dependencies_check', '--no-dependencies-check',
        dest='dependencies_check', action='store_false', default=True,
        help='Disable the dependencies check.')

    self.AddLogFileOptions(info_group)

    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        info_group, names=['status_view'])

    output_group = argument_parser.add_argument_group('output arguments')

    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        output_group, names=['text_prepend'])

    processing_group = argument_parser.add_argument_group(
        'processing arguments')

    self.AddPerformanceOptions(processing_group)
    self.AddProcessingOptions(processing_group)

    processing_group.add_argument(
        '--sigsegv_handler', '--sigsegv-handler', dest='sigsegv_handler',
        action='store_true', default=False, help=(
            'Enables the SIGSEGV handler. WARNING this functionality is '
            'experimental and will deadlock a worker process if a real '
            'segfault is caught, but not signal SIGSEGV. This functionality '
            'is therefore primarily intended for debugging purposes'))

    profiling_group = argument_parser.add_argument_group('profiling arguments')

    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        profiling_group, names=['profiling'])

    storage_group = argument_parser.add_argument_group('storage arguments')

    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        storage_group, names=['storage_format'])

    # The source is optional (nargs='?') so that the list and info options
    # can be used without specifying one.
    argument_parser.add_argument(
        self._SOURCE_OPTION, action='store', metavar='SOURCE', nargs='?',
        default=None, type=str, help=(
            'Path to a source device, file or directory. If the source is '
            'a supported storage media device or image file, archive file '
            'or a directory, the files within are processed recursively.'))

    try:
      options = argument_parser.parse_args()
    except UnicodeEncodeError:
      # If we get here we are attempting to print help in a non-Unicode
      # terminal.
      self._output_writer.Write('\n')
      self._output_writer.Write(argument_parser.format_help())
      return False

    # Properly prepare the attributes according to local encoding.
    if self.preferred_encoding == 'ascii':
      logger.warning(
          'The preferred encoding of your system is ASCII, which is not '
          'optimal for the typically non-ASCII characters that need to be '
          'parsed and processed. The tool will most likely crash and die, '
          'perhaps in a way that may not be recoverable. A five second delay '
          'is introduced to give you time to cancel the runtime and '
          'reconfigure your preferred encoding, otherwise continue at own '
          'risk.')
      time.sleep(5)

    # NOTE(review): this check runs before ParseOptions() below. If
    # _process_archives is only populated from the parsed options, this
    # warning can never trigger at this point - confirm where the attribute
    # is set before relying on it.
    if self._process_archives:
      logger.warning(
          'Scanning archive files currently can cause deadlock. Continue at '
          'your own risk.')
      time.sleep(5)

    try:
      self.ParseOptions(options)
    except errors.BadConfigOption as exception:
      self._output_writer.Write('ERROR: {0!s}\n'.format(exception))
      self._output_writer.Write('\n')
      self._output_writer.Write(argument_parser.format_usage())
      return False

    self._command_line_arguments = self.GetCommandLineArguments()

    # Reconfigure logging now that the debug, log file and quiet options
    # have been parsed.
    loggers.ConfigureLogging(
        debug_output=self._debug_mode, filename=self._log_file,
        quiet_mode=self._quiet_mode)

    return True

  def ParseOptions(self, options):
    """Parses the options.

    When one of the list options or the info option is set, this method
    returns right after determining that, without parsing or validating the
    remaining options.

    Args:
      options (argparse.Namespace): command line arguments.

    Raises:
      BadConfigOption: if the options are invalid.
    """
    # The extraction options are dependent on the data location.
    helpers_manager.ArgumentHelperManager.ParseOptions(
        options, self, names=['data_location'])

    # Check the list options first otherwise required options will raise.
    argument_helper_names = ['hashers', 'parsers', 'profiling']
    helpers_manager.ArgumentHelperManager.ParseOptions(
        options, self, names=argument_helper_names)

    self._ParseTimezoneOption(options)

    self.list_hashers = self._hasher_names_string == 'list'
    self.list_parsers_and_plugins = self._parser_filter_expression == 'list'
    self.list_profilers = self._profilers == 'list'

    self.show_info = getattr(options, 'show_info', False)

    if getattr(options, 'use_markdown', False):
      self._views_format_type = views.ViewsFactory.FORMAT_TYPE_MARKDOWN

    self.dependencies_check = getattr(options, 'dependencies_check', True)

    if (self.list_hashers or self.list_parsers_and_plugins or
        self.list_profilers or self.list_timezones or self.show_info):
      return

    self._ParseInformationalOptions(options)

    # Note that 'status_view' is part of this list, hence it does not need to
    # be parsed separately further down.
    argument_helper_names = [
        'artifact_definitions', 'artifact_filters', 'extraction',
        'filter_file', 'status_view', 'storage_file', 'storage_format',
        'text_prepend', 'yara_rules']
    helpers_manager.ArgumentHelperManager.ParseOptions(
        options, self, names=argument_helper_names)

    self._ParseLogFileOptions(options)

    self._ParseStorageMediaOptions(options)

    self._ParsePerformanceOptions(options)
    self._ParseProcessingOptions(options)

    if not self._storage_file_path:
      raise errors.BadConfigOption('Missing storage file option.')

    serializer_format = getattr(
        options, 'serializer_format', definitions.SERIALIZER_FORMAT_JSON)
    if serializer_format not in definitions.SERIALIZER_FORMATS:
      # Use !s instead of :s so that a non-string value, such as None, is
      # reported as BadConfigOption instead of raising a formatting error.
      raise errors.BadConfigOption(
          'Unsupported storage serializer format: {0!s}.'.format(
              serializer_format))
    self._storage_serializer_format = serializer_format

    # TODO: where is this defined?
    self._operating_system = getattr(options, 'os', None)

    if self._operating_system:
      self._mount_path = getattr(options, 'filename', None)

    self._enable_sigsegv_handler = getattr(options, 'sigsegv_handler', False)

    self._EnforceProcessMemoryLimit(self._process_memory_limit)

  def ExtractEventsFromSources(self):
    """Processes the sources and extracts events.

    Raises:
      BadConfigOption: if the storage file path is invalid or the storage
          format not supported.
      SourceScannerError: if the source scanner could not find a supported
          file system.
      UserAbort: if the user initiated an abort.
    """
    self._CheckStorageFile(self._storage_file_path, warn_about_existing=True)

    scan_context = self.ScanSource(self._source_path)
    self._source_type = scan_context.source_type

    self._status_view.SetMode(self._status_view_mode)
    self._status_view.SetSourceInformation(
        self._source_path, self._source_type,
        artifact_filters=self._artifact_filters,
        filter_file=self._filter_file)

    status_update_callback = (
        self._status_view.GetExtractionStatusUpdateCallback())

    self._output_writer.Write('\n')
    self._status_view.PrintExtractionStatusHeader(None)
    self._output_writer.Write('Processing started.\n')

    session = engine.BaseEngine.CreateSession(
        artifact_filter_names=self._artifact_filters,
        command_line_arguments=self._command_line_arguments,
        debug_mode=self._debug_mode,
        filter_file_path=self._filter_file,
        preferred_encoding=self.preferred_encoding,
        preferred_time_zone=self._preferred_time_zone,
        preferred_year=self._preferred_year)

    storage_writer = storage_factory.StorageFactory.CreateStorageWriter(
        self._storage_format, session, self._storage_file_path)
    if not storage_writer:
      # Use !s instead of :s so that a non-string value, such as None, is
      # reported as BadConfigOption instead of raising a formatting error.
      raise errors.BadConfigOption(
          'Unsupported storage format: {0!s}'.format(self._storage_format))

    single_process_mode = self._single_process_mode
    if self._source_type == dfvfs_definitions.SOURCE_TYPE_FILE:
      # No need to multi process a single file source.
      single_process_mode = True

    if single_process_mode:
      extraction_engine = single_process_engine.SingleProcessEngine()
    else:
      extraction_engine = multi_process_engine.TaskMultiProcessEngine(
          use_zeromq=self._use_zeromq)

    # If the source is a directory or a storage media image
    # run pre-processing.
    if self._source_type in self._SOURCE_TYPES_TO_PREPROCESS:
      self._PreprocessSources(extraction_engine)

    configuration = self._CreateProcessingConfiguration(
        extraction_engine.knowledge_base)

    self._SetExtractionParsersAndPlugins(configuration, session)
    self._SetExtractionPreferredTimeZone(extraction_engine.knowledge_base)

    filter_find_specs = engine.BaseEngine.BuildFilterFindSpecs(
        self._artifact_definitions_path, self._custom_artifacts_path,
        extraction_engine.knowledge_base, self._artifact_filters,
        self._filter_file)

    # The two engines take different ProcessSources arguments, hence the
    # separate calls.
    if single_process_mode:
      logger.debug('Starting extraction in single process mode.')

      processing_status = extraction_engine.ProcessSources(
          self._source_path_specs, storage_writer, self._resolver_context,
          configuration, filter_find_specs=filter_find_specs,
          status_update_callback=status_update_callback)

    else:
      logger.debug('Starting extraction in multi process mode.')

      processing_status = extraction_engine.ProcessSources(
          session.identifier, self._source_path_specs, storage_writer,
          configuration, enable_sigsegv_handler=self._enable_sigsegv_handler,
          filter_find_specs=filter_find_specs,
          number_of_worker_processes=self._number_of_extraction_workers,
          status_update_callback=status_update_callback,
          worker_memory_limit=self._worker_memory_limit)

    self._status_view.PrintExtractionSummary(processing_status)

  def ShowInfo(self):
    """Shows information about available hashers, parsers, plugins, etc."""
    self._output_writer.Write(
        '{0:=^80s}\n'.format(' log2timeline/plaso information '))

    plugin_list = self._GetPluginData()
    # One table per section, with the entries of a section sorted.
    for header, data in plugin_list.items():
      table_view = views.ViewsFactory.GetTableView(
          self._views_format_type, column_names=['Name', 'Description'],
          title=header)
      for entry_header, entry_data in sorted(data):
        table_view.AddRow([entry_header, entry_data])
      table_view.Write(self._output_writer)