# -*- coding: utf-8 -*-
"""The single process processing engine."""
from __future__ import unicode_literals
import os
import pdb
import time
from dfvfs.lib import errors as dfvfs_errors
from plaso.containers import event_sources
from plaso.engine import engine
from plaso.engine import extractors
from plaso.engine import logger
from plaso.engine import process_info
from plaso.engine import worker
from plaso.lib import definitions
from plaso.parsers import mediator as parsers_mediator
class SingleProcessEngine(engine.BaseEngine):
  """Class that defines the single process engine."""

  def __init__(self):
    """Initializes a single process engine."""
    super(SingleProcessEngine, self).__init__()
    # Display name of the file entry currently being processed.
    self._current_display_name = ''
    # Timestamp of the last status update, used to throttle updates.
    self._last_status_update_timestamp = 0.0
    self._path_spec_extractor = extractors.PathSpecExtractor()
    self._pid = os.getpid()
    self._process_information = process_info.ProcessInfo(self._pid)
    # Set for the duration of ProcessSources() only.
    self._processing_configuration = None
    # Optional callback invoked with the processing status; set for the
    # duration of ProcessSources() only.
    self._status_update_callback = None
  def _ProcessPathSpec(self, extraction_worker, parser_mediator, path_spec):
    """Processes a path specification.

    Delegates the actual extraction to the extraction worker and shields the
    caller from all exceptions the worker may raise, so a single failing file
    cannot kill the worker.

    Args:
      extraction_worker (worker.ExtractionWorker): extraction worker.
      parser_mediator (ParserMediator): parser mediator.
      path_spec (dfvfs.PathSpec): path specification.
    """
    # Remember the display name so status updates and error messages can
    # report which file was being processed.
    self._current_display_name = parser_mediator.GetDisplayNameForPathSpec(
        path_spec)

    try:
      extraction_worker.ProcessPathSpec(parser_mediator, path_spec)

    except KeyboardInterrupt:
      # User-requested abort: flag it and push one final status update.
      self._abort = True

      self._processing_status.aborted = True
      if self._status_update_callback:
        self._status_update_callback(self._processing_status)

    # We cannot recover from a CacheFullError and abort processing when
    # it is raised.
    except dfvfs_errors.CacheFullError:
      # TODO: signal engine of failure.
      self._abort = True
      logger.error((
          'ABORT: detected cache full error while processing '
          'path spec: {0:s}').format(self._current_display_name))

    # All exceptions need to be caught here to prevent the worker
    # from being killed by an uncaught exception.
    except Exception as exception:  # pylint: disable=broad-except
      # Record the failure as an extraction error and continue processing.
      parser_mediator.ProduceExtractionError((
          'unable to process path specification with error: '
          '{0!s}').format(exception), path_spec=path_spec)

      if getattr(self._processing_configuration, 'debug_output', False):
        logger.warning(
            'Unhandled exception while processing path spec: {0:s}.'.format(
                self._current_display_name))
        logger.exception(exception)

        # In debug mode drop into the post-mortem debugger; this blocks
        # processing until the debugger session is ended.
        pdb.post_mortem()
  def _ProcessSources(
      self, source_path_specs, extraction_worker, parser_mediator,
      storage_writer, filter_find_specs=None):
    """Processes the sources.

    Runs in two phases: a collection phase that expands the source path
    specifications into event sources written to storage, and an extraction
    phase that processes each written event source in order.

    Args:
      source_path_specs (list[dfvfs.PathSpec]): path specifications of
          the sources to process.
      extraction_worker (worker.ExtractionWorker): extraction worker.
      parser_mediator (ParserMediator): parser mediator.
      storage_writer (StorageWriter): storage writer for a session storage.
      filter_find_specs (Optional[list[dfvfs.FindSpec]]): find specifications
          used in path specification extraction.
    """
    if self._processing_profiler:
      self._processing_profiler.StartTiming('process_sources')

    number_of_consumed_sources = 0

    self._UpdateStatus(
        definitions.PROCESSING_STATUS_COLLECTING, '',
        number_of_consumed_sources, storage_writer)

    display_name = ''

    # Collection phase: expand the sources into individual path
    # specifications and record each one as an event source.
    path_spec_generator = self._path_spec_extractor.ExtractPathSpecs(
        source_path_specs, find_specs=filter_find_specs,
        recurse_file_system=False,
        resolver_context=parser_mediator.resolver_context)

    for path_spec in path_spec_generator:
      if self._abort:
        break

      display_name = parser_mediator.GetDisplayNameForPathSpec(path_spec)

      # TODO: determine if event sources should be DataStream or FileEntry
      # or both.
      event_source = event_sources.FileEntryEventSource(path_spec=path_spec)
      storage_writer.AddEventSource(event_source)

      self._UpdateStatus(
          definitions.PROCESSING_STATUS_COLLECTING, display_name,
          number_of_consumed_sources, storage_writer)

    # Force the status update here to make sure the status is up to date.
    self._UpdateStatus(
        definitions.PROCESSING_STATUS_RUNNING, display_name,
        number_of_consumed_sources, storage_writer, force=True)

    if self._processing_profiler:
      self._processing_profiler.StartTiming('get_event_source')

    event_source = storage_writer.GetFirstWrittenEventSource()

    if self._processing_profiler:
      self._processing_profiler.StopTiming('get_event_source')

    # Extraction phase: process each written event source in write order.
    while event_source:
      if self._abort:
        break

      self._ProcessPathSpec(
          extraction_worker, parser_mediator, event_source.path_spec)
      number_of_consumed_sources += 1

      if self._guppy_memory_profiler:
        self._guppy_memory_profiler.Sample()

      self._UpdateStatus(
          extraction_worker.processing_status, self._current_display_name,
          number_of_consumed_sources, storage_writer)

      if self._processing_profiler:
        self._processing_profiler.StartTiming('get_event_source')

      event_source = storage_writer.GetNextWrittenEventSource()

      if self._processing_profiler:
        self._processing_profiler.StopTiming('get_event_source')

    if self._abort:
      status = definitions.PROCESSING_STATUS_ABORTED
    else:
      status = definitions.PROCESSING_STATUS_COMPLETED

    # Force the status update here to make sure the status is up to date
    # on exit.
    self._UpdateStatus(
        status, '', number_of_consumed_sources, storage_writer, force=True)

    if self._processing_profiler:
      self._processing_profiler.StopTiming('process_sources')
def _UpdateStatus(
self, status, display_name, number_of_consumed_sources, storage_writer,
force=False):
"""Updates the processing status.
Args:
status (str): human readable status of the processing e.g. 'Idle'.
display_name (str): human readable of the file entry currently being
processed.
number_of_consumed_sources (int): number of consumed sources.
storage_writer (StorageWriter): storage writer for a session storage.
force (Optional[bool]): True if the update should be forced ignoring
the last status update time.
"""
current_timestamp = time.time()
if not force and current_timestamp < (
self._last_status_update_timestamp + self._STATUS_UPDATE_INTERVAL):
return
if status == definitions.PROCESSING_STATUS_IDLE:
status = definitions.PROCESSING_STATUS_RUNNING
used_memory = self._process_information.GetUsedMemory() or 0
self._processing_status.UpdateForemanStatus(
self._name, status, self._pid, used_memory, display_name,
number_of_consumed_sources, storage_writer.number_of_event_sources,
0, storage_writer.number_of_events,
0, 0,
0, storage_writer.number_of_errors,
0, 0)
if self._status_update_callback:
self._status_update_callback(self._processing_status)
self._last_status_update_timestamp = current_timestamp
[docs] def ProcessSources(
self, source_path_specs, storage_writer, resolver_context,
processing_configuration, filter_find_specs=None,
status_update_callback=None):
"""Processes the sources.
Args:
source_path_specs (list[dfvfs.PathSpec]): path specifications of
the sources to process.
storage_writer (StorageWriter): storage writer for a session storage.
resolver_context (dfvfs.Context): resolver context.
processing_configuration (ProcessingConfiguration): processing
configuration.
filter_find_specs (Optional[list[dfvfs.FindSpec]]): find specifications
used in path specification extraction.
status_update_callback (Optional[function]): callback function for status
updates.
Returns:
ProcessingStatus: processing status.
"""
parser_mediator = parsers_mediator.ParserMediator(
storage_writer, self.knowledge_base,
preferred_year=processing_configuration.preferred_year,
resolver_context=resolver_context,
temporary_directory=processing_configuration.temporary_directory)
parser_mediator.SetEventExtractionConfiguration(
processing_configuration.event_extraction)
parser_mediator.SetInputSourceConfiguration(
processing_configuration.input_source)
extraction_worker = worker.EventExtractionWorker(
parser_filter_expression=(
processing_configuration.parser_filter_expression))
extraction_worker.SetExtractionConfiguration(
processing_configuration.extraction)
self._processing_configuration = processing_configuration
self._status_update_callback = status_update_callback
logger.debug('Processing started.')
parser_mediator.StartProfiling(
self._processing_configuration.profiling, self._name,
self._process_information)
self._StartProfiling(self._processing_configuration.profiling)
if self._processing_profiler:
extraction_worker.SetProcessingProfiler(self._processing_profiler)
if self._serializers_profiler:
storage_writer.SetSerializersProfiler(self._serializers_profiler)
if self._storage_profiler:
storage_writer.SetStorageProfiler(self._storage_profiler)
storage_writer.Open()
storage_writer.WriteSessionStart()
try:
storage_writer.WritePreprocessingInformation(self.knowledge_base)
self._ProcessSources(
source_path_specs, extraction_worker, parser_mediator,
storage_writer, filter_find_specs=filter_find_specs)
finally:
storage_writer.WriteSessionCompletion(aborted=self._abort)
storage_writer.Close()
if self._processing_profiler:
extraction_worker.SetProcessingProfiler(None)
if self._serializers_profiler:
storage_writer.SetSerializersProfiler(None)
if self._storage_profiler:
storage_writer.SetStorageProfiler(None)
self._StopProfiling()
parser_mediator.StopProfiling()
if self._abort:
logger.debug('Processing aborted.')
self._processing_status.aborted = True
else:
logger.debug('Processing completed.')
self._processing_configuration = None
self._status_update_callback = None
return self._processing_status