# -*- coding: utf-8 -*-
"""The preprocess plugins manager."""
from __future__ import unicode_literals
from dfvfs.helpers import file_system_searcher
from dfvfs.helpers import windows_path_resolver
from dfwinreg import interface as dfwinreg_interface
from dfwinreg import regf as dfwinreg_regf
from dfwinreg import registry as dfwinreg_registry
from dfwinreg import registry_searcher
from plaso.lib import errors
from plaso.preprocessors import interface
from plaso.preprocessors import logger
class FileSystemWinRegistryFileReader(dfwinreg_interface.WinRegistryFileReader):
"""A file system-based Windows Registry file reader."""
def __init__(self, file_system, mount_point, environment_variables=None):
"""Initializes a Windows Registry file reader object.
Args:
file_system (dfvfs.FileSystem): file system.
mount_point (dfvfs.PathSpec): mount point path specification.
environment_variables (Optional[list[EnvironmentVariableArtifact]]):
environment variables.
"""
super(FileSystemWinRegistryFileReader, self).__init__()
self._file_system = file_system
self._path_resolver = self._CreateWindowsPathResolver(
file_system, mount_point, environment_variables=environment_variables)
def _CreateWindowsPathResolver(
self, file_system, mount_point, environment_variables):
"""Create a Windows path resolver and sets the environment variables.
Args:
file_system (dfvfs.FileSytem): file system.
mount_point (dfvfs.PathSpec): mount point path specification.
environment_variables (list[EnvironmentVariableArtifact]): environment
variables.
Returns:
dfvfs.WindowsPathResolver: Windows path resolver.
"""
if environment_variables is None:
environment_variables = []
path_resolver = windows_path_resolver.WindowsPathResolver(
file_system, mount_point)
for environment_variable in environment_variables:
name = environment_variable.name.lower()
if name not in ('systemroot', 'userprofile'):
continue
path_resolver.SetEnvironmentVariable(
environment_variable.name, environment_variable.value)
return path_resolver
def _OpenPathSpec(self, path_specification, ascii_codepage='cp1252'):
"""Opens the Windows Registry file specified by the path specification.
Args:
path_specification (dfvfs.PathSpec): path specification.
ascii_codepage (Optional[str]): ASCII string codepage.
Returns:
WinRegistryFile: Windows Registry file or None.
"""
if not path_specification:
return None
file_entry = self._file_system.GetFileEntryByPathSpec(path_specification)
if file_entry is None:
return None
file_object = file_entry.GetFileObject()
if file_object is None:
return None
registry_file = dfwinreg_regf.REGFWinRegistryFile(
ascii_codepage=ascii_codepage)
try:
registry_file.Open(file_object)
except IOError as exception:
logger.warning(
'Unable to open Windows Registry file with error: {0!s}'.format(
exception))
file_object.close()
return None
return registry_file
def Open(self, path, ascii_codepage='cp1252'):
"""Opens the Windows Registry file specified by the path.
Args:
path (str): path of the Windows Registry file.
ascii_codepage (Optional[str]): ASCII string codepage.
Returns:
WinRegistryFile: Windows Registry file or None.
"""
path_specification = self._path_resolver.ResolvePath(path)
if path_specification is None:
return None
return self._OpenPathSpec(path_specification)
[docs]class PreprocessPluginsManager(object):
"""Preprocess plugins manager."""
_plugins = {}
_file_system_plugins = {}
_windows_registry_plugins = {}
[docs] @classmethod
def CollectFromFileSystem(
cls, artifacts_registry, knowledge_base, searcher, file_system):
"""Collects values from Windows Registry values.
Args:
artifacts_registry (artifacts.ArtifactDefinitionsRegistry): artifacts
definitions registry.
knowledge_base (KnowledgeBase): to fill with preprocessing information.
searcher (dfvfs.FileSystemSearcher): file system searcher to preprocess
the file system.
file_system (dfvfs.FileSystem): file system to be preprocessed.
"""
for preprocess_plugin in cls._file_system_plugins.values():
artifact_definition = artifacts_registry.GetDefinitionByName(
preprocess_plugin.ARTIFACT_DEFINITION_NAME)
if not artifact_definition:
logger.warning('Missing artifact definition: {0:s}'.format(
preprocess_plugin.ARTIFACT_DEFINITION_NAME))
continue
try:
preprocess_plugin.Collect(
knowledge_base, artifact_definition, searcher, file_system)
except (IOError, errors.PreProcessFail) as exception:
logger.warning((
'Unable to collect value from artifact definition: {0:s} '
'with error: {1!s}').format(
preprocess_plugin.ARTIFACT_DEFINITION_NAME, exception))
continue
[docs] @classmethod
def CollectFromWindowsRegistry(
cls, artifacts_registry, knowledge_base, searcher):
"""Collects values from Windows Registry values.
Args:
artifacts_registry (artifacts.ArtifactDefinitionsRegistry): artifacts
definitions registry.
knowledge_base (KnowledgeBase): to fill with preprocessing information.
searcher (dfwinreg.WinRegistrySearcher): Windows Registry searcher to
preprocess the Windows Registry.
"""
for preprocess_plugin in cls._windows_registry_plugins.values():
artifact_definition = artifacts_registry.GetDefinitionByName(
preprocess_plugin.ARTIFACT_DEFINITION_NAME)
if not artifact_definition:
logger.warning('Missing artifact definition: {0:s}'.format(
preprocess_plugin.ARTIFACT_DEFINITION_NAME))
continue
try:
preprocess_plugin.Collect(knowledge_base, artifact_definition, searcher)
except (IOError, errors.PreProcessFail) as exception:
logger.warning((
'Unable to collect value from artifact definition: {0:s} '
'with error: {1!s}').format(
preprocess_plugin.ARTIFACT_DEFINITION_NAME, exception))
continue
[docs] @classmethod
def DeregisterPlugin(cls, plugin_class):
"""Deregisters an preprocess plugin class.
Args:
plugin_class (type): preprocess plugin class.
Raises:
KeyError: if plugin class is not set for the corresponding name.
TypeError: if the source type of the plugin class is not supported.
"""
name = plugin_class.ARTIFACT_DEFINITION_NAME.lower()
if name not in cls._plugins:
raise KeyError(
'Artifact plugin class not set for name: {0:s}.'.format(name))
del cls._plugins[name]
if name in cls._file_system_plugins:
del cls._file_system_plugins[name]
if name in cls._windows_registry_plugins:
del cls._windows_registry_plugins[name]
[docs] @classmethod
def GetNames(cls):
"""Retrieves the names of the registered artifact definitions.
Returns:
list[str]: registered artifact definitions names.
"""
return [
plugin_class.ARTIFACT_DEFINITION_NAME
for plugin_class in cls._plugins.values()]
[docs] @classmethod
def RegisterPlugin(cls, plugin_class):
"""Registers an preprocess plugin class.
Args:
plugin_class (type): preprocess plugin class.
Raises:
KeyError: if plugin class is already set for the corresponding name.
TypeError: if the source type of the plugin class is not supported.
"""
name = plugin_class.ARTIFACT_DEFINITION_NAME.lower()
if name in cls._plugins:
raise KeyError(
'Artifact plugin class already set for name: {0:s}.'.format(name))
preprocess_plugin = plugin_class()
cls._plugins[name] = preprocess_plugin
if isinstance(
preprocess_plugin, interface.FileSystemArtifactPreprocessorPlugin):
cls._file_system_plugins[name] = preprocess_plugin
elif isinstance(
preprocess_plugin,
interface.WindowsRegistryKeyArtifactPreprocessorPlugin):
cls._windows_registry_plugins[name] = preprocess_plugin
[docs] @classmethod
def RegisterPlugins(cls, plugin_classes):
"""Registers preprocess plugin classes.
Args:
plugin_classes (list[type]): preprocess plugin classes.
Raises:
KeyError: if plugin class is already set for the corresponding name.
"""
for plugin_class in plugin_classes:
cls.RegisterPlugin(plugin_class)
[docs] @classmethod
def RunPlugins(
cls, artifacts_registry, file_system, mount_point, knowledge_base):
"""Runs the preprocessing plugins.
Args:
artifacts_registry (artifacts.ArtifactDefinitionsRegistry): artifacts
definitions registry.
file_system (dfvfs.FileSystem): file system to be preprocessed.
mount_point (dfvfs.PathSpec): mount point path specification that refers
to the base location of the file system.
knowledge_base (KnowledgeBase): to fill with preprocessing information.
"""
searcher = file_system_searcher.FileSystemSearcher(file_system, mount_point)
cls.CollectFromFileSystem(
artifacts_registry, knowledge_base, searcher, file_system)
# Run the Registry plugins separately so we do not have to open
# Registry files for every preprocess plugin.
environment_variables = None
if knowledge_base:
environment_variables = knowledge_base.GetEnvironmentVariables()
registry_file_reader = FileSystemWinRegistryFileReader(
file_system, mount_point, environment_variables=environment_variables)
win_registry = dfwinreg_registry.WinRegistry(
registry_file_reader=registry_file_reader)
searcher = registry_searcher.WinRegistrySearcher(win_registry)
cls.CollectFromWindowsRegistry(
artifacts_registry, knowledge_base, searcher)
if not knowledge_base.HasUserAccounts():
logger.warning('Unable to find any user accounts on the system.')