# Source code for plaso.cli.psort_tool

# -*- coding: utf-8 -*-
"""The psort CLI tool."""

from __future__ import unicode_literals

import argparse
import collections
import os
import sys
import time

# The following import makes sure the filters are registered.
from plaso import filters  # pylint: disable=unused-import

# The following import makes sure the formatters are registered.
from plaso import formatters  # pylint: disable=unused-import

# The following import makes sure the output modules are registered.
from plaso import output   # pylint: disable=unused-import

from plaso.analysis import manager as analysis_manager
from plaso.cli import logger
from plaso.cli import status_view
from plaso.cli import time_slices
from plaso.cli import tool_options
from plaso.cli import tools
from plaso.cli import views
from plaso.cli.helpers import manager as helpers_manager
from plaso.engine import configurations
from plaso.engine import engine
from plaso.engine import knowledge_base
from plaso.filters import manager as filters_manager
from plaso.lib import errors
from plaso.lib import loggers
from plaso.lib import timelib
from plaso.multi_processing import psort
from plaso.storage import factory as storage_factory

import pytz


class PsortTool(
    tools.CLITool, tool_options.AnalysisPluginOptions,
    tool_options.OutputModuleOptions, tool_options.ProfilingOptions,
    tool_options.StorageFileOptions):
  """Psort CLI tool.

  Attributes:
    list_analysis_plugins (bool): True if information about the analysis
        plugins should be shown.
    list_language_identifiers (bool): True if information about the language
        identifiers should be shown.
    list_output_modules (bool): True if information about the output modules
        should be shown.
    list_profilers (bool): True if the profilers should be listed.
  """

  NAME = 'psort'
  DESCRIPTION = (
      'Application to read, filter and process output from a plaso storage '
      'file.')

  # The window status-view mode has an annoying flicker on Windows,
  # hence we default to linear status-view mode instead.
  if sys.platform.startswith('win'):
    _DEFAULT_STATUS_VIEW_MODE = status_view.StatusView.MODE_LINEAR
  else:
    _DEFAULT_STATUS_VIEW_MODE = status_view.StatusView.MODE_WINDOW

  def __init__(self, input_reader=None, output_writer=None):
    """Initializes the CLI tool object.

    Args:
      input_reader (Optional[InputReader]): input reader, where None
          indicates that the stdin input reader should be used.
      output_writer (Optional[OutputWriter]): output writer, where None
          indicates that the stdout output writer should be used.
    """
    super(PsortTool, self).__init__(
        input_reader=input_reader, output_writer=output_writer)
    self._analysis_manager = analysis_manager.AnalysisPluginManager
    self._analysis_plugins = None
    self._analysis_plugins_output_format = None
    self._command_line_arguments = None
    self._deduplicate_events = True
    self._event_filter_expression = None
    self._event_filter = None
    self._knowledge_base = knowledge_base.KnowledgeBase()
    self._number_of_analysis_reports = 0
    self._preferred_language = 'en-US'
    self._process_memory_limit = None
    self._status_view_mode = self._DEFAULT_STATUS_VIEW_MODE
    self._status_view = status_view.StatusView(self._output_writer, self.NAME)
    self._stdout_output_writer = isinstance(
        self._output_writer, tools.StdoutOutputWriter)
    self._storage_file_path = None
    self._temporary_directory = None
    self._time_slice = None
    self._use_time_slicer = False
    self._use_zeromq = True
    self._worker_memory_limit = None
    self.list_analysis_plugins = False
    self.list_language_identifiers = False
    self.list_output_modules = False
    self.list_profilers = False

  # pylint: disable=arguments-differ
  def _CheckStorageFile(self, storage_file_path):
    """Checks if the storage file path is valid.

    Args:
      storage_file_path (str): path of the storage file.

    Raises:
      BadConfigOption: if the storage file path is invalid.
    """
    if os.path.exists(storage_file_path):
      if not os.path.isfile(storage_file_path):
        raise errors.BadConfigOption(
            'Storage file: {0:s} already exists and is not a file.'.format(
                storage_file_path))
      logger.warning('Appending to an already existing storage file.')

    dirname = os.path.dirname(storage_file_path)
    if not dirname:
      dirname = '.'

    # TODO: add a more thorough check to see if the storage file really is
    # a plaso storage file.
    if not os.access(dirname, os.W_OK):
      raise errors.BadConfigOption(
          'Unable to write to storage file: {0:s}'.format(storage_file_path))

  def _GetAnalysisPlugins(self, analysis_plugins_string):
    """Retrieves analysis plugins.

    Args:
      analysis_plugins_string (str): comma separated names of analysis
          plugins to enable.

    Returns:
      list[AnalysisPlugin]: analysis plugins.
    """
    if not analysis_plugins_string:
      return []

    analysis_plugins_list = [
        name.strip() for name in analysis_plugins_string.split(',')]

    analysis_plugins = self._analysis_manager.GetPluginObjects(
        analysis_plugins_list)
    # Return a real list as documented, not a dict view, so callers can
    # index or re-iterate the result.
    return list(analysis_plugins.values())

  def _ParseAnalysisPluginOptions(self, options):
    """Parses the analysis plugin options.

    Args:
      options (argparse.Namespace): command line arguments.

    Raises:
      BadConfigOption: if a non-existent analysis plugin is requested.
    """
    # Get a list of all available plugins.
    analysis_plugin_info = self._analysis_manager.GetAllPluginInformation()
    analysis_plugin_names = set([
        name.lower() for name, _, _ in analysis_plugin_info])

    analysis_plugins = self.ParseStringOption(options, 'analysis_plugins')
    if not analysis_plugins:
      return

    requested_plugin_names = set([
        name.strip().lower() for name in analysis_plugins.split(',')])

    # Check to see if we are trying to load plugins that do not exist.
    difference = requested_plugin_names.difference(analysis_plugin_names)
    if difference:
      raise errors.BadConfigOption(
          'Non-existent analysis plugins specified: {0:s}'.format(
              ' '.join(difference)))

    self._analysis_plugins = self._GetAnalysisPlugins(analysis_plugins)

    for analysis_plugin in self._analysis_plugins:
      helpers_manager.ArgumentHelperManager.ParseOptions(
          options, analysis_plugin)

  def _ParseFilterOptions(self, options):
    """Parses the filter options.

    Args:
      options (argparse.Namespace): command line arguments.

    Raises:
      BadConfigOption: if the options are invalid.
    """
    self._event_filter_expression = self.ParseStringOption(options, 'filter')
    if self._event_filter_expression:
      self._event_filter = filters_manager.FiltersManager.GetFilterObject(
          self._event_filter_expression)
      if not self._event_filter:
        raise errors.BadConfigOption('Invalid filter expression: {0:s}'.format(
            self._event_filter_expression))

    time_slice_event_time_string = getattr(options, 'slice', None)
    time_slice_duration = getattr(options, 'slice_size', 5)
    self._use_time_slicer = getattr(options, 'slicer', False)

    # The slice and slicer cannot be set at the same time.
    if time_slice_event_time_string and self._use_time_slicer:
      raise errors.BadConfigOption(
          'Time slice and slicer cannot be used at the same time.')

    time_slice_event_timestamp = None
    if time_slice_event_time_string:
      # Note self._preferred_time_zone is None when not set but represents UTC.
      preferred_time_zone = self._preferred_time_zone or 'UTC'
      timezone = pytz.timezone(preferred_time_zone)
      time_slice_event_timestamp = timelib.Timestamp.FromTimeString(
          time_slice_event_time_string, timezone=timezone)
      if time_slice_event_timestamp is None:
        raise errors.BadConfigOption(
            'Unsupported time slice event date and time: {0:s}'.format(
                time_slice_event_time_string))

    if time_slice_event_timestamp is not None or self._use_time_slicer:
      # Note that time slicer uses the time slice to determine the duration.
      self._time_slice = time_slices.TimeSlice(
          time_slice_event_timestamp, duration=time_slice_duration)

  def _ParseInformationalOptions(self, options):
    """Parses the informational options.

    Args:
      options (argparse.Namespace): command line arguments.

    Raises:
      BadConfigOption: if the options are invalid.
    """
    super(PsortTool, self)._ParseInformationalOptions(options)

    self._quiet_mode = getattr(options, 'quiet', False)

    helpers_manager.ArgumentHelperManager.ParseOptions(
        options, self, names=['status_view'])

  def _ParseProcessingOptions(self, options):
    """Parses the processing options.

    Args:
      options (argparse.Namespace): command line arguments.

    Raises:
      BadConfigOption: if the options are invalid.
    """
    argument_helper_names = [
        'process_resources', 'temporary_directory', 'zeromq']
    helpers_manager.ArgumentHelperManager.ParseOptions(
        options, self, names=argument_helper_names)

    worker_memory_limit = getattr(options, 'worker_memory_limit', None)

    if worker_memory_limit and worker_memory_limit < 0:
      raise errors.BadConfigOption(
          'Invalid worker memory limit: value cannot be negative.')

    self._worker_memory_limit = worker_memory_limit

  def _PrintAnalysisReportsDetails(self, storage_reader):
    """Prints the details of the analysis reports.

    Args:
      storage_reader (StorageReader): storage reader.
    """
    for index, analysis_report in enumerate(
        storage_reader.GetAnalysisReports()):
      # Skip reports that already existed in the storage file before this
      # invocation; only newly generated reports are printed.
      if index + 1 <= self._number_of_analysis_reports:
        continue

      title = 'Analysis report: {0:d}'.format(index)
      table_view = views.ViewsFactory.GetTableView(
          self._views_format_type, title=title)

      table_view.AddRow(['String', analysis_report.GetString()])

      table_view.Write(self._output_writer)
[docs] def AddProcessingOptions(self, argument_group): """Adds processing options to the argument group Args: argument_group (argparse._ArgumentGroup): argparse argument group. """ argument_helper_names = ['temporary_directory', 'zeromq'] if self._CanEnforceProcessMemoryLimit(): argument_helper_names.append('process_resources') helpers_manager.ArgumentHelperManager.AddCommandLineArguments( argument_group, names=argument_helper_names) argument_group.add_argument( '--worker-memory-limit', '--worker_memory_limit', dest='worker_memory_limit', action='store', type=int, metavar='SIZE', help=( 'Maximum amount of memory (data segment and shared memory) ' 'a worker process is allowed to consume in bytes, where 0 ' 'represents no limit. The default limit is 2147483648 (2 GiB). ' 'If a worker process exceeds this limit is is killed by the main '
'(foreman) process.'))
[docs] def ParseArguments(self): """Parses the command line arguments. Returns: bool: True if the arguments were successfully parsed. """ loggers.ConfigureLogging() argument_parser = argparse.ArgumentParser( description=self.DESCRIPTION, add_help=False, conflict_handler='resolve', formatter_class=argparse.RawDescriptionHelpFormatter) self.AddBasicOptions(argument_parser) helpers_manager.ArgumentHelperManager.AddCommandLineArguments( argument_parser, names=['storage_file']) analysis_group = argument_parser.add_argument_group('Analysis Arguments') helpers_manager.ArgumentHelperManager.AddCommandLineArguments( analysis_group, names=['analysis_plugins']) processing_group = argument_parser.add_argument_group('Processing') self.AddProcessingOptions(processing_group) info_group = argument_parser.add_argument_group('Informational Arguments') self.AddLogFileOptions(info_group) self.AddInformationalOptions(info_group) helpers_manager.ArgumentHelperManager.AddCommandLineArguments( info_group, names=['status_view']) filter_group = argument_parser.add_argument_group('Filter Arguments') helpers_manager.ArgumentHelperManager.AddCommandLineArguments( filter_group, names=['event_filters']) input_group = argument_parser.add_argument_group('Input Arguments') helpers_manager.ArgumentHelperManager.AddCommandLineArguments( input_group, names=['data_location']) output_group = argument_parser.add_argument_group('Output Arguments') output_group.add_argument( '-a', '--include_all', '--include-all', action='store_false', dest='dedup', default=True, help=( 'By default the psort removes duplicate entries from the ' 'output. 
This parameter changes that behavior so all events ' 'are included.')) helpers_manager.ArgumentHelperManager.AddCommandLineArguments( output_group, names=['language']) self.AddTimeZoneOption(output_group) output_format_group = argument_parser.add_argument_group( 'Output Format Arguments') helpers_manager.ArgumentHelperManager.AddCommandLineArguments( output_format_group, names=['output_modules']) profiling_group = argument_parser.add_argument_group('profiling arguments') helpers_manager.ArgumentHelperManager.AddCommandLineArguments( profiling_group, names=['profiling']) try: # TODO: refactor how arguments is used in a more argparse way. options = argument_parser.parse_args() except UnicodeEncodeError: # If we get here we are attempting to print help in a non-Unicode # terminal. self._output_writer.Write('\n') self._output_writer.Write(argument_parser.format_help()) return False # Properly prepare the attributes according to local encoding. if self.preferred_encoding == 'ascii': logger.warning( 'The preferred encoding of your system is ASCII, which is not ' 'optimal for the typically non-ASCII characters that need to be ' 'parsed and processed. The tool will most likely crash and die, ' 'perhaps in a way that may not be recoverable. A five second delay ' 'is introduced to give you time to cancel the runtime and ' 'reconfigure your preferred encoding, otherwise continue at own ' 'risk.') time.sleep(5) try: self.ParseOptions(options) except errors.BadConfigOption as exception: self._output_writer.Write('ERROR: {0!s}\n'.format(exception)) self._output_writer.Write('\n') self._output_writer.Write(argument_parser.format_usage()) return False loggers.ConfigureLogging( debug_output=self._debug_mode, filename=self._log_file, quiet_mode=self._quiet_mode)
return True
[docs] def ParseOptions(self, options): """Parses the options. Args: options (argparse.Namespace): command line arguments. Raises: BadConfigOption: if the options are invalid. """ # The output modules options are dependent on the preferred language # and preferred time zone options. self._ParseTimezoneOption(options) names = ['analysis_plugins', 'language', 'profiling'] helpers_manager.ArgumentHelperManager.ParseOptions( options, self, names=names) self.list_analysis_plugins = self._analysis_plugins == 'list' self.list_language_identifiers = self._preferred_language == 'list' self.list_profilers = self._profilers == 'list' if (self.list_analysis_plugins or self.list_language_identifiers or self.list_profilers or self.list_timezones): return # Check output modules after the other listable options, otherwise # it could raise with "requires an output file". helpers_manager.ArgumentHelperManager.ParseOptions( options, self, names=['output_modules']) self.list_output_modules = self._output_format == 'list' if self.list_output_modules: return self._ParseInformationalOptions(options) helpers_manager.ArgumentHelperManager.ParseOptions( options, self, names=['data_location']) self._ParseLogFileOptions(options) self._ParseProcessingOptions(options) helpers_manager.ArgumentHelperManager.ParseOptions( options, self, names=['event_filters']) self._deduplicate_events = getattr(options, 'dedup', True) if self._data_location: # Update the data location with the calculated value. options.data_location = self._data_location else: logger.warning('Unable to automatically determine data location.') self._command_line_arguments = self.GetCommandLineArguments() helpers_manager.ArgumentHelperManager.ParseOptions( options, self, names=['storage_file']) # TODO: move check into _CheckStorageFile. 
if not self._storage_file_path: raise errors.BadConfigOption('Missing storage file option.') if not os.path.isfile(self._storage_file_path): raise errors.BadConfigOption( 'No such storage file: {0:s}.'.format(self._storage_file_path)) self._EnforceProcessMemoryLimit(self._process_memory_limit) self._analysis_plugins = self._CreateAnalysisPlugins(options)
self._output_module = self._CreateOutputModule(options)
[docs] def ProcessStorage(self): """Processes a plaso storage file. Raises: BadConfigOption: when a configuration parameter fails validation. RuntimeError: if a non-recoverable situation is encountered. """ self._CheckStorageFile(self._storage_file_path) self._status_view.SetMode(self._status_view_mode) self._status_view.SetStorageFileInformation(self._storage_file_path) status_update_callback = ( self._status_view.GetAnalysisStatusUpdateCallback()) session = engine.BaseEngine.CreateSession( command_line_arguments=self._command_line_arguments, preferred_encoding=self.preferred_encoding) storage_reader = storage_factory.StorageFactory.CreateStorageReaderForFile( self._storage_file_path) if not storage_reader: logger.error('Format of storage file: {0:s} not supported'.format( self._storage_file_path)) return self._number_of_analysis_reports = ( storage_reader.GetNumberOfAnalysisReports()) storage_reader.Close() configuration = configurations.ProcessingConfiguration() configuration.data_location = self._data_location configuration.profiling.directory = self._profiling_directory configuration.profiling.sample_rate = self._profiling_sample_rate configuration.profiling.profilers = self._profilers analysis_counter = None if self._analysis_plugins: storage_writer = ( storage_factory.StorageFactory.CreateStorageWriterForFile( session, self._storage_file_path)) # TODO: add single processing support. 
analysis_engine = psort.PsortMultiProcessEngine( use_zeromq=self._use_zeromq) analysis_engine.AnalyzeEvents( self._knowledge_base, storage_writer, self._data_location, self._analysis_plugins, configuration, event_filter=self._event_filter, event_filter_expression=self._event_filter_expression, status_update_callback=status_update_callback, worker_memory_limit=self._worker_memory_limit) analysis_counter = collections.Counter() for item, value in iter(session.analysis_reports_counter.items()): analysis_counter[item] = value events_counter = None if self._output_format != 'null': storage_reader = ( storage_factory.StorageFactory.CreateStorageReaderForFile( self._storage_file_path)) # TODO: add single processing support. analysis_engine = psort.PsortMultiProcessEngine( use_zeromq=self._use_zeromq) events_counter = analysis_engine.ExportEvents( self._knowledge_base, storage_reader, self._output_module, configuration, deduplicate_events=self._deduplicate_events, event_filter=self._event_filter, status_update_callback=status_update_callback, time_slice=self._time_slice, use_time_slicer=self._use_time_slicer) if self._quiet_mode: return self._output_writer.Write('Processing completed.\n') if analysis_counter: table_view = views.ViewsFactory.GetTableView( self._views_format_type, title='Analysis reports generated') for element, count in analysis_counter.most_common(): if element != 'total': table_view.AddRow([element, count]) table_view.AddRow(['Total', analysis_counter['total']]) table_view.Write(self._output_writer) if events_counter: table_view = views.ViewsFactory.GetTableView( self._views_format_type, title='Export results') for element, count in events_counter.most_common(): table_view.AddRow([element, count]) table_view.Write(self._output_writer) storage_reader = storage_factory.StorageFactory.CreateStorageReaderForFile( self._storage_file_path)
self._PrintAnalysisReportsDetails(storage_reader)