# -*- coding: utf-8 -*-
"""The parsers and plugins interface classes."""
from __future__ import unicode_literals
import abc
import os
from plaso.lib import errors
class BaseFileEntryFilter(object):
  """Interface for filters that decide whether a file entry is of interest.

  Subclasses are expected to override Match. Note that the class does not
  use an ABC metaclass, so the abstractmethod decorator only marks the
  method and does not prevent instantiation.
  """

  # pylint: disable=redundant-returns-doc
  @abc.abstractmethod
  def Match(self, file_entry):
    """Checks a file entry against the filter.

    Args:
      file_entry (dfvfs.FileEntry): file entry to check.

    Returns:
      bool: True if the file entry matches the filter.
    """
class FileNameFileEntryFilter(BaseFileEntryFilter):
  """File entry filter that compares lower case file names."""

  def __init__(self, filename):
    """Initializes a file name file entry filter.

    Args:
      filename (str): name of the file to match; it is stored in lower case.
    """
    super(FileNameFileEntryFilter, self).__init__()
    # Normalize once here so Match only has to lower case the candidate name.
    self._filename = filename.lower()

  def Match(self, file_entry):
    """Checks whether the name of a file entry equals the filter file name.

    Args:
      file_entry (dfvfs.FileEntry): file entry to check.

    Returns:
      bool: True if the lower case name of the file entry equals the lower
          case file name of the filter, False otherwise or when no file
          entry was provided.
    """
    if file_entry:
      return file_entry.name.lower() == self._filename
    return False
class BaseParser(object):
  """The parser interface.

  Parsers that support plugins keep a class-level registry,
  _plugin_classes, that maps the lower case plugin name to the plugin
  class. The registry is None for parsers without plugin support; the
  lookup, enumeration and deregistration methods treat None the same as
  an empty registry.
  """

  NAME = 'base_parser'
  DESCRIPTION = ''

  # List of filters that should match for the parser to be applied.
  FILTERS = frozenset()

  # Every derived parser class that implements plugins should define
  # its own _plugin_classes dict:
  # _plugin_classes = {}

  # We deliberately don't define it here to make sure the plugins of
  # different parser classes don't end up in the same dict.
  _plugin_classes = None

  def __init__(self):
    """Initializes a parser.

    By default all plugins will be enabled. To only enable specific plugins
    use the EnablePlugins method and pass it a list of strings containing
    the names of the plugins to enable.

    The default plugin, named "{self.NAME:s}_default", if it exists,
    is always enabled and cannot be disabled.
    """
    super(BaseParser, self).__init__()
    self._default_plugin = None
    self._plugins = None
    self.EnablePlugins([])

  @classmethod
  def DeregisterPlugin(cls, plugin_class):
    """Deregisters a plugin class.

    The plugin classes are identified based on their lower case name.

    Args:
      plugin_class (type): class of the plugin.

    Raises:
      KeyError: if plugin class is not set for the corresponding name, which
          includes parsers that do not define a plugin registry.
    """
    plugin_name = plugin_class.NAME.lower()
    # _plugin_classes is None for parsers without plugin support. Checking
    # for that first makes sure the documented KeyError is raised instead of
    # a TypeError from testing membership on None.
    if not cls._plugin_classes or plugin_name not in cls._plugin_classes:
      raise KeyError(
          'Plugin class not set for name: {0:s}.'.format(
              plugin_class.NAME))

    del cls._plugin_classes[plugin_name]

  def EnablePlugins(self, plugin_includes):
    """Enables parser plugins.

    Rebuilds the list of enabled plugin objects from the registered plugin
    classes. Every enabled plugin class is instantiated anew.

    Args:
      plugin_includes (list[str]): names of the plugins to enable, where None
          or an empty list represents all plugins. Note the default plugin, if
          it exists, is always enabled and cannot be disabled.
    """
    self._plugins = []
    if not self._plugin_classes:
      return

    default_plugin_name = '{0:s}_default'.format(self.NAME)
    for plugin_name, plugin_class in self._plugin_classes.items():
      # The default plugin is tracked separately from the regular plugins
      # and is not subject to the plugin_includes selection.
      if plugin_name == default_plugin_name:
        self._default_plugin = plugin_class()
        continue

      if plugin_includes and plugin_name not in plugin_includes:
        continue

      plugin_object = plugin_class()
      self._plugins.append(plugin_object)

  # TODO: move this to a filter.
  # pylint: disable=redundant-returns-doc
  @classmethod
  def GetPluginObjectByName(cls, plugin_name):
    """Retrieves a specific plugin object by its name.

    Args:
      plugin_name (str): name of the plugin, as used in the plugin registry.

    Returns:
      BasePlugin: a new plugin object or None if not available, which
          includes parsers that do not define a plugin registry.
    """
    # Parsers without plugin support have no registry to look the name up in.
    if not cls._plugin_classes:
      return None

    plugin_class = cls._plugin_classes.get(plugin_name, None)
    if plugin_class:
      return plugin_class()
    return None

  @classmethod
  def GetPlugins(cls):
    """Retrieves the registered plugins.

    Nothing is yielded for parsers that do not define a plugin registry.

    Yields:
      tuple[str, type]: name and class of the plugin.
    """
    # Fall back to an empty dict so parsers without plugin support yield
    # nothing instead of raising AttributeError on None.
    for plugin_name, plugin_class in (cls._plugin_classes or {}).items():
      yield plugin_name, plugin_class

  @classmethod
  def RegisterPlugin(cls, plugin_class):
    """Registers a plugin class.

    The plugin classes are identified based on their lower case name.
    The parser class is expected to define its own _plugin_classes dict
    before plugins are registered.

    Args:
      plugin_class (type): class of the plugin.

    Raises:
      KeyError: if plugin class is already set for the corresponding name.
    """
    plugin_name = plugin_class.NAME.lower()
    if plugin_name in cls._plugin_classes:
      raise KeyError((
          'Plugin class already set for name: {0:s}.').format(
              plugin_class.NAME))

    cls._plugin_classes[plugin_name] = plugin_class

  @classmethod
  def RegisterPlugins(cls, plugin_classes):
    """Registers plugin classes.

    Args:
      plugin_classes (list[type]): classes of plugins.

    Raises:
      KeyError: if plugin class is already set for the corresponding name.
    """
    for plugin_class in plugin_classes:
      cls.RegisterPlugin(plugin_class)

  @classmethod
  def SupportsPlugins(cls):
    """Determines if a parser supports plugins.

    Returns:
      bool: True if the parser supports plugins, meaning the parser class
          has a plugin registry that is not None.
    """
    return cls._plugin_classes is not None
class FileEntryParser(BaseParser):
  """Interface for parsers that operate on a file entry."""

  def Parse(self, parser_mediator):
    """Parses the file entry provided by the parser mediator.

    The parser is added to the parser chain of the mediator for the
    duration of the ParseFileEntry call and removed again afterwards, also
    when ParseFileEntry raises.

    Args:
      parser_mediator (ParserMediator): a parser mediator.

    Raises:
      UnableToParseFile: when the parser mediator has no file entry or
          when the file cannot be parsed.
    """
    entry = parser_mediator.GetFileEntry()
    if not entry:
      raise errors.UnableToParseFile('Invalid file entry')

    parser_mediator.AppendToParserChain(self)
    try:
      self.ParseFileEntry(parser_mediator, entry)
    finally:
      # Always undo the append so the parser chain stays balanced.
      parser_mediator.PopFromParserChain()

  @abc.abstractmethod
  def ParseFileEntry(self, parser_mediator, file_entry):
    """Parses a file entry.

    Subclasses are expected to override this method.

    Args:
      parser_mediator (ParserMediator): a parser mediator.
      file_entry (dfvfs.FileEntry): a file entry to parse.

    Raises:
      UnableToParseFile: when the file cannot be parsed.
    """
class FileObjectParser(BaseParser):
  """Interface for parsers that operate on a file-like object."""

  # The initial file offset. Set this value to None if no initial
  # file offset seek needs to be performed.
  _INITIAL_FILE_OFFSET = 0

  def Parse(self, parser_mediator, file_object):
    """Parses a single file-like object.

    Unless _INITIAL_FILE_OFFSET is None, the file-like object is first
    positioned at that offset relative to the start of the file. The
    parser is added to the parser chain of the mediator for the duration
    of the ParseFileObject call and removed again afterwards, also when
    ParseFileObject raises.

    Args:
      parser_mediator (ParserMediator): a parser mediator.
      file_object (dfvfs.FileIO): a file-like object to parse.

    Raises:
      UnableToParseFile: when no file-like object was provided or when
          the file cannot be parsed.
    """
    if not file_object:
      raise errors.UnableToParseFile('Invalid file object')

    initial_offset = self._INITIAL_FILE_OFFSET
    if initial_offset is not None:
      file_object.seek(initial_offset, os.SEEK_SET)

    parser_mediator.AppendToParserChain(self)
    try:
      self.ParseFileObject(parser_mediator, file_object)
    finally:
      # Always undo the append so the parser chain stays balanced.
      parser_mediator.PopFromParserChain()

  @abc.abstractmethod
  def ParseFileObject(self, parser_mediator, file_object):
    """Parses a file-like object.

    Subclasses are expected to override this method.

    Args:
      parser_mediator (ParserMediator): a parser mediator.
      file_object (dfvfs.FileIO): a file-like object to parse.

    Raises:
      UnableToParseFile: when the file cannot be parsed.
    """