Source code for plaso.multi_processing.task_engine

# -*- coding: utf-8 -*-
"""The task multi-process processing engine."""

from __future__ import unicode_literals

import heapq
import logging
import multiprocessing
import os
import time

from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.resolver import context

from plaso.containers import event_sources
from plaso.containers import errors as error_containers
from plaso.engine import extractors
from plaso.engine import plaso_queue
from plaso.engine import zeromq_queue
from plaso.lib import definitions
from plaso.lib import errors
from plaso.lib import loggers
from plaso.multi_processing import engine
from plaso.multi_processing import logger
from plaso.multi_processing import multi_process_queue
from plaso.multi_processing import task_manager
from plaso.multi_processing import worker_process


class _EventSourceHeap(object):
  """Class that defines an event source heap."""

  def __init__(self, maximum_number_of_items=50000):
    """Initializes an event source heap.

    Args:
      maximum_number_of_items (Optional[int]): maximum number of items
          in the heap.
    """
    super(_EventSourceHeap, self).__init__()
    self._heap = []
    self._maximum_number_of_items = maximum_number_of_items

  def PopEventSource(self):
    """Pops an event source from the heap.

    Returns:
      EventSource: event source or None on error.
    """
    try:
      _, event_source = heapq.heappop(self._heap)

    except IndexError:
      return None

    return event_source

  def PushEventSource(self, event_source):
    """Pushes an event source onto the heap.

    Args:
      event_source (EventSource): event source.

    Raises:
      HeapFull: if the heap contains the maximum number of items. Note that
          this exception is raised after the item is added to the heap.
    """
    if event_source.file_entry_type == (
        dfvfs_definitions.FILE_ENTRY_TYPE_DIRECTORY):
      weight = 1
    else:
      weight = 100

    heap_values = (weight, event_source)
    heapq.heappush(self._heap, heap_values)

    if len(self._heap) >= self._maximum_number_of_items:
      raise errors.HeapFull()


[docs]class TaskMultiProcessEngine(engine.MultiProcessEngine): """Class that defines the task multi-process engine. This class contains functionality to: * monitor and manage extraction tasks; * merge results returned by extraction workers. """ # Maximum number of attribute containers to merge per loop. _MAXIMUM_NUMBER_OF_CONTAINERS = 50 # Maximum number of concurrent tasks. _MAXIMUM_NUMBER_OF_TASKS = 10000 # Consider a worker inactive after 15 minutes of no activity. _PROCESS_WORKER_TIMEOUT = 15.0 * 60.0 _WORKER_PROCESSES_MINIMUM = 2 _WORKER_PROCESSES_MAXIMUM = 15 _TASK_QUEUE_TIMEOUT_SECONDS = 2 _ZEROMQ_NO_WORKER_REQUEST_TIME_SECONDS = 10 * 60 def __init__( self, maximum_number_of_tasks=_MAXIMUM_NUMBER_OF_TASKS, use_zeromq=True): """Initializes an engine. Args: maximum_number_of_tasks (Optional[int]): maximum number of concurrent tasks, where 0 represents no limit. use_zeromq (Optional[bool]): True if ZeroMQ should be used for queuing instead of Python's multiprocessing queue. """ super(TaskMultiProcessEngine, self).__init__() self._enable_sigsegv_handler = False self._filter_find_specs = None self._last_worker_number = 0 self._maximum_number_of_tasks = maximum_number_of_tasks self._merge_task = None self._merge_task_on_hold = None self._number_of_consumed_errors = 0 self._number_of_consumed_event_tags = 0 self._number_of_consumed_events = 0 self._number_of_consumed_reports = 0 self._number_of_consumed_sources = 0 self._number_of_produced_errors = 0 self._number_of_produced_event_tags = 0 self._number_of_produced_events = 0 self._number_of_produced_reports = 0 self._number_of_produced_sources = 0 self._number_of_worker_processes = 0 self._path_spec_extractor = extractors.PathSpecExtractor() self._processing_configuration = None self._resolver_context = context.Context() self._session_identifier = None self._status = definitions.PROCESSING_STATUS_IDLE self._storage_merge_reader = None self._storage_merge_reader_on_hold = None self._task_queue = None self._task_queue_port = None self._task_manager = task_manager.TaskManager() self._use_zeromq = use_zeromq def _FillEventSourceHeap( self, storage_writer, event_source_heap, start_with_first=False): """Fills the event source heap with the available written event sources. Args: storage_writer (StorageWriter): storage writer for a session storage. event_source_heap (_EventSourceHeap): event source heap. start_with_first (Optional[bool]): True if the function should start with the first written event source. """ if self._processing_profiler: self._processing_profiler.StartTiming('fill_event_source_heap') if self._processing_profiler: self._processing_profiler.StartTiming('get_event_source') if start_with_first: event_source = storage_writer.GetFirstWrittenEventSource() else: event_source = storage_writer.GetNextWrittenEventSource() if self._processing_profiler: self._processing_profiler.StopTiming('get_event_source') while event_source: try: event_source_heap.PushEventSource(event_source) except errors.HeapFull: break if self._processing_profiler: self._processing_profiler.StartTiming('get_event_source') event_source = storage_writer.GetNextWrittenEventSource() if self._processing_profiler: self._processing_profiler.StopTiming('get_event_source') if self._processing_profiler: self._processing_profiler.StopTiming('fill_event_source_heap') def _MergeTaskStorage(self, storage_writer): """Merges a task storage with the session storage. This function checks all task stores that are ready to merge and updates the scheduled tasks. Note that to prevent this function holding up the task scheduling loop only the first available task storage is merged. Args: storage_writer (StorageWriter): storage writer for a session storage used to merge task storage. """ if self._processing_profiler: self._processing_profiler.StartTiming('merge_check') for task_identifier in storage_writer.GetProcessedTaskIdentifiers(): try: task = self._task_manager.GetProcessedTaskByIdentifier(task_identifier) self._task_manager.SampleTaskStatus(task, 'processed') to_merge = self._task_manager.CheckTaskToMerge(task) if not to_merge: storage_writer.RemoveProcessedTaskStorage(task) self._task_manager.RemoveTask(task) self._task_manager.SampleTaskStatus(task, 'removed_processed') else: storage_writer.PrepareMergeTaskStorage(task) self._task_manager.UpdateTaskAsPendingMerge(task) except KeyError: logger.error( 'Unable to retrieve task: {0:s} to prepare it to be merged.'.format( task_identifier)) continue if self._processing_profiler: self._processing_profiler.StopTiming('merge_check') task = None if not self._storage_merge_reader_on_hold: task = self._task_manager.GetTaskPendingMerge(self._merge_task) # Limit the number of attribute containers from a single task-based # storage file that are merged per loop to keep tasks flowing. if task or self._storage_merge_reader: self._status = definitions.PROCESSING_STATUS_MERGING if self._processing_profiler: self._processing_profiler.StartTiming('merge') if task: if self._storage_merge_reader: self._merge_task_on_hold = self._merge_task self._storage_merge_reader_on_hold = self._storage_merge_reader self._task_manager.SampleTaskStatus( self._merge_task_on_hold, 'merge_on_hold') self._merge_task = task try: self._storage_merge_reader = storage_writer.StartMergeTaskStorage( task) self._task_manager.SampleTaskStatus(task, 'merge_started') except IOError as exception: logger.error(( 'Unable to merge results of task: {0:s} ' 'with error: {1!s}').format(task.identifier, exception)) self._storage_merge_reader = None if self._storage_merge_reader: fully_merged = self._storage_merge_reader.MergeAttributeContainers( maximum_number_of_containers=self._MAXIMUM_NUMBER_OF_CONTAINERS) else: # TODO: Do something more sensible when this happens, perhaps # retrying the task once that is implemented. For now, we mark the task # as fully merged because we can't continue with it. fully_merged = True if self._processing_profiler: self._processing_profiler.StopTiming('merge') if fully_merged: try: self._task_manager.CompleteTask(self._merge_task) except KeyError as exception: logger.error( 'Unable to complete task: {0:s} with error: {1!s}'.format( self._merge_task.identifier, exception)) if not self._storage_merge_reader_on_hold: self._merge_task = None self._storage_merge_reader = None else: self._merge_task = self._merge_task_on_hold self._storage_merge_reader = self._storage_merge_reader_on_hold self._merge_task_on_hold = None self._storage_merge_reader_on_hold = None self._task_manager.SampleTaskStatus( self._merge_task, 'merge_resumed') self._status = definitions.PROCESSING_STATUS_RUNNING self._number_of_produced_errors = storage_writer.number_of_errors self._number_of_produced_events = storage_writer.number_of_events self._number_of_produced_sources = storage_writer.number_of_event_sources def _ProcessSources( self, source_path_specs, storage_writer, filter_find_specs=None): """Processes the sources. Args: source_path_specs (list[dfvfs.PathSpec]): path specifications of the sources to process. storage_writer (StorageWriter): storage writer for a session storage. filter_find_specs (Optional[list[dfvfs.FindSpec]]): find specifications used in path specification extraction. If set, path specifications that match the find specification will be processed. """ if self._processing_profiler: self._processing_profiler.StartTiming('process_sources') self._status = definitions.PROCESSING_STATUS_COLLECTING self._number_of_consumed_errors = 0 self._number_of_consumed_event_tags = 0 self._number_of_consumed_events = 0 self._number_of_consumed_reports = 0 self._number_of_consumed_sources = 0 self._number_of_produced_errors = 0 self._number_of_produced_event_tags = 0 self._number_of_produced_events = 0 self._number_of_produced_reports = 0 self._number_of_produced_sources = 0 path_spec_generator = self._path_spec_extractor.ExtractPathSpecs( source_path_specs, find_specs=filter_find_specs, recurse_file_system=False, resolver_context=self._resolver_context) for path_spec in path_spec_generator: if self._abort: break # TODO: determine if event sources should be DataStream or FileEntry # or both. event_source = event_sources.FileEntryEventSource(path_spec=path_spec) storage_writer.AddEventSource(event_source) self._number_of_produced_sources = storage_writer.number_of_event_sources # Update the foreman process status in case we are using a filter file. self._UpdateForemanProcessStatus() if self._status_update_callback: self._status_update_callback(self._processing_status) self._ScheduleTasks(storage_writer) if self._abort: self._status = definitions.PROCESSING_STATUS_ABORTED else: self._status = definitions.PROCESSING_STATUS_COMPLETED self._number_of_produced_errors = storage_writer.number_of_errors self._number_of_produced_events = storage_writer.number_of_events self._number_of_produced_sources = storage_writer.number_of_event_sources if self._processing_profiler: self._processing_profiler.StopTiming('process_sources') # Update the foreman process and task status in case we are using # a filter file. self._UpdateForemanProcessStatus() tasks_status = self._task_manager.GetStatusInformation() if self._task_queue_profiler: self._task_queue_profiler.Sample(tasks_status) self._processing_status.UpdateTasksStatus(tasks_status) if self._status_update_callback: self._status_update_callback(self._processing_status) def _ScheduleTask(self, task): """Schedules a task. Args: task (Task): task. Returns: bool: True if the task was scheduled. """ if self._processing_profiler: self._processing_profiler.StartTiming('schedule_task') try: self._task_queue.PushItem(task, block=False) is_scheduled = True except errors.QueueFull: is_scheduled = False if self._processing_profiler: self._processing_profiler.StopTiming('schedule_task') return is_scheduled def _ScheduleTasks(self, storage_writer): """Schedules tasks. Args: storage_writer (StorageWriter): storage writer for a session storage. """ logger.debug('Task scheduler started') self._status = definitions.PROCESSING_STATUS_RUNNING # TODO: make tasks persistent. # TODO: protect task scheduler loop by catch all and # handle abort path. event_source_heap = _EventSourceHeap() self._FillEventSourceHeap( storage_writer, event_source_heap, start_with_first=True) event_source = event_source_heap.PopEventSource() task = None while event_source or self._task_manager.HasPendingTasks(): if self._abort: break try: if not task: task = self._task_manager.CreateRetryTask() if not task and event_source: task = self._task_manager.CreateTask(self._session_identifier) task.file_entry_type = event_source.file_entry_type task.path_spec = event_source.path_spec event_source = None self._number_of_consumed_sources += 1 if self._guppy_memory_profiler: self._guppy_memory_profiler.Sample() if task: if self._ScheduleTask(task): logger.debug( 'Scheduled task {0:s} for path specification {1:s}'.format( task.identifier, task.path_spec.comparable)) self._task_manager.SampleTaskStatus(task, 'scheduled') task = None else: self._task_manager.SampleTaskStatus(task, 'schedule_attempted') self._MergeTaskStorage(storage_writer) self._FillEventSourceHeap(storage_writer, event_source_heap) if not task and not event_source: event_source = event_source_heap.PopEventSource() except KeyboardInterrupt: self._abort = True self._processing_status.aborted = True if self._status_update_callback: self._status_update_callback(self._processing_status) for task in self._task_manager.GetFailedTasks(): error = error_containers.ExtractionError( message='Worker failed to process path specification', path_spec=task.path_spec) self._storage_writer.AddError(error) self._processing_status.error_path_specs.append(task.path_spec) self._status = definitions.PROCESSING_STATUS_IDLE if self._abort: logger.debug('Task scheduler aborted') else: logger.debug('Task scheduler stopped') def _StartWorkerProcess(self, process_name, storage_writer): """Creates, starts, monitors and registers a worker process. Args: process_name (str): process name. storage_writer (StorageWriter): storage writer for a session storage used to create task storage. Returns: MultiProcessWorkerProcess: extraction worker process or None if the process could not be started. """ process_name = 'Worker_{0:02d}'.format(self._last_worker_number) logger.debug('Starting worker process {0:s}'.format(process_name)) if self._use_zeromq: queue_name = '{0:s} task queue'.format(process_name) task_queue = zeromq_queue.ZeroMQRequestConnectQueue( delay_open=True, linger_seconds=0, name=queue_name, port=self._task_queue_port, timeout_seconds=self._TASK_QUEUE_TIMEOUT_SECONDS) else: task_queue = self._task_queue process = worker_process.WorkerProcess( task_queue, storage_writer, self.knowledge_base, self._session_identifier, self._processing_configuration, enable_sigsegv_handler=self._enable_sigsegv_handler, name=process_name) # Remove all possible log handlers to prevent a child process from logging # to the main process log file and garbling the log. The log handlers are # recreated after the worker process has been started. for handler in logging.root.handlers: logging.root.removeHandler(handler) handler.close() process.start() loggers.ConfigureLogging( debug_output=self._debug_output, filename=self._log_filename, mode='a', quiet_mode=self._quiet_mode) try: self._StartMonitoringProcess(process) except (IOError, KeyError) as exception: pid = process.pid logger.error(( 'Unable to monitor replacement worker process: {0:s} ' '(PID: {1:d}) with error: {2!s}').format( process_name, pid, exception)) self._TerminateProcess(process) return None self._RegisterProcess(process) self._last_worker_number += 1 return process def _StatusUpdateThreadMain(self): """Main function of the status update thread.""" while self._status_update_active: # Make a local copy of the PIDs in case the dict is changed by # the main thread. for pid in list(self._process_information_per_pid.keys()): self._CheckStatusWorkerProcess(pid) self._UpdateForemanProcessStatus() tasks_status = self._task_manager.GetStatusInformation() if self._task_queue_profiler: self._task_queue_profiler.Sample(tasks_status) self._processing_status.UpdateTasksStatus(tasks_status) if self._status_update_callback: self._status_update_callback(self._processing_status) time.sleep(self._STATUS_UPDATE_INTERVAL) def _StopExtractionProcesses(self, abort=False): """Stops the extraction processes. Args: abort (bool): True to indicated the stop is issued on abort. """ logger.debug('Stopping extraction processes.') self._StopMonitoringProcesses() # Note that multiprocessing.Queue is very sensitive regarding # blocking on either a get or a put. So we try to prevent using # any blocking behavior. if abort: # Signal all the processes to abort. self._AbortTerminate() logger.debug('Emptying task queue.') self._task_queue.Empty() # Wake the processes to make sure that they are not blocking # waiting for the queue new items. for _ in self._processes_per_pid: try: self._task_queue.PushItem(plaso_queue.QueueAbort(), block=False) except errors.QueueFull: logger.warning('Task queue full, unable to push abort message.') # Try waiting for the processes to exit normally. self._AbortJoin(timeout=self._PROCESS_JOIN_TIMEOUT) self._task_queue.Close(abort=abort) if not abort: # Check if the processes are still alive and terminate them if necessary. self._AbortTerminate() self._AbortJoin(timeout=self._PROCESS_JOIN_TIMEOUT) self._task_queue.Close(abort=True) # Kill any lingering processes. self._AbortKill() def _UpdateForemanProcessStatus(self): """Update the foreman process status.""" used_memory = self._process_information.GetUsedMemory() or 0 if self._memory_profiler: self._memory_profiler.Sample('main', used_memory) display_name = getattr(self._merge_task, 'identifier', '') self._processing_status.UpdateForemanStatus( self._name, self._status, self._pid, used_memory, display_name, self._number_of_consumed_sources, self._number_of_produced_sources, self._number_of_consumed_events, self._number_of_produced_events, self._number_of_consumed_event_tags, self._number_of_produced_event_tags, self._number_of_consumed_errors, self._number_of_produced_errors, self._number_of_consumed_reports, self._number_of_produced_reports) def _UpdateProcessingStatus(self, pid, process_status, used_memory): """Updates the processing status. Args: pid (int): process identifier (PID) of the worker process. process_status (dict[str, object]): status values received from the worker process. used_memory (int): size of used memory in bytes. Raises: KeyError: if the process is not registered with the engine. """ self._RaiseIfNotRegistered(pid) if not process_status: return process = self._processes_per_pid[pid] processing_status = process_status.get('processing_status', None) self._RaiseIfNotMonitored(pid) display_name = process_status.get('display_name', '') number_of_consumed_errors = process_status.get( 'number_of_consumed_errors', None) number_of_produced_errors = process_status.get( 'number_of_produced_errors', None) number_of_consumed_event_tags = process_status.get( 'number_of_consumed_event_tags', None) number_of_produced_event_tags = process_status.get( 'number_of_produced_event_tags', None) number_of_consumed_events = process_status.get( 'number_of_consumed_events', None) number_of_produced_events = process_status.get( 'number_of_produced_events', None) number_of_consumed_reports = process_status.get( 'number_of_consumed_reports', None) number_of_produced_reports = process_status.get( 'number_of_produced_reports', None) number_of_consumed_sources = process_status.get( 'number_of_consumed_sources', None) number_of_produced_sources = process_status.get( 'number_of_produced_sources', None) if processing_status != definitions.PROCESSING_STATUS_IDLE: last_activity_timestamp = process_status.get( 'last_activity_timestamp', 0.0) if last_activity_timestamp: last_activity_timestamp += self._PROCESS_WORKER_TIMEOUT current_timestamp = time.time() if current_timestamp > last_activity_timestamp: logger.error(( 'Process {0:s} (PID: {1:d}) has not reported activity within ' 'the timeout period.').format(process.name, pid)) processing_status = definitions.PROCESSING_STATUS_NOT_RESPONDING self._processing_status.UpdateWorkerStatus( process.name, processing_status, pid, used_memory, display_name, number_of_consumed_sources, number_of_produced_sources, number_of_consumed_events, number_of_produced_events, number_of_consumed_event_tags, number_of_produced_event_tags, number_of_consumed_errors, number_of_produced_errors, number_of_consumed_reports, number_of_produced_reports) task_identifier = process_status.get('task_identifier', '') if not task_identifier: return try: self._task_manager.UpdateTaskAsProcessingByIdentifier(task_identifier) return except KeyError: logger.debug( 'Worker {0:s} is processing unknown task: {1:s}.'.format( process.name, task_identifier))
[docs] def ProcessSources( self, session_identifier, source_path_specs, storage_writer, processing_configuration, enable_sigsegv_handler=False, filter_find_specs=None, number_of_worker_processes=0, status_update_callback=None, worker_memory_limit=None): """Processes the sources and extract events. Args: session_identifier (str): identifier of the session. source_path_specs (list[dfvfs.PathSpec]): path specifications of the sources to process. storage_writer (StorageWriter): storage writer for a session storage. processing_configuration (ProcessingConfiguration): processing configuration. enable_sigsegv_handler (Optional[bool]): True if the SIGSEGV handler should be enabled. filter_find_specs (Optional[list[dfvfs.FindSpec]]): find specifications used in path specification extraction. number_of_worker_processes (Optional[int]): number of worker processes. status_update_callback (Optional[function]): callback function for status updates. worker_memory_limit (Optional[int]): maximum amount of memory a worker is allowed to consume, where None represents the default memory limit and 0 represents no limit. Returns: ProcessingStatus: processing status. """ if number_of_worker_processes < 1: # One worker for each "available" CPU (minus other processes). # The number here is derived from the fact that the engine starts up: # * A main process. # # If we want to utilize all CPUs on the system we therefore need to start # up workers that amounts to the total number of CPUs - the other # processes. try: cpu_count = multiprocessing.cpu_count() - 1 if cpu_count <= self._WORKER_PROCESSES_MINIMUM: cpu_count = self._WORKER_PROCESSES_MINIMUM elif cpu_count >= self._WORKER_PROCESSES_MAXIMUM: cpu_count = self._WORKER_PROCESSES_MAXIMUM except NotImplementedError: logger.error(( 'Unable to determine number of CPUs defaulting to {0:d} worker ' 'processes.').format(self._WORKER_PROCESSES_MINIMUM)) cpu_count = self._WORKER_PROCESSES_MINIMUM number_of_worker_processes = cpu_count self._enable_sigsegv_handler = enable_sigsegv_handler self._number_of_worker_processes = number_of_worker_processes if worker_memory_limit is None: self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT else: self._worker_memory_limit = worker_memory_limit # Keep track of certain values so we can spawn new extraction workers. self._processing_configuration = processing_configuration self._debug_output = processing_configuration.debug_output self._filter_find_specs = filter_find_specs self._log_filename = processing_configuration.log_filename self._session_identifier = session_identifier self._status_update_callback = status_update_callback self._storage_writer = storage_writer # Set up the task queue. if not self._use_zeromq: self._task_queue = multi_process_queue.MultiProcessingQueue( maximum_number_of_queued_items=self._maximum_number_of_tasks) else: task_outbound_queue = zeromq_queue.ZeroMQBufferedReplyBindQueue( delay_open=True, linger_seconds=0, maximum_items=1, name='main_task_queue', timeout_seconds=self._ZEROMQ_NO_WORKER_REQUEST_TIME_SECONDS) self._task_queue = task_outbound_queue # The ZeroMQ backed queue must be started first, so we can save its port. # TODO: raises: attribute-defined-outside-init # self._task_queue.name = 'Task queue' self._task_queue.Open() self._task_queue_port = self._task_queue.port self._StartProfiling(self._processing_configuration.profiling) self._task_manager.StartProfiling( self._processing_configuration.profiling, self._name) if self._serializers_profiler: storage_writer.SetSerializersProfiler(self._serializers_profiler) if self._storage_profiler: storage_writer.SetStorageProfiler(self._storage_profiler) # Set up the storage writer before the worker processes. storage_writer.StartTaskStorage() for worker_number in range(number_of_worker_processes): # First argument to _StartWorkerProcess is not used. extraction_process = self._StartWorkerProcess('', storage_writer) if not extraction_process: logger.error('Unable to create worker process: {0:d}'.format( worker_number)) self._StartStatusUpdateThread() try: # Open the storage file after creating the worker processes otherwise # the ZIP storage file will remain locked as long as the worker processes # are alive. storage_writer.Open() storage_writer.WriteSessionStart() try: storage_writer.WritePreprocessingInformation(self.knowledge_base) self._ProcessSources( source_path_specs, storage_writer, filter_find_specs=filter_find_specs) finally: storage_writer.WriteSessionCompletion(aborted=self._abort) storage_writer.Close() finally: # Stop the status update thread after close of the storage writer # so we include the storage sync to disk in the status updates. self._StopStatusUpdateThread() if self._serializers_profiler: storage_writer.SetSerializersProfiler(None) if self._storage_profiler: storage_writer.SetStorageProfiler(None) self._task_manager.StopProfiling() self._StopProfiling() try: self._StopExtractionProcesses(abort=self._abort) except KeyboardInterrupt: self._AbortKill() # The abort can leave the main process unresponsive # due to incorrectly finalized IPC. self._KillProcess(os.getpid()) # The task queue should be closed by _StopExtractionProcesses, this # close is a failsafe, primarily due to MultiProcessingQueue's # blocking behaviour. self._task_queue.Close(abort=True) if self._processing_status.error_path_specs: task_storage_abort = True else: task_storage_abort = self._abort try: storage_writer.StopTaskStorage(abort=task_storage_abort) except (IOError, OSError) as exception: logger.error('Unable to stop task storage with error: {0!s}'.format( exception)) if self._abort: logger.debug('Processing aborted.') self._processing_status.aborted = True else: logger.debug('Processing completed.') # Reset values. self._enable_sigsegv_handler = None self._number_of_worker_processes = None self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT self._processing_configuration = None self._filter_find_specs = None self._session_identifier = None self._status_update_callback = None self._storage_writer = None
return self._processing_status