Source code for plaso.multi_processing.base_process

# -*- coding: utf-8 -*-
"""Base class for a process used in multi-processing."""

from __future__ import unicode_literals

import abc
import logging
import multiprocessing
import os
import random
import signal
import time

from plaso.lib import loggers
from plaso.multi_processing import plaso_xmlrpc


class MultiProcessBaseProcess(multiprocessing.Process):
  """Class that defines the multi-processing process interface.

  Sub classes are expected to implement _GetStatus, _Main and SignalAbort.

  Attributes:
    rpc_port (multiprocessing.Value): shared unsigned integer that contains
        the port number of the process status RPC server or 0 if no RPC
        server is running.
  """

  _NUMBER_OF_RPC_SERVER_START_ATTEMPTS = 14
  _PROCESS_JOIN_TIMEOUT = 5.0

  def __init__(self, enable_sigsegv_handler=False, **kwargs):
    """Initializes a process object.

    Args:
      enable_sigsegv_handler (Optional[bool]): True if the SIGSEGV handler
          should be enabled.
      kwargs (dict[str,object]): keyword arguments to pass to
          multiprocessing.Process.
    """
    super(MultiProcessBaseProcess, self).__init__(**kwargs)
    self._debug_output = False
    self._enable_sigsegv_handler = enable_sigsegv_handler
    self._log_filename = None
    self._original_sigsegv_handler = None
    # TODO: check if this can be replaced by self.pid or does this only apply
    # to the parent process?
    self._pid = None
    self._quiet_mode = False
    self._rpc_server = None
    self._status_is_running = False

    # We need to share the RPC port number with the engine process.
    self.rpc_port = multiprocessing.Value('I', 0)

  @property
  def name(self):
    """str: process name."""
    return self._name

  @abc.abstractmethod
  def _GetStatus(self):
    """Returns status information.

    Returns:
      dict [str, object]: status attributes, indexed by name.
    """

  @abc.abstractmethod
  def _Main(self):
    """The process main loop.

    This method is called when the process is ready to start. A sub class
    should override this method to do the necessary actions in the main loop.
    """

  def _OnCriticalError(self):
    """The process on critical error handler.

    This method is called when the process encounters a critical error e.g.
    a segfault. A sub class should override this method to do the necessary
    actions before the original critical error signal handler is called.

    Be aware that the state of the process should not be trusted, as
    a significant part of memory could have been overwritten before
    a segfault. This callback is primarily intended to salvage what we need
    to troubleshoot the error.
    """
    return

  def _SigSegvHandler(self, unused_signal_number, unused_stack_frame):
    """Signal handler for the SIGSEGV signal.

    Calls the critical error handler and then, if an original SIGSEGV handler
    was stored, restores it and re-sends SIGSEGV to this process.

    Args:
      unused_signal_number (int): numeric representation of the signal.
      unused_stack_frame (frame): current stack frame or None.
    """
    self._OnCriticalError()

    # Note that the original SIGSEGV handler can be 0.
    if self._original_sigsegv_handler is not None:
      # Let the original SIGSEGV handler take over.
      signal.signal(signal.SIGSEGV, self._original_sigsegv_handler)
      os.kill(self._pid, signal.SIGSEGV)

  def _SigTermHandler(self, unused_signal_number, unused_stack_frame):
    """Signal handler for the SIGTERM signal.

    Args:
      unused_signal_number (int): numeric representation of the signal.
      unused_stack_frame (frame): current stack frame or None.
    """
    self.SignalAbort()

  def _StartProcessStatusRPCServer(self):
    """Starts the process status RPC server.

    Does nothing if an RPC server was already started. On success the port
    number is stored in rpc_port, on failure an error is logged and no RPC
    server is kept.
    """
    if self._rpc_server:
      return

    self._rpc_server = plaso_xmlrpc.XMLProcessStatusRPCServer(self._GetStatus)

    hostname = 'localhost'

    # Try the PID as port number first otherwise pick something random
    # between 1024 and 60000.
    if self._pid < 1024 or self._pid > 60000:
      port = random.randint(1024, 60000)
    else:
      port = self._pid

    if not self._rpc_server.Start(hostname, port):
      # A port value of 0 is used to indicate that no attempt succeeded.
      port = 0
      for _ in range(self._NUMBER_OF_RPC_SERVER_START_ATTEMPTS):
        port = random.randint(1024, 60000)
        if self._rpc_server.Start(hostname, port):
          break

        port = 0

    if not port:
      logging.error((
          'Unable to start a process status RPC server for {0!s} '
          '(PID: {1:d})').format(self._name, self._pid))
      self._rpc_server = None
      return

    self.rpc_port.value = port

    logging.debug(
        'Process: {0!s} process status RPC server started'.format(self._name))

  def _StopProcessStatusRPCServer(self):
    """Stops the process status RPC server.

    Does nothing if no RPC server is running.
    """
    if not self._rpc_server:
      return

    # Make sure the engine gets one more status update so it knows
    # the worker has completed.
    self._WaitForStatusNotRunning()

    self._rpc_server.Stop()
    self._rpc_server = None
    self.rpc_port.value = 0

    logging.debug(
        'Process: {0!s} process status RPC server stopped'.format(self._name))

  def _WaitForStatusNotRunning(self):
    """Waits for the status is running flag to change to False.

    Always sleeps 2.0 seconds first and then polls the flag every 0.5 seconds
    until it is cleared or the total time slept reaches
    _PROCESS_JOIN_TIMEOUT.
    """
    # We wait slightly longer than the status check sleep time.
    time.sleep(2.0)
    time_slept = 2.0
    while self._status_is_running:
      time.sleep(0.5)
      time_slept += 0.5
      if time_slept >= self._PROCESS_JOIN_TIMEOUT:
        break

  # This method is part of the multiprocessing.Process interface hence
  # its name does not follow the style guide.
  def run(self):
    """Runs the process.

    Sets up the signal handlers and logging, starts the process status RPC
    server and calls the main loop. The RPC server is stopped, logging is
    shut down and the is running status is cleared when the main loop
    returns or raises; an exception raised by the main loop is propagated.
    """
    # Prevent the KeyboardInterrupt being raised inside the process.
    # This will prevent a process from generating a traceback when interrupted.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    # A SIGTERM signal handler is necessary to make sure IPC is cleaned up
    # correctly on terminate.
    signal.signal(signal.SIGTERM, self._SigTermHandler)

    # A SIGSEGV signal handler is necessary to try to indicate where
    # worker failed.
    # WARNING the SIGSEGV handler will deadlock the process on a real segfault.
    if self._enable_sigsegv_handler:
      self._original_sigsegv_handler = signal.signal(
          signal.SIGSEGV, self._SigSegvHandler)

    self._pid = os.getpid()

    # We need to set the is running status explicitly to True in case
    # the process completes before the engine is able to determine
    # the status of the process, e.g. in the unit tests.
    self._status_is_running = True

    # Logging needs to be configured before the first output otherwise we
    # mess up the logging of the parent process.
    loggers.ConfigureLogging(
        debug_output=self._debug_output, filename=self._log_filename,
        quiet_mode=self._quiet_mode)

    logging.debug(
        'Process: {0!s} (PID: {1:d}) started'.format(self._name, self._pid))

    self._StartProcessStatusRPCServer()

    # The clean up is done in a finally clause so that a main loop that
    # raises does not leave the RPC server running or the log files open.
    try:
      self._Main()

    finally:
      self._StopProcessStatusRPCServer()

      logging.debug(
          'Process: {0!s} (PID: {1:d}) stopped'.format(self._name, self._pid))

      # Make sure log files are cleanly closed.
      logging.shutdown()

      self._status_is_running = False

  @abc.abstractmethod
  def SignalAbort(self):
    """Signals the process to abort."""