# Source code for plaso.cli.psteal_tool

# -*- coding: utf-8 -*-
"""The psteal CLI tool."""

from __future__ import unicode_literals

import argparse
import collections
import datetime
import os
import sys
import textwrap

from dfvfs.lib import definitions as dfvfs_definitions

# The following import makes sure the output modules are registered.
from plaso import output  # pylint: disable=unused-import

from plaso.cli import extraction_tool
from plaso.cli import logger
from plaso.cli import status_view
from plaso.cli import tool_options
from plaso.cli import views
from plaso.cli.helpers import manager as helpers_manager
from plaso.engine import engine
from plaso.engine import knowledge_base
from plaso.engine import single_process as single_process_engine
from plaso.lib import errors
from plaso.lib import loggers
from plaso.multi_processing import psort
from plaso.multi_processing import task_engine as multi_process_engine
from plaso.parsers import manager as parsers_manager
from plaso.storage import factory as storage_factory


class PstealTool(
    extraction_tool.ExtractionTool,
    tool_options.HashersOptions,
    tool_options.OutputModuleOptions,
    tool_options.ParsersOptions,
    tool_options.StorageFileOptions):
  """Psteal CLI tool.

  Psteal extracts events from the provided source and stores them in an
  intermediate storage file. After extraction an output log file is created.
  This mimics the behaviour of the log2timeline.pl.

  The tool currently doesn't support any of the log2timeline or psort tools'
  flags.

  Attributes:
    dependencies_check (bool): True if the availability and versions of
        dependencies should be checked.
    list_hashers (bool): True if the hashers should be listed.
    list_language_identifiers (bool): True if information about the language
        identifiers should be shown.
    list_output_modules (bool): True if information about the output modules
        should be shown.
    list_parsers_and_plugins (bool): True if the parsers and plugins should
        be listed.
    list_timezones (bool): True if the time zones should be listed.
  """

  NAME = 'psteal'

  # TODO: is textwrap.dedent or the join really needed here?
  DESCRIPTION = textwrap.dedent('\n'.join([
      '',
      ('psteal is a command line tool to extract events from individual '),
      'files, recursing a directory (e.g. mount point) or storage media ',
      'image or device. The output events will be stored in a storage file.',
      'This tool will then read the output and process the events into a CSV ',
      'file.',
      '',
      'More information can be gathered from here:',
      ' https://github.com/log2timeline/plaso/wiki/Using-log2timeline',
      '']))

  EPILOG = textwrap.dedent('\n'.join([
      '',
      'Example usage:',
      '',
      'Run the tool against a storage media image (full kitchen sink)',
      ' psteal.py --source ímynd.dd -w imynd.timeline.txt',
      '',
      'And that is how you build a timeline using psteal...',
      '']))

  # The window status-view mode has an annoying flicker on Windows,
  # hence we default to linear status-view mode instead.
  if sys.platform.startswith('win'):
    _DEFAULT_STATUS_VIEW_MODE = status_view.StatusView.MODE_LINEAR
  else:
    _DEFAULT_STATUS_VIEW_MODE = status_view.StatusView.MODE_WINDOW

  # Source types for which pre-processing is run before extraction.
  _SOURCE_TYPES_TO_PREPROCESS = frozenset([
      dfvfs_definitions.SOURCE_TYPE_DIRECTORY,
      dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_DEVICE,
      dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE])

  def __init__(self, input_reader=None, output_writer=None):
    """Initializes the CLI tool object.

    Args:
      input_reader (Optional[InputReader]): input reader, where None indicates
          that the stdin input reader should be used.
      output_writer (Optional[OutputWriter]): output writer, where None
          indicates that the stdout output writer should be used.
    """
    super(PstealTool, self).__init__(
        input_reader=input_reader, output_writer=output_writer)
    self._artifacts_registry = None
    self._command_line_arguments = None
    self._deduplicate_events = True
    self._enable_sigsegv_handler = False
    self._knowledge_base = knowledge_base.KnowledgeBase()
    self._number_of_analysis_reports = 0
    self._number_of_extraction_workers = 0
    self._output_format = None
    self._parsers_manager = parsers_manager.ParsersManager
    self._preferred_language = 'en-US'
    self._preferred_year = None
    self._status_view_mode = self._DEFAULT_STATUS_VIEW_MODE
    self._status_view = status_view.StatusView(self._output_writer, self.NAME)
    self._time_slice = None
    self._use_time_slicer = False

    self.list_hashers = False
    self.list_language_identifiers = False
    self.list_output_modules = False
    self.list_parsers_and_plugins = False
    self.list_timezones = False

  def _GenerateStorageFileName(self):
    """Generates a name for the storage file.

    The result uses a timestamp (local time, second resolution) and the
    basename of the source path.

    Returns:
      str: a filename for the storage file in the form <time>-<source>.plaso

    Raises:
      BadConfigOption: raised if the source path is not set.
    """
    if not self._source_path:
      raise errors.BadConfigOption('Please define a source (--source).')

    timestamp = datetime.datetime.now()
    datetime_string = timestamp.strftime('%Y%m%dT%H%M%S')

    source_path = os.path.abspath(self._source_path)

    if source_path.endswith(os.path.sep):
      source_path = os.path.dirname(source_path)

    source_name = os.path.basename(source_path)

    if not source_name or source_name in ('/', '\\'):
      # The user passed the filesystem's root as source
      source_name = 'ROOT'

    return '{0:s}-{1:s}.plaso'.format(datetime_string, source_name)

  def _PrintAnalysisReportsDetails(
      self, storage_reader, number_of_analysis_reports):
    """Prints the details of the analysis reports.

    Only reports positioned after the first number_of_analysis_reports
    reports in the storage are printed; the earlier ones are skipped.

    Args:
      storage_reader (StorageReader): storage reader.
      number_of_analysis_reports (int): number of analysis reports to skip
          before printing starts.
    """
    for index, analysis_report in enumerate(
        storage_reader.GetAnalysisReports()):
      if index + 1 <= number_of_analysis_reports:
        continue

      title = 'Analysis report: {0:d}'.format(index)
      table_view = views.ViewsFactory.GetTableView(
          self._views_format_type, title=title)

      table_view.AddRow(['String', analysis_report.GetString()])

      table_view.Write(self._output_writer)

  def AnalyzeEvents(self):
    """Analyzes events from a plaso storage file and generate a report.

    If no storage reader can be created for the storage file an error is
    logged and the method returns without exporting anything.

    Raises:
      BadConfigOption: when a configuration parameter fails validation.
      RuntimeError: if a non-recoverable situation is encountered.
    """
    session = engine.BaseEngine.CreateSession(
        command_line_arguments=self._command_line_arguments,
        preferred_encoding=self.preferred_encoding)

    storage_reader = storage_factory.StorageFactory.CreateStorageReaderForFile(
        self._storage_file_path)
    if not storage_reader:
      logger.error('Format of storage file: {0:s} not supported'.format(
          self._storage_file_path))
      return

    # Remember how many reports already exist so that only reports beyond
    # that count are printed at the end.
    self._number_of_analysis_reports = (
        storage_reader.GetNumberOfAnalysisReports())
    storage_reader.Close()

    configuration = self._CreateProcessingConfiguration(
        self._knowledge_base)

    counter = collections.Counter()
    if self._output_format != 'null':
      self._status_view.SetMode(self._status_view_mode)
      self._status_view.SetStorageFileInformation(self._storage_file_path)

      status_update_callback = (
          self._status_view.GetAnalysisStatusUpdateCallback())

      # NOTE(review): this reader is handed to the export engine and is not
      # closed here; confirm whether ExportEvents takes care of closing it.
      storage_reader = (
          storage_factory.StorageFactory.CreateStorageReaderForFile(
              self._storage_file_path))

      # TODO: add single processing support.
      analysis_engine = psort.PsortMultiProcessEngine(
          use_zeromq=self._use_zeromq)

      events_counter = analysis_engine.ExportEvents(
          self._knowledge_base, storage_reader, self._output_module,
          configuration, deduplicate_events=self._deduplicate_events,
          status_update_callback=status_update_callback,
          time_slice=self._time_slice, use_time_slicer=self._use_time_slicer)

      counter += events_counter

    # Assignment (not addition) is intentional: the session values replace
    # any counter entries with the same key.
    for item, value in session.analysis_reports_counter.items():
      counter[item] = value

    if self._quiet_mode:
      return

    self._output_writer.Write('Processing completed.\n')

    table_view = views.ViewsFactory.GetTableView(
        self._views_format_type, title='Counter')
    for element, count in counter.most_common():
      if not element:
        element = 'N/A'
      table_view.AddRow([element, count])
    table_view.Write(self._output_writer)

    storage_reader = storage_factory.StorageFactory.CreateStorageReaderForFile(
        self._storage_file_path)
    try:
      self._PrintAnalysisReportsDetails(
          storage_reader, self._number_of_analysis_reports)
    finally:
      # Release the reader even if printing the reports fails.
      storage_reader.Close()

    self._output_writer.Write('Storage file is {0:s}\n'.format(
        self._storage_file_path))

  def ExtractEventsFromSources(self):
    """Processes the sources and extract events.

    This is a stripped down copy of tools/log2timeline.py that doesn't support
    the full set of flags. The defaults for these are hard coded in the
    constructor of this class.

    Raises:
      BadConfigOption: if the storage format is not supported.
      SourceScannerError: if the source scanner could not find a supported
          file system.
      UserAbort: if the user initiated an abort.
    """
    self._CheckStorageFile(self._storage_file_path, warn_about_existing=True)

    scan_context = self.ScanSource(self._source_path)
    source_type = scan_context.source_type

    self._status_view.SetMode(self._status_view_mode)
    self._status_view.SetSourceInformation(
        self._source_path, source_type,
        artifact_filters=self._artifact_filters,
        filter_file=self._filter_file)

    status_update_callback = (
        self._status_view.GetExtractionStatusUpdateCallback())

    self._output_writer.Write('\n')
    self._status_view.PrintExtractionStatusHeader(None)
    self._output_writer.Write('Processing started.\n')

    session = engine.BaseEngine.CreateSession(
        artifact_filter_names=self._artifact_filters,
        command_line_arguments=self._command_line_arguments,
        filter_file_path=self._filter_file,
        preferred_encoding=self.preferred_encoding,
        preferred_time_zone=self._preferred_time_zone,
        preferred_year=self._preferred_year)

    storage_writer = storage_factory.StorageFactory.CreateStorageWriter(
        self._storage_format, session, self._storage_file_path)
    if not storage_writer:
      raise errors.BadConfigOption(
          'Unsupported storage format: {0:s}'.format(self._storage_format))

    single_process_mode = self._single_process_mode
    if source_type == dfvfs_definitions.SOURCE_TYPE_FILE:
      # No need to multi process a single file source.
      single_process_mode = True

    if single_process_mode:
      extraction_engine = single_process_engine.SingleProcessEngine()
    else:
      extraction_engine = multi_process_engine.TaskMultiProcessEngine(
          use_zeromq=self._use_zeromq)

    # If the source is a directory or a storage media image
    # run pre-processing.
    if source_type in self._SOURCE_TYPES_TO_PREPROCESS:
      self._PreprocessSources(extraction_engine)

    configuration = self._CreateProcessingConfiguration(
        extraction_engine.knowledge_base)

    self._SetExtractionParsersAndPlugins(configuration, session)
    self._SetExtractionPreferredTimeZone(extraction_engine.knowledge_base)

    filter_find_specs = engine.BaseEngine.BuildFilterFindSpecs(
        self._artifact_definitions_path, self._custom_artifacts_path,
        extraction_engine.knowledge_base, self._artifact_filters,
        self._filter_file)

    # The two engines take different ProcessSources arguments, hence the
    # separate calls per mode.
    processing_status = None
    if single_process_mode:
      logger.debug('Starting extraction in single process mode.')

      processing_status = extraction_engine.ProcessSources(
          self._source_path_specs, storage_writer, self._resolver_context,
          configuration, filter_find_specs=filter_find_specs,
          status_update_callback=status_update_callback)

    else:
      logger.debug('Starting extraction in multi process mode.')

      processing_status = extraction_engine.ProcessSources(
          session.identifier, self._source_path_specs, storage_writer,
          configuration,
          enable_sigsegv_handler=self._enable_sigsegv_handler,
          filter_find_specs=filter_find_specs,
          number_of_worker_processes=self._number_of_extraction_workers,
          status_update_callback=status_update_callback)

    self._status_view.PrintExtractionSummary(processing_status)

  def ParseArguments(self):
    """Parses the command line arguments.

    On failure the help or usage text is written to the output writer.

    Returns:
      bool: True if the arguments were successfully parsed.
    """
    loggers.ConfigureLogging()

    argument_parser = argparse.ArgumentParser(
        description=self.DESCRIPTION, epilog=self.EPILOG, add_help=False,
        formatter_class=argparse.RawDescriptionHelpFormatter)

    self.AddBasicOptions(argument_parser)

    extraction_group = argument_parser.add_argument_group(
        'extraction arguments')

    argument_helper_names = ['extraction']
    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        extraction_group, names=argument_helper_names)

    extraction_group.add_argument(
        '--storage_file', '--storage-file', metavar='PATH', type=str,
        default=None, help=(
            'The path of the storage file. If not specified, one will be made '
            'in the form <timestamp>-<source>.plaso'))

    self.AddStorageMediaImageOptions(extraction_group)
    self.AddCredentialOptions(extraction_group)

    info_group = argument_parser.add_argument_group('informational arguments')

    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        info_group, names=['status_view'])

    input_group = argument_parser.add_argument_group('input arguments')
    input_group.add_argument(
        '--source', dest='source', action='store',
        type=str, help='The source to process')

    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        input_group, names=['data_location'])

    output_group = argument_parser.add_argument_group('output arguments')

    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        output_group, names=['language'])

    self.AddTimeZoneOption(output_group)

    output_format_group = argument_parser.add_argument_group(
        'output format arguments')

    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        output_format_group, names=['output_modules'])

    processing_group = argument_parser.add_argument_group(
        'processing arguments')

    self.AddPerformanceOptions(processing_group)
    self.AddProcessingOptions(processing_group)

    try:
      options = argument_parser.parse_args()
    except UnicodeEncodeError:
      # If we get here we are attempting to print help in a non-Unicode
      # terminal.
      self._output_writer.Write('\n')
      self._output_writer.Write(argument_parser.format_help())
      return False

    try:
      self.ParseOptions(options)
    except errors.BadConfigOption as exception:
      self._output_writer.Write('ERROR: {0!s}\n'.format(exception))
      self._output_writer.Write('\n')
      self._output_writer.Write(argument_parser.format_usage())
      return False

    # Reconfigure logging now that the debug, log file and quiet options
    # have been parsed.
    loggers.ConfigureLogging(
        debug_output=self._debug_mode, filename=self._log_file,
        quiet_mode=self._quiet_mode)

    return True

  def ParseOptions(self, options):
    """Parses tool specific options.

    Returns early, before the required options are validated, when one of
    the list options (hashers, language identifiers, parsers and plugins,
    time zones or output modules) was requested.

    Args:
      options (argparse.Namespace): command line arguments.

    Raises:
      BadConfigOption: if the options are invalid.
    """
    # The extraction options are dependent on the data location.
    helpers_manager.ArgumentHelperManager.ParseOptions(
        options, self, names=['data_location'])

    # The output modules options are dependent on the preferred language
    # and preferred time zone options.
    self._ParseTimezoneOption(options)

    argument_helper_names = [
        'artifact_definitions', 'hashers', 'language', 'parsers']
    helpers_manager.ArgumentHelperManager.ParseOptions(
        options, self, names=argument_helper_names)

    self.list_hashers = self._hasher_names_string == 'list'
    self.list_language_identifiers = self._preferred_language == 'list'
    self.list_parsers_and_plugins = self._parser_filter_expression == 'list'

    # Check the list options first otherwise required options will raise.
    if (self.list_hashers or self.list_language_identifiers or
        self.list_parsers_and_plugins or self.list_timezones):
      return

    # Check output modules after the other listable options, otherwise
    # it could raise with "requires an output file".
    helpers_manager.ArgumentHelperManager.ParseOptions(
        options, self, names=['output_modules'])

    self.list_output_modules = self._output_format == 'list'
    if self.list_output_modules:
      return

    self._ParseInformationalOptions(options)

    argument_helper_names = ['extraction', 'status_view']
    helpers_manager.ArgumentHelperManager.ParseOptions(
        options, self, names=argument_helper_names)

    self._ParseLogFileOptions(options)

    self._ParseStorageMediaOptions(options)

    self._ParsePerformanceOptions(options)
    self._ParseProcessingOptions(options)

    # Fall back to a generated <time>-<source>.plaso name when no storage
    # file was given.
    self._storage_file_path = getattr(options, 'storage_file', None)
    if not self._storage_file_path:
      self._storage_file_path = self._GenerateStorageFileName()

    self._output_filename = getattr(options, 'write', None)

    if not self._output_filename:
      raise errors.BadConfigOption((
          'Output format: {0:s} requires an output file '
          '(-w OUTPUT_FILE)').format(self._output_format))

    # Refuse to overwrite an existing output file.
    if os.path.exists(self._output_filename):
      raise errors.BadConfigOption(
          'Output file already exists: {0:s}.'.format(self._output_filename))

    self._EnforceProcessMemoryLimit(self._process_memory_limit)

    self._output_module = self._CreateOutputModule(options)