# -*- coding: utf-8 -*-
"""The psort multi-processing engine."""
from __future__ import unicode_literals
import collections
import heapq
import os
import time
from plaso.engine import plaso_queue
from plaso.engine import zeromq_queue
from plaso.containers import tasks
from plaso.lib import bufferlib
from plaso.lib import definitions
from plaso.lib import py2to3
from plaso.multi_processing import analysis_process
from plaso.multi_processing import engine as multi_process_engine
from plaso.multi_processing import logger
from plaso.multi_processing import multi_process_queue
from plaso.storage import event_tag_index
from plaso.storage import time_range as storage_time_range
class PsortEventHeap(object):
  """Psort event heap.

  The heap stores (MACB group identifier, content identifier, event) tuples
  so that events pushed onto it are popped in a deterministic order.
  """

  # Attributes that are not used to build the event identifiers.
  _IDENTIFIER_EXCLUDED_ATTRIBUTES = frozenset([
      'data_type',
      'display_name',
      'filename',
      'inode',
      'parser',
      'tag',
      'timestamp',
      'timestamp_desc'])

  def __init__(self):
    """Initializes a psort events heap."""
    super(PsortEventHeap, self).__init__()
    self._heap = []

  @property
  def number_of_events(self):
    """int: number of events on the heap."""
    return len(self._heap)

  def _GetEventIdentifiers(self, event):
    """Retrieves different identifiers of the event.

    Every event contains event data, which consists of attributes and values.
    These attributes and values can be represented as a string and used for
    sorting and uniquely identifying events. This function determines multiple
    identifiers:
    * an identifier of the attributes and values without the timestamp
      description (or usage). This is referred to as the MACB group
      identifier.
    * an identifier of the attributes and values including the timestamp
      description (or usage). This is referred to as the event content
      identifier.

    The identifier without the timestamp description can be used to group
    events that have the same MACB (modification, access, change, birth)
    timestamps. The PsortEventHeap will store these events individually and
    relies on PsortMultiProcessEngine to do the actual grouping of events.

    Args:
      event (EventObject): event.

    Returns:
      tuple: containing:

        str: identifier of the event MACB group or None if the event cannot
            be grouped.
        str: identifier of the event content.
    """
    attributes = ['data_type: {0:s}'.format(event.data_type)]

    for attribute_name, attribute_value in sorted(event.GetAttributes()):
      if attribute_name in self._IDENTIFIER_EXCLUDED_ATTRIBUTES:
        continue

      # Empty values do not contribute to the identifier.
      if not attribute_value:
        continue

      # Normalize values whose string representation would otherwise not be
      # deterministic (dict and set ordering) or not be text (bytes).
      if attribute_name == 'pathspec':
        attribute_value = attribute_value.comparable

      elif isinstance(attribute_value, dict):
        attribute_value = sorted(attribute_value.items())

      elif isinstance(attribute_value, set):
        attribute_value = sorted(attribute_value)

      elif isinstance(attribute_value, py2to3.BYTES_TYPE):
        attribute_value = repr(attribute_value)

      try:
        attribute_string = '{0:s}: {1!s}'.format(
            attribute_name, attribute_value)
      except UnicodeDecodeError:
        logger.error('Failed to decode attribute {0:s}'.format(
            attribute_name))
        # Skip the attribute, otherwise the string of the previously
        # processed attribute would be appended a second time.
        continue

      attributes.append(attribute_string)

    # The 'atime', 'ctime', 'crtime', 'mtime' are included for backwards
    # compatibility with the filestat parser.
    if event.timestamp_desc in (
        'atime', 'ctime', 'crtime', 'mtime',
        definitions.TIME_DESCRIPTION_LAST_ACCESS,
        definitions.TIME_DESCRIPTION_CHANGE,
        definitions.TIME_DESCRIPTION_CREATION,
        definitions.TIME_DESCRIPTION_MODIFICATION):
      macb_group_identifier = ', '.join(attributes)
    else:
      macb_group_identifier = None

    # The content identifier additionally starts with the timestamp
    # description.
    attributes.insert(0, event.timestamp_desc)
    content_identifier = ', '.join(attributes)

    return macb_group_identifier, content_identifier

  def PopEvent(self):
    """Pops an event from the heap.

    Returns:
      tuple: containing the following values or None if the heap is empty:

        str: identifier of the event MACB group or None if the event cannot
            be grouped.
        str: identifier of the event content.
        EventObject: event.
    """
    try:
      macb_group_identifier, content_identifier, event = heapq.heappop(
          self._heap)
    except IndexError:
      return None

    # PushEvent stores an empty string instead of None, map it back.
    if macb_group_identifier == '':
      macb_group_identifier = None

    return macb_group_identifier, content_identifier, event

  def PopEvents(self):
    """Pops events from the heap.

    Yields:
      tuple: containing:

        str: identifier of the event MACB group or None if the event cannot
            be grouped.
        str: identifier of the event content.
        EventObject: event.
    """
    heap_values = self.PopEvent()
    while heap_values is not None:
      yield heap_values
      heap_values = self.PopEvent()

  def PushEvent(self, event):
    """Pushes an event onto the heap.

    Args:
      event (EventObject): event.
    """
    macb_group_identifier, content_identifier = self._GetEventIdentifiers(event)

    # We can ignore the timestamp here because the psort engine only stores
    # events with the same timestamp in the event heap.
    # An empty string is stored instead of None so that the MACB group
    # identifiers on the heap are all strings.
    heap_values = (macb_group_identifier or '', content_identifier, event)
    heapq.heappush(self._heap, heap_values)
class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine):
  """Psort multi-processing engine."""

  # Timeouts in seconds.
  _PROCESS_JOIN_TIMEOUT = 5.0
  _PROCESS_WORKER_TIMEOUT = 15.0 * 60.0

  _QUEUE_TIMEOUT = 10 * 60

  def __init__(self, use_zeromq=True):
    """Initializes an engine object.

    Args:
      use_zeromq (Optional[bool]): True if ZeroMQ should be used for queuing
          instead of Python's multiprocessing queue.
    """
    super(PsortMultiProcessEngine, self).__init__()
    self._analysis_plugins = {}
    self._completed_analysis_processes = set()
    self._data_location = None
    self._event_filter_expression = None
    self._event_queues = {}
    self._event_tag_index = event_tag_index.EventTagIndex()
    # The export event heap is used to make sure the events are sorted in
    # a deterministic way.
    self._export_event_heap = PsortEventHeap()
    self._export_event_timestamp = 0
    self._guppy_memory_profiler = None
    self._knowledge_base = None
    self._memory_profiler = None
    self._merge_task = None
    self._number_of_consumed_errors = 0
    self._number_of_consumed_events = 0
    self._number_of_consumed_event_tags = 0
    self._number_of_consumed_reports = 0
    self._number_of_consumed_sources = 0
    self._number_of_duplicate_events = 0
    self._number_of_macb_grouped_events = 0
    self._number_of_produced_errors = 0
    self._number_of_produced_events = 0
    self._number_of_produced_event_tags = 0
    self._number_of_produced_reports = 0
    self._number_of_produced_sources = 0
    self._processing_configuration = None
    self._processing_profiler = None
    self._serializers_profiler = None
    self._status = definitions.PROCESSING_STATUS_IDLE
    self._status_update_callback = None
    self._use_zeromq = use_zeromq
    self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT

  def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None):
    """Analyzes events in a plaso storage.

    Args:
      storage_writer (StorageWriter): storage writer.
      analysis_plugins (dict[str, AnalysisPlugin]): analysis plugins that
          should be run and their names.
      event_filter (Optional[FilterObject]): event filter.

    Returns:
      collections.Counter: counter containing information about the events
          processed and filtered.

    Raises:
      RuntimeError: if a non-recoverable situation is encountered.
    """
    self._status = definitions.PROCESSING_STATUS_RUNNING
    self._number_of_consumed_errors = 0
    self._number_of_consumed_events = 0
    self._number_of_consumed_reports = 0
    self._number_of_consumed_sources = 0
    self._number_of_produced_errors = 0
    self._number_of_produced_events = 0
    self._number_of_produced_reports = 0
    self._number_of_produced_sources = 0

    number_of_filtered_events = 0

    logger.debug('Processing events.')

    filter_limit = getattr(event_filter, 'limit', None)

    for event in storage_writer.GetSortedEvents():
      # Copy the event data attributes onto the event.
      event_data_identifier = event.GetEventDataIdentifier()
      if event_data_identifier:
        event_data = storage_writer.GetEventDataByIdentifier(
            event_data_identifier)
        if event_data:
          for attribute_name, attribute_value in event_data.GetAttributes():
            setattr(event, attribute_name, attribute_value)

      event_identifier = event.GetIdentifier()
      event.tag = self._event_tag_index.GetEventTagByIdentifier(
          storage_writer, event_identifier)

      if event_filter:
        filter_match = event_filter.Match(event)
      else:
        filter_match = None

      # Only an explicit False is treated as filtered, None means that no
      # filter was applied.
      # pylint: disable=singleton-comparison
      if filter_match == False:
        number_of_filtered_events += 1
        continue

      for event_queue in self._event_queues.values():
        # TODO: Check for premature exit of analysis plugins.
        event_queue.PushItem(event)

      self._number_of_consumed_events += 1

      if (event_filter and filter_limit and
          filter_limit == self._number_of_consumed_events):
        break

    logger.debug('Finished pushing events to analysis plugins.')
    # Signal that we have finished adding events.
    for event_queue in self._event_queues.values():
      event_queue.PushItem(plaso_queue.QueueAbort(), block=False)

    logger.debug('Processing analysis plugin results.')

    # TODO: use a task based approach.
    plugin_names = list(analysis_plugins)

    # Also stop on abort, otherwise this loop would never terminate since
    # the inner break leaves plugin_names unchanged.
    while plugin_names and not self._abort:
      for plugin_name in list(plugin_names):
        if self._abort:
          break

        # TODO: temporary solution.
        task = tasks.Task()
        task.identifier = plugin_name

        merge_ready = storage_writer.CheckTaskReadyForMerge(task)
        if merge_ready:
          storage_writer.PrepareMergeTaskStorage(task)
          self._status = definitions.PROCESSING_STATUS_MERGING

          event_queue = self._event_queues[plugin_name]
          del self._event_queues[plugin_name]

          event_queue.Close()

          storage_merge_reader = storage_writer.StartMergeTaskStorage(task)

          storage_merge_reader.MergeAttributeContainers(
              callback=self._MergeEventTag)

          # TODO: temporary solution.
          plugin_names.remove(plugin_name)

          self._status = definitions.PROCESSING_STATUS_RUNNING

          self._number_of_produced_event_tags = (
              storage_writer.number_of_event_tags)
          self._number_of_produced_reports = (
              storage_writer.number_of_analysis_reports)

    try:
      storage_writer.StopTaskStorage(abort=self._abort)
    except (IOError, OSError) as exception:
      logger.error('Unable to stop task storage with error: {0!s}'.format(
          exception))

    if self._abort:
      logger.debug('Processing aborted.')
    else:
      logger.debug('Processing completed.')

    events_counter = collections.Counter()
    events_counter['Events filtered'] = number_of_filtered_events
    events_counter['Events processed'] = self._number_of_consumed_events

    return events_counter

  def _CheckStatusAnalysisProcess(self, pid):
    """Checks the status of an analysis process.

    Args:
      pid (int): process ID (PID) of a registered analysis process.

    Raises:
      KeyError: if the process is not registered with the engine.
    """
    # TODO: Refactor this method, simplify and separate concerns (monitoring
    # vs management).
    self._RaiseIfNotRegistered(pid)

    if pid in self._completed_analysis_processes:
      # A completed process is no longer queried.
      status_indicator = definitions.PROCESSING_STATUS_COMPLETED
      process_status = {
          'processing_status': status_indicator}
      used_memory = 0

    else:
      process = self._processes_per_pid[pid]

      process_status = self._QueryProcessStatus(process)
      if process_status is None:
        process_is_alive = False
      else:
        process_is_alive = True

      process_information = self._process_information_per_pid[pid]
      used_memory = process_information.GetUsedMemory() or 0

      # A worker memory limit of 0 represents no limit.
      if self._worker_memory_limit and used_memory > self._worker_memory_limit:
        logger.warning((
            'Process: {0:s} (PID: {1:d}) killed because it exceeded the '
            'memory limit: {2:d}.').format(
                process.name, pid, self._worker_memory_limit))
        self._KillProcess(pid)

      if isinstance(process_status, dict):
        self._rpc_errors_per_pid[pid] = 0
        status_indicator = process_status.get('processing_status', None)

        if status_indicator == definitions.PROCESSING_STATUS_COMPLETED:
          self._completed_analysis_processes.add(pid)

      else:
        rpc_errors = self._rpc_errors_per_pid.get(pid, 0) + 1
        self._rpc_errors_per_pid[pid] = rpc_errors

        # Too many consecutive RPC errors are treated as a dead process.
        if rpc_errors > self._MAXIMUM_RPC_ERRORS:
          process_is_alive = False

        if process_is_alive:
          rpc_port = process.rpc_port.value
          logger.warning((
              'Unable to retrieve process: {0:s} (PID: {1:d}) status via '
              'RPC socket: http://localhost:{2:d}').format(
                  process.name, pid, rpc_port))

          processing_status_string = 'RPC error'
          status_indicator = definitions.PROCESSING_STATUS_RUNNING
        else:
          processing_status_string = 'killed'
          status_indicator = definitions.PROCESSING_STATUS_KILLED

        process_status = {
            'processing_status': processing_status_string}

    self._UpdateProcessingStatus(pid, process_status, used_memory)

    if status_indicator in definitions.PROCESSING_ERROR_STATUS:
      logger.error((
          'Process {0:s} (PID: {1:d}) is not functioning correctly. '
          'Status code: {2!s}.').format(
              process.name, pid, status_indicator))

      self._TerminateProcessByPid(pid)

  def _ExportEvent(self, output_module, event, deduplicate_events=True):
    """Exports an event using an output module.

    The event is buffered on the export event heap. The heap is flushed to
    the output module when an event with a different timestamp is exported.

    Args:
      output_module (OutputModule): output module.
      event (EventObject): event.
      deduplicate_events (Optional[bool]): True if events should be
          deduplicated.
    """
    if event.timestamp != self._export_event_timestamp:
      self._FlushExportBuffer(
          output_module, deduplicate_events=deduplicate_events)
      self._export_event_timestamp = event.timestamp

    self._export_event_heap.PushEvent(event)

  def _ExportEvents(
      self, storage_reader, output_module, deduplicate_events=True,
      event_filter=None, time_slice=None, use_time_slicer=False):
    """Exports events using an output module.

    Args:
      storage_reader (StorageReader): storage reader.
      output_module (OutputModule): output module.
      deduplicate_events (Optional[bool]): True if events should be
          deduplicated.
      event_filter (Optional[FilterObject]): event filter.
      time_slice (Optional[TimeRange]): time range that defines a time slice
          to filter events.
      use_time_slicer (Optional[bool]): True if the 'time slicer' should be
          used. The 'time slicer' will provide a context of events around
          an event of interest.

    Returns:
      collections.Counter: counter that tracks the number of unique events
          read from storage.
    """
    self._status = definitions.PROCESSING_STATUS_EXPORTING

    time_slice_buffer = None
    time_slice_range = None

    if time_slice:
      if time_slice.event_timestamp is not None:
        time_slice_range = storage_time_range.TimeRange(
            time_slice.start_timestamp, time_slice.end_timestamp)

      if use_time_slicer:
        time_slice_buffer = bufferlib.CircularBuffer(time_slice.duration)

    filter_limit = getattr(event_filter, 'limit', None)
    forward_entries = 0

    number_of_filtered_events = 0
    number_of_events_from_time_slice = 0

    for event in storage_reader.GetSortedEvents(time_range=time_slice_range):
      # Copy the event data attributes onto the event.
      event_data_identifier = event.GetEventDataIdentifier()
      if event_data_identifier:
        event_data = storage_reader.GetEventDataByIdentifier(
            event_data_identifier)
        if event_data:
          for attribute_name, attribute_value in event_data.GetAttributes():
            setattr(event, attribute_name, attribute_value)

      event_identifier = event.GetIdentifier()
      event.tag = self._event_tag_index.GetEventTagByIdentifier(
          storage_reader, event_identifier)

      if time_slice_range and event.timestamp != time_slice.event_timestamp:
        number_of_events_from_time_slice += 1

      if event_filter:
        filter_match = event_filter.Match(event)
      else:
        filter_match = None

      # Only an explicit False is treated as filtered, None means that no
      # filter was applied.
      # pylint: disable=singleton-comparison
      if filter_match == False:
        if not time_slice_buffer:
          number_of_filtered_events += 1

        elif forward_entries == 0:
          # No match seen yet, keep the event as potential context.
          time_slice_buffer.Append(event)
          number_of_filtered_events += 1

        elif forward_entries <= time_slice_buffer.size:
          # Export the event as context following a matching event.
          self._ExportEvent(
              output_module, event, deduplicate_events=deduplicate_events)
          self._number_of_consumed_events += 1
          number_of_events_from_time_slice += 1
          forward_entries += 1

        else:
          # We reached the maximum size of the time slice and don't need to
          # include other entries.
          number_of_filtered_events += 1
          forward_entries = 0

      else:
        # pylint: disable=singleton-comparison
        if filter_match == True and time_slice_buffer:
          # Empty the time slice buffer.
          for event_in_buffer in time_slice_buffer.Flush():
            self._ExportEvent(
                output_module, event_in_buffer,
                deduplicate_events=deduplicate_events)
            self._number_of_consumed_events += 1
            # NOTE(review): buffered events were already counted as filtered
            # when they were appended; confirm whether this should decrement
            # the counter instead.
            number_of_filtered_events += 1
            number_of_events_from_time_slice += 1

          forward_entries = 1

        self._ExportEvent(
            output_module, event, deduplicate_events=deduplicate_events)
        self._number_of_consumed_events += 1

        # pylint: disable=singleton-comparison
        if (filter_match == True and filter_limit and
            filter_limit == self._number_of_consumed_events):
          break

    # Flush the events of the last timestamp. Pass deduplicate_events so
    # these events are handled the same way as those of earlier timestamps.
    self._FlushExportBuffer(
        output_module, deduplicate_events=deduplicate_events)

    events_counter = collections.Counter()
    events_counter['Events filtered'] = number_of_filtered_events
    events_counter['Events from time slice'] = number_of_events_from_time_slice
    events_counter['Events processed'] = self._number_of_consumed_events

    if self._number_of_duplicate_events:
      events_counter['Duplicate events removed'] = (
          self._number_of_duplicate_events)

    if self._number_of_macb_grouped_events:
      events_counter['Events MACB grouped'] = (
          self._number_of_macb_grouped_events)

    if filter_limit:
      events_counter['Limited By'] = filter_limit

    return events_counter

  def _FlushExportBuffer(self, output_module, deduplicate_events=True):
    """Flushes buffered events and writes them to the output module.

    Args:
      output_module (OutputModule): output module.
      deduplicate_events (Optional[bool]): True if events should be
          deduplicated.
    """
    last_macb_group_identifier = None
    last_content_identifier = None
    macb_group = []

    generator = self._export_event_heap.PopEvents()

    for macb_group_identifier, content_identifier, event in generator:
      # Events with the same content identifier are popped consecutively,
      # hence comparing against the last written one detects duplicates.
      if deduplicate_events and last_content_identifier == content_identifier:
        self._number_of_duplicate_events += 1
        continue

      if macb_group_identifier is None:
        # The event cannot be grouped, write a pending group first.
        if macb_group:
          output_module.WriteEventMACBGroup(macb_group)
          macb_group = []

        output_module.WriteEvent(event)

      else:
        if (last_macb_group_identifier == macb_group_identifier or
            not macb_group):
          macb_group.append(event)

        else:
          # The event starts a new group, write the pending one.
          output_module.WriteEventMACBGroup(macb_group)
          macb_group = [event]

        self._number_of_macb_grouped_events += 1

      last_macb_group_identifier = macb_group_identifier
      last_content_identifier = content_identifier

    if macb_group:
      output_module.WriteEventMACBGroup(macb_group)

  def _MergeEventTag(self, storage_writer, attribute_container):
    """Merges an event tag with the last stored event tag.

    If there is an existing event the provided event tag is updated with
    the contents of the existing one. After which the event tag index is
    updated.

    Args:
      storage_writer (StorageWriter): storage writer.
      attribute_container (AttributeContainer): container.
    """
    if attribute_container.CONTAINER_TYPE != 'event_tag':
      return

    event_identifier = attribute_container.GetEventIdentifier()
    if not event_identifier:
      return

    # Check if the event has already been tagged on a previous occasion,
    # we need to append the event tag to the last stored one.
    stored_event_tag = self._event_tag_index.GetEventTagByIdentifier(
        storage_writer, event_identifier)
    if stored_event_tag:
      attribute_container.AddComment(stored_event_tag.comment)
      attribute_container.AddLabels(stored_event_tag.labels)

    self._event_tag_index.SetEventTag(attribute_container)

  def _StartAnalysisProcesses(self, storage_writer, analysis_plugins):
    """Starts the analysis processes.

    Args:
      storage_writer (StorageWriter): storage writer.
      analysis_plugins (dict[str, AnalysisPlugin]): analysis plugins that
          should be run and their names.
    """
    logger.info('Starting analysis plugins.')

    for analysis_plugin in analysis_plugins.values():
      # Register the plugin first, _StartWorkerProcess looks it up by name.
      self._analysis_plugins[analysis_plugin.NAME] = analysis_plugin

      process = self._StartWorkerProcess(analysis_plugin.NAME, storage_writer)
      if not process:
        logger.error('Unable to create analysis process: {0:s}'.format(
            analysis_plugin.NAME))

    logger.info('Analysis plugins running')

  def _StatusUpdateThreadMain(self):
    """Main function of the status update thread."""
    while self._status_update_active:
      # Make a local copy of the PIDs in case the dict is changed by
      # the main thread.
      for pid in list(self._process_information_per_pid.keys()):
        self._CheckStatusAnalysisProcess(pid)

      used_memory = self._process_information.GetUsedMemory() or 0

      display_name = getattr(self._merge_task, 'identifier', '')

      self._processing_status.UpdateForemanStatus(
          self._name, self._status, self._pid, used_memory, display_name,
          self._number_of_consumed_sources, self._number_of_produced_sources,
          self._number_of_consumed_events, self._number_of_produced_events,
          self._number_of_consumed_event_tags,
          self._number_of_produced_event_tags,
          self._number_of_consumed_errors, self._number_of_produced_errors,
          self._number_of_consumed_reports, self._number_of_produced_reports)

      if self._status_update_callback:
        self._status_update_callback(self._processing_status)

      time.sleep(self._STATUS_UPDATE_INTERVAL)

  def _StopAnalysisProcesses(self, abort=False):
    """Stops the analysis processes.

    Args:
      abort (bool): True to indicated the stop is issued on abort.
    """
    logger.debug('Stopping analysis processes.')
    self._StopMonitoringProcesses()

    # Note that multiprocessing.Queue is very sensitive regarding
    # blocking on either a get or a put. So we try to prevent using
    # any blocking behavior.

    if abort:
      # Signal all the processes to abort.
      self._AbortTerminate()

    if not self._use_zeromq:
      logger.debug('Emptying queues.')
      for event_queue in self._event_queues.values():
        event_queue.Empty()

    # Wake the processes to make sure that they are not blocking
    # waiting for the queue new items.
    for event_queue in self._event_queues.values():
      event_queue.PushItem(plaso_queue.QueueAbort(), block=False)

    # Try waiting for the processes to exit normally.
    self._AbortJoin(timeout=self._PROCESS_JOIN_TIMEOUT)
    for event_queue in self._event_queues.values():
      event_queue.Close(abort=abort)

    if abort:
      # Kill any remaining processes.
      self._AbortKill()
    else:
      # Check if the processes are still alive and terminate them if necessary.
      self._AbortTerminate()
      self._AbortJoin(timeout=self._PROCESS_JOIN_TIMEOUT)

      for event_queue in self._event_queues.values():
        event_queue.Close(abort=True)

  def _UpdateProcessingStatus(self, pid, process_status, used_memory):
    """Updates the processing status.

    Args:
      pid (int): process identifier (PID) of the worker process.
      process_status (dict[str, object]): status values received from
          the worker process.
      used_memory (int): size of used memory in bytes.

    Raises:
      KeyError: if the process is not registered with the engine.
    """
    self._RaiseIfNotRegistered(pid)

    if not process_status:
      return

    process = self._processes_per_pid[pid]

    processing_status = process_status.get('processing_status', None)

    self._RaiseIfNotMonitored(pid)

    display_name = process_status.get('display_name', '')

    number_of_consumed_errors = process_status.get(
        'number_of_consumed_errors', None)
    number_of_produced_errors = process_status.get(
        'number_of_produced_errors', None)

    number_of_consumed_event_tags = process_status.get(
        'number_of_consumed_event_tags', None)
    number_of_produced_event_tags = process_status.get(
        'number_of_produced_event_tags', None)

    number_of_consumed_events = process_status.get(
        'number_of_consumed_events', None)
    number_of_produced_events = process_status.get(
        'number_of_produced_events', None)

    number_of_consumed_reports = process_status.get(
        'number_of_consumed_reports', None)
    number_of_produced_reports = process_status.get(
        'number_of_produced_reports', None)

    number_of_consumed_sources = process_status.get(
        'number_of_consumed_sources', None)
    number_of_produced_sources = process_status.get(
        'number_of_produced_sources', None)

    if processing_status != definitions.PROCESSING_STATUS_IDLE:
      last_activity_timestamp = process_status.get(
          'last_activity_timestamp', 0.0)

      # A process without a last activity timestamp is not checked.
      if last_activity_timestamp:
        last_activity_timestamp += self._PROCESS_WORKER_TIMEOUT

        current_timestamp = time.time()
        if current_timestamp > last_activity_timestamp:
          logger.error((
              'Process {0:s} (PID: {1:d}) has not reported activity within '
              'the timeout period.').format(process.name, pid))
          processing_status = definitions.PROCESSING_STATUS_NOT_RESPONDING

    self._processing_status.UpdateWorkerStatus(
        process.name, processing_status, pid, used_memory, display_name,
        number_of_consumed_sources, number_of_produced_sources,
        number_of_consumed_events, number_of_produced_events,
        number_of_consumed_event_tags, number_of_produced_event_tags,
        number_of_consumed_errors, number_of_produced_errors,
        number_of_consumed_reports, number_of_produced_reports)

  def _StartWorkerProcess(self, process_name, storage_writer):
    """Creates, starts, monitors and registers a worker process.

    Args:
      process_name (str): process name.
      storage_writer (StorageWriter): storage writer for a session storage used
          to create task storage.

    Returns:
      MultiProcessWorkerProcess: extraction worker process or None on error.
    """
    analysis_plugin = self._analysis_plugins.get(process_name, None)
    if not analysis_plugin:
      logger.error('Missing analysis plugin: {0:s}'.format(process_name))
      return None

    if self._use_zeromq:
      queue_name = '{0:s} output event queue'.format(process_name)
      output_event_queue = zeromq_queue.ZeroMQPushBindQueue(
          name=queue_name, timeout_seconds=self._QUEUE_TIMEOUT)
      # Open the queue so it can bind to a random port, and we can get the
      # port number to use in the input queue.
      output_event_queue.Open()

    else:
      output_event_queue = multi_process_queue.MultiProcessingQueue(
          timeout=self._QUEUE_TIMEOUT)

    self._event_queues[process_name] = output_event_queue

    if self._use_zeromq:
      queue_name = '{0:s} input event queue'.format(process_name)
      input_event_queue = zeromq_queue.ZeroMQPullConnectQueue(
          name=queue_name, delay_open=True, port=output_event_queue.port,
          timeout_seconds=self._QUEUE_TIMEOUT)

    else:
      # The multiprocessing queue is shared between engine and process.
      input_event_queue = output_event_queue

    process = analysis_process.AnalysisProcess(
        input_event_queue, storage_writer, self._knowledge_base,
        analysis_plugin, self._processing_configuration,
        data_location=self._data_location,
        event_filter_expression=self._event_filter_expression,
        name=process_name)

    process.start()

    logger.info('Started analysis plugin: {0:s} (PID: {1:d}).'.format(
        process_name, process.pid))

    try:
      self._StartMonitoringProcess(process)
    except (IOError, KeyError) as exception:
      logger.error((
          'Unable to monitor analysis plugin: {0:s} (PID: {1:d}) '
          'with error: {2!s}').format(process_name, process.pid, exception))

      process.terminate()
      return None

    self._RegisterProcess(process)
    return process

  def AnalyzeEvents(
      self, knowledge_base_object, storage_writer, data_location,
      analysis_plugins, processing_configuration, event_filter=None,
      event_filter_expression=None, status_update_callback=None,
      worker_memory_limit=None):
    """Analyzes events in a plaso storage.

    Args:
      knowledge_base_object (KnowledgeBase): contains information from
          the source data needed for processing.
      storage_writer (StorageWriter): storage writer.
      data_location (str): path to the location that data files should
          be loaded from.
      analysis_plugins (dict[str, AnalysisPlugin]): analysis plugins that
          should be run and their names.
      processing_configuration (ProcessingConfiguration): processing
          configuration.
      event_filter (Optional[FilterObject]): event filter.
      event_filter_expression (Optional[str]): event filter expression.
      status_update_callback (Optional[function]): callback function for status
          updates.
      worker_memory_limit (Optional[int]): maximum amount of memory a worker is
          allowed to consume, where None represents the default memory limit
          and 0 represents no limit.

    Raises:
      KeyboardInterrupt: if a keyboard interrupt was raised.
    """
    if not analysis_plugins:
      return

    keyboard_interrupt = False

    self._analysis_plugins = {}
    self._data_location = data_location
    self._event_filter_expression = event_filter_expression
    self._knowledge_base = knowledge_base_object
    self._status_update_callback = status_update_callback
    self._processing_configuration = processing_configuration

    if worker_memory_limit is None:
      self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT
    else:
      self._worker_memory_limit = worker_memory_limit

    self._StartProfiling(self._processing_configuration.profiling)

    # Set up the storage writer before the analysis processes.
    storage_writer.StartTaskStorage()

    self._StartAnalysisProcesses(storage_writer, analysis_plugins)

    # Start the status update thread after open of the storage writer
    # so we don't have to clean up the thread if the open fails.
    self._StartStatusUpdateThread()

    try:
      # Open the storage file after creating the worker processes otherwise
      # the ZIP storage file will remain locked as long as the worker processes
      # are alive.
      storage_writer.Open()
      storage_writer.ReadPreprocessingInformation(knowledge_base_object)
      storage_writer.WriteSessionStart()

      try:
        self._AnalyzeEvents(
            storage_writer, analysis_plugins, event_filter=event_filter)

        self._status = definitions.PROCESSING_STATUS_FINALIZING

      except KeyboardInterrupt:
        # Remember the interrupt so it can be re-raised after clean up.
        keyboard_interrupt = True
        self._abort = True

        self._processing_status.aborted = True
        if self._status_update_callback:
          self._status_update_callback(self._processing_status)

      finally:
        storage_writer.WriteSessionCompletion(aborted=self._abort)

        storage_writer.Close()

    finally:
      # Stop the status update thread after close of the storage writer
      # so we include the storage sync to disk in the status updates.
      self._StopStatusUpdateThread()

    try:
      self._StopAnalysisProcesses(abort=self._abort)

    except KeyboardInterrupt:
      keyboard_interrupt = True

      self._AbortKill()

      # The abort can leave the main process unresponsive
      # due to incorrectly finalized IPC.
      self._KillProcess(os.getpid())

    self._StopProfiling()

    # Reset values.
    self._analysis_plugins = {}
    self._data_location = None
    self._event_filter_expression = None
    self._knowledge_base = None
    self._processing_configuration = None
    self._status_update_callback = None
    self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT

    if keyboard_interrupt:
      raise KeyboardInterrupt

  def ExportEvents(
      self, knowledge_base_object, storage_reader, output_module,
      processing_configuration, deduplicate_events=True, event_filter=None,
      status_update_callback=None, time_slice=None, use_time_slicer=False):
    """Exports events using an output module.

    Args:
      knowledge_base_object (KnowledgeBase): contains information from
          the source data needed for processing.
      storage_reader (StorageReader): storage reader.
      output_module (OutputModule): output module.
      processing_configuration (ProcessingConfiguration): processing
          configuration.
      deduplicate_events (Optional[bool]): True if events should be
          deduplicated.
      event_filter (Optional[FilterObject]): event filter.
      status_update_callback (Optional[function]): callback function for status
          updates.
      time_slice (Optional[TimeSlice]): slice of time to output.
      use_time_slicer (Optional[bool]): True if the 'time slicer' should be
          used. The 'time slicer' will provide a context of events around
          an event of interest.

    Returns:
      collections.Counter: counter that tracks the number of events extracted
          from storage.
    """
    self._processing_configuration = processing_configuration
    self._status_update_callback = status_update_callback

    storage_reader.ReadPreprocessingInformation(knowledge_base_object)

    output_module.Open()
    output_module.WriteHeader()

    self._StartStatusUpdateThread()

    self._StartProfiling(self._processing_configuration.profiling)

    try:
      events_counter = self._ExportEvents(
          storage_reader, output_module, deduplicate_events=deduplicate_events,
          event_filter=event_filter, time_slice=time_slice,
          use_time_slicer=use_time_slicer)

    finally:
      # Always stop the status update thread, also when the export fails.
      self._StopStatusUpdateThread()

    output_module.WriteFooter()
    output_module.Close()

    self._StopProfiling()

    # Reset values.
    self._status_update_callback = None
    self._processing_configuration = None

    return events_counter