Source code for plaso.filters.filters

# -*- coding: utf-8 -*-
"""The event filter expression parser filter classes."""

from __future__ import unicode_literals

import abc
import logging
import re

from plaso.filters import helpers
from plaso.lib import errors
from plaso.lib import py2to3


class Filter(object):
  """Base interface shared by all event filters.

  Attributes:
    args (list[object]): arguments provided to the filter.
  """

  def __init__(self, arguments=None):
    """Initializes a filter.

    Args:
      arguments (Optional[object]): arguments of the filter. A falsy value,
          such as None, is stored as a new empty list.
    """
    debug_message = 'Adding {0!s}'.format(arguments)
    logging.debug(debug_message)

    super(Filter, self).__init__()

    self.args = arguments if arguments else []

  # NOTE: this class does not set abc.ABCMeta as its metaclass, so the
  # decorator below documents intent but does not block instantiation.
  @abc.abstractmethod
  def Matches(self, event, event_data, event_tag):
    """Determines if the event, data and tag match the filter.

    Args:
      event (EventObject): event to compare against the filter.
      event_data (EventData): event data to compare against the filter.
      event_tag (EventTag): event tag to compare against the filter.

    Returns:
      bool: True if the event, data and tag match the filter, False otherwise.
    """
class AndFilter(Filter):
  """A filter that performs a boolean AND on its sub filters.

  Note that if no sub filters are provided, every event matches.
  """

  def Matches(self, event, event_data, event_tag):
    """Determines if the event, data and tag match every sub filter.

    Evaluation stops at the first sub filter that does not match.

    Args:
      event (EventObject): event to compare against the filter.
      event_data (EventData): event data to compare against the filter.
      event_tag (EventTag): event tag to compare against the filter.

    Returns:
      bool: True if the event, data and tag match the filter, False otherwise.
    """
    return all(
        sub_filter.Matches(event, event_data, event_tag)
        for sub_filter in self.args)
class OrFilter(Filter):
  """A filter that performs a boolean OR on its sub filters.

  Note that if no sub filters are provided, every event matches.
  """

  def Matches(self, event, event_data, event_tag):
    """Determines if the event, data and tag match at least one sub filter.

    Evaluation stops at the first sub filter that matches.

    Args:
      event (EventObject): event to compare against the filter.
      event_data (EventData): event data to compare against the filter.
      event_tag (EventTag): event tag to compare against the filter.

    Returns:
      bool: True if the event, data and tag match the filter, False otherwise.
    """
    sub_filters = self.args
    # Without any sub filter there is nothing to reject the event.
    if not sub_filters:
      return True

    return any(
        sub_filter.Matches(event, event_data, event_tag)
        for sub_filter in sub_filters)
class Operator(Filter):
  """Interface for filters that represent operators."""

  @abc.abstractmethod
  def Matches(self, event, event_data, event_tag):
    """Determines if the event, data and tag match the operator.

    Subclasses are expected to provide the implementation.

    Args:
      event (EventObject): event to compare against the filter.
      event_data (EventData): event data to compare against the filter.
      event_tag (EventTag): event tag to compare against the filter.

    Returns:
      bool: True if the event, data and tag match the filter, False otherwise.
    """
class IdentityFilter(Operator):
  """A filter which always evaluates to True."""

  def Matches(self, event, event_data, event_tag):
    """Matches every event, regardless of its content.

    Args:
      event (EventObject): event, which is ignored.
      event_data (EventData): event data, which is ignored.
      event_tag (EventTag): event tag, which is ignored.

    Returns:
      bool: always True.
    """
    matches = True
    return matches
class BinaryOperator(Operator):
  """Interface for binary operators.

  Attributes:
    left_operand (object): left hand operand.
    right_operand (object): right hand operand.
  """

  def __init__(self, arguments=None, **kwargs):
    """Initializes a binary operator.

    Args:
      arguments (Optional[list[str, object]]): operands of the filter, where
          the first is the left hand and the second the right hand operand.

    Raises:
      InvalidNumberOfOperands: if the number of operands provided is not
          supported, which includes no operands being provided at all.
    """
    # The default of None has no length; count it as zero operands so that the
    # documented error is raised instead of a TypeError.
    number_of_operands = 0 if arguments is None else len(arguments)

    if number_of_operands != 2:
      raise errors.InvalidNumberOfOperands((
          '{0:s} only supports 2 operands, provided were {1:d} '
          'operands.').format(self.__class__.__name__, number_of_operands))

    super(BinaryOperator, self).__init__(arguments=arguments, **kwargs)
    self.left_operand = arguments[0]
    self.right_operand = arguments[1]

  @abc.abstractmethod
  def Matches(self, event, event_data, event_tag):
    """Determines if the event, data and tag match the filter.

    Args:
      event (EventObject): event to compare against the filter.
      event_data (EventData): event data to compare against the filter.
      event_tag (EventTag): event tag to compare against the filter.

    Returns:
      bool: True if the event, data and tag match the filter, False otherwise.
    """
class GenericBinaryOperator(BinaryOperator):
  """Shared functionality for common binary operators."""

  _DEPRECATED_ATTRIBUTE_NAMES = frozenset([
      'message', 'source', 'source_long', 'source_short', 'sourcetype'])

  # Attributes that are stored in the event attribute container.
  _EVENT_ATTRIBUTE_NAMES = frozenset(['timestamp', 'timestamp_desc'])

  _OBJECT_PATH_SEPARATOR = '.'

  def __init__(self, arguments=None, **kwargs):
    """Initializes a generic binary operator.

    Args:
      arguments (Optional[list[str, object]]): operands of the filter.
    """
    super(GenericBinaryOperator, self).__init__(arguments=arguments, **kwargs)
    # Result returned on a match; FlipBool() inverts it to negate the operator.
    self._bool_value = True

  @abc.abstractmethod
  def _CompareValue(self, event_value, filter_value):
    """Compares two values with the operator.

    Args:
      event_value (object): value retrieved from the event.
      filter_value (object): value defined by the filter.

    Returns:
      bool: True if the values match according to the operator, False
          otherwise.
    """

  def _GetValue(self, attribute_name, event, event_data, event_tag):
    """Retrieves the value of a specific event, data or tag attribute.

    A warning is logged for deprecated attribute names, after which the
    lookup continues as for any other attribute name.

    Args:
      attribute_name (str): name of the attribute to retrieve the value from.
      event (EventObject): event to retrieve the value from.
      event_data (EventData): event data to retrieve the value from.
      event_tag (EventTag): event tag to retrieve the value from.

    Returns:
      object: attribute value or None if not available.
    """
    if attribute_name in self._DEPRECATED_ATTRIBUTE_NAMES:
      logging.warning(
          'Expansion of {0:s} in event filter no longer supported'.format(
              attribute_name))

    if attribute_name in self._EVENT_ATTRIBUTE_NAMES:
      return getattr(event, attribute_name, None)

    # "tag" is an alias for the labels of the event tag.
    if attribute_name == 'tag':
      return getattr(event_tag, 'labels', None)

    return getattr(event_data, attribute_name, None)

  def _GetValueByPath(self, path, event, event_data, event_tag):
    """Retrieves the value of a specific event attribute given a specific path.

    Given a path such as ["pathspec", "inode"] it returns the value
    event_data.pathspec.inode.

    Args:
      path (list[str]): object path to traverse, that contains the attribute
          names. A string is split on the object path separator first.
      event (EventObject): event to retrieve the value from.
      event_data (EventData): event data to retrieve the value from.
      event_tag (EventTag): event tag to retrieve the value from.

    Returns:
      object: attribute value or None if not available.
    """
    if isinstance(path, py2to3.STRING_TYPES):
      path = path.split(self._OBJECT_PATH_SEPARATOR)

    attribute_name = path[0].lower()
    attribute_value = self._GetValue(
        attribute_name, event, event_data, event_tag)

    if attribute_value is None:
      return None

    if isinstance(attribute_value, dict):
      # A dictionary value is only supported with exactly one sub key.
      if len(path) != 2:
        logging.warning((
            'Unsupported object path length: {0:d} for dictionary '
            'value').format(len(path)))
        return None

      attribute_name = path[1].lower()
      return attribute_value.get(attribute_name, None)

    if len(path) == 1:
      return attribute_value

    # Resolve the remainder of the path against the retrieved value, which
    # takes the place of the event data in the nested lookup.
    return self._GetValueByPath(path[1:], None, attribute_value, None)

  def FlipBool(self):
    """Negates the internal boolean value attribute."""
    logging.debug('Negative matching.')
    self._bool_value = not self._bool_value

  def Matches(self, event, event_data, event_tag):
    """Determines if the event, data and tag match the filter.

    A missing value (None), an empty string or an empty container never
    matches. A numeric zero is treated as a regular value.

    Args:
      event (EventObject): event to compare against the filter.
      event_data (EventData): event data to compare against the filter.
      event_tag (EventTag): event tag to compare against the filter.

    Returns:
      bool: True if the event, data and tag match the filter, False otherwise.
    """
    path = self.left_operand.split(self._OBJECT_PATH_SEPARATOR)
    value = self._GetValueByPath(path, event, event_data, event_tag)

    # A plain truthiness test would discard numeric zero, so that a filter
    # such as "timestamp == 0" could never match. Only None and empty
    # strings or containers are considered to have no value.
    has_value = bool(value) or isinstance(value, (int, float))

    if has_value and self._CompareValue(value, self.right_operand):
      return self._bool_value
    return not self._bool_value
class EqualsOperator(GenericBinaryOperator):
  """Equals (==) operator."""

  def _CompareValue(self, event_value, filter_value):
    """Checks whether the event value equals the filter value.

    Args:
      event_value (object): value retrieved from the event.
      filter_value (object): value defined by the filter.

    Returns:
      bool: True if the values are equal, False otherwise.
    """
    is_equal = event_value == filter_value
    return is_equal
class NotEqualsOperator(GenericBinaryOperator):
  """Not equals (!=) operator."""

  def _CompareValue(self, event_value, filter_value):
    """Checks whether the event value differs from the filter value.

    Args:
      event_value (object): value retrieved from the event.
      filter_value (object): value defined by the filter.

    Returns:
      bool: True if the values are not equal, False otherwise.
    """
    is_different = event_value != filter_value
    return is_different
class LessThanOperator(GenericBinaryOperator):
  """Less than (<) operator."""

  def _CompareValue(self, event_value, filter_value):
    """Checks whether the event value is less than the filter value.

    Args:
      event_value (object): value retrieved from the event.
      filter_value (object): value defined by the filter.

    Returns:
      bool: True if the event value is less than the filter value, False
          otherwise.
    """
    is_less = event_value < filter_value
    return is_less
class LessEqualOperator(GenericBinaryOperator):
  """Less than or equals (<=) operator."""

  def _CompareValue(self, event_value, filter_value):
    """Checks whether the event value is at most the filter value.

    Args:
      event_value (object): value retrieved from the event.
      filter_value (object): value defined by the filter.

    Returns:
      bool: True if the event value is less than or equal to the filter
          value, False otherwise.
    """
    is_less_or_equal = event_value <= filter_value
    return is_less_or_equal
class GreaterThanOperator(GenericBinaryOperator):
  """Greater than (>) operator."""

  def _CompareValue(self, event_value, filter_value):
    """Checks whether the event value is greater than the filter value.

    Args:
      event_value (object): value retrieved from the event.
      filter_value (object): value defined by the filter.

    Returns:
      bool: True if the event value is greater than the filter value, False
          otherwise.
    """
    is_greater = event_value > filter_value
    return is_greater
class GreaterEqualOperator(GenericBinaryOperator):
  """Greater than or equals (>=) operator."""

  def _CompareValue(self, event_value, filter_value):
    """Checks whether the event value is at least the filter value.

    Args:
      event_value (object): value retrieved from the event.
      filter_value (object): value defined by the filter.

    Returns:
      bool: True if the event value is greater than or equal to the filter
          value, False otherwise.
    """
    is_greater_or_equal = event_value >= filter_value
    return is_greater_or_equal
class Contains(GenericBinaryOperator):
  """Operator to determine if a value contains another value."""

  def _CompareValue(self, event_value, filter_value):
    """Checks whether the filter value is part of the event value.

    Note that the comparison is case insensitive if the event value is
    a string. Operands that do not support the comparison do not match.

    Args:
      event_value (object): value retrieved from the event.
      filter_value (object): value defined by the filter.

    Returns:
      bool: True if the filter value is part of the event value, False
          otherwise.
    """
    needle = filter_value
    haystack = event_value

    try:
      if isinstance(haystack, py2to3.STRING_TYPES):
        # Lower case both operands for a case insensitive string comparison.
        needle = needle.lower()
        haystack = haystack.lower()

      return needle in haystack

    except (AttributeError, TypeError):
      return False
# TODO: Change to an N-ary Operator?
class InSet(GenericBinaryOperator):
  """Operator to determine if a value is part of another value."""

  def _CompareValue(self, event_value, filter_value):
    """Compares if the event value is part of the filter value.

    The event value matches if it is contained in the filter value, or,
    when it is a non-string iterable, if every one of its elements is
    contained in the filter value. The comparison is case sensitive.

    Args:
      event_value (object): value retrieved from the event.
      filter_value (object): value defined by the filter.

    Returns:
      bool: True if the event value is part of the filter value, False
          otherwise.
    """
    try:
      if event_value in filter_value:
        return True
    except TypeError:
      # The direct membership test is not supported for this combination of
      # operands, for example a list tested against a set or a string. Fall
      # through to the element-wise comparison instead of failing.
      pass

    # event_value might be an iterable, but strings must be skipped here or
    # they would be compared character by character.
    # pylint: disable=consider-merging-isinstance
    if (isinstance(event_value, py2to3.STRING_TYPES) or
        isinstance(event_value, bytes)):
      return False

    try:
      return all(value in filter_value for value in event_value)
    except TypeError:
      # event_value is not iterable or its elements cannot be compared.
      return False
# TODO: is GenericBinaryOperator the most suitable super class here?
# Would BinaryOperator be a better fit?
class Regexp(GenericBinaryOperator):
  """Operator to determine if a value matches a regular expression.

  This operator uses case sensitive comparison.

  Attributes:
    compiled_re (object): compiled regular expression, as returned by
        re.compile().
  """

  # Flags passed to re.compile(). Subclasses override this attribute to
  # change how the regular expression is matched.
  _RE_FLAGS = re.DOTALL

  def __init__(self, arguments=None, **kwargs):
    """Initializes a regular expression operator.

    Args:
      arguments (Optional[object]): operands of the filter, where the right
          hand operand is the regular expression.

    Raises:
      ValueError: if the regular expression is malformed.
    """
    super(Regexp, self).__init__(arguments=arguments, **kwargs)

    # Note that right_operand is not necessarily a string.
    logging.debug('Compiled: {0!s}'.format(self.right_operand))

    try:
      expression = helpers.GetUnicodeString(self.right_operand)
      compiled_re = re.compile(expression, self._RE_FLAGS)
    except re.error:
      raise ValueError('Regular expression "{0!s}" is malformed.'.format(
          self.right_operand))

    self.compiled_re = compiled_re

  def _CompareValue(self, event_value, filter_value):
    """Compares if the event value matches a regular expression.

    The regular expression compiled at initialization is used; filter_value
    is not read by this method.

    Args:
      event_value (object): value retrieved from the event.
      filter_value (object): value defined by the filter.

    Returns:
      bool: True if the event value matches the regular expression, False
          otherwise.
    """
    try:
      string_value = helpers.GetUnicodeString(event_value)
      return bool(self.compiled_re.search(string_value))
    except TypeError:
      # A value that cannot be searched is treated as not matching.
      return False


class RegexpInsensitive(Regexp):
  """Operator to determine if a value matches a regular expression.

  This operator uses case insensitive comparison. The inherited initializer
  compiles the expression once with the flags defined below.
  """

  _RE_FLAGS = re.I | re.DOTALL