Source code for plaso.filters.filter_list

# -*- coding: utf-8 -*-
"""List of object-filters."""

from __future__ import unicode_literals

import logging
import os

import yaml

from plaso.filters import interface
from plaso.filters import manager
from plaso.lib import errors


[docs]class ObjectFilterList(interface.FilterObject): """A list of object-filters with additional metadata.""" def __init__(self): """Initializes an object-filter list object.""" super(ObjectFilterList, self).__init__() self.filters = None def _IncludeKeyword(self, loader, node): """Callback for YAML add_constructor. The YAML constructor is a function that converts a YAML node to a native Python object. A YAML constructor accepts an instance of Loader and a node and returns a Python object. For more information see: http://pyyaml.org/wiki/PyYAMLDocumentation Args: loader (yaml.Loader): the YAML loader object (instance of yaml.Loader). node (yaml.node): a YAML node. Returns: object: a Python object or None. """ filename = loader.construct_scalar(node) if not os.path.isfile(filename): return None with open(filename, 'rb') as file_object: try: return yaml.safe_load(file_object) except yaml.ParserError as exception: logging.error( 'Unable to load rule file with error: {0!s}'.format(exception)) return None def _ParseEntry(self, entry): """Parses a single filter entry. Args: entry (dict[str, dict[str, object]]): dictionary containing one more filter rules and associated metadata. Raises: WrongPlugin: if the entry cannot be parsed. """ # A single file with a list of filters to parse. for name, meta in entry.items(): if 'filter' not in meta: raise errors.WrongPlugin( 'Entry inside {0:s} does not contain a filter statement.'.format( name)) meta_filter = meta.get('filter') matcher = self._GetMatcher(meta_filter) if not matcher: raise errors.WrongPlugin( 'Filter entry [{0:s}] malformed for rule: <{1:s}>'.format( meta_filter, name)) self.filters.append((name, matcher, meta))
[docs] def CompileFilter(self, filter_expression): """Compiles the filter expression. The filter expression contains the name of a YAML file. Args: filter_expression (str): filter expression. Raises: WrongPlugin: if the filter could not be compiled. """ if not os.path.isfile(filter_expression): raise errors.WrongPlugin(( 'ObjectFilterList requires an YAML file to be passed on, ' 'this filter string is not a file.')) yaml.add_constructor( '!include', self._IncludeKeyword, Loader=yaml.loader.SafeLoader) results = None with open(filter_expression, 'rb') as file_object: try: results = yaml.safe_load(file_object) except (yaml.scanner.ScannerError, IOError) as exception: raise errors.WrongPlugin( 'Unable to parse YAML file with error: {0!s}.'.format(exception)) self.filters = [] results_type = type(results) if results_type is dict: self._ParseEntry(results) elif results_type is list: for result in results: if not isinstance(result, dict): raise errors.WrongPlugin( 'Wrong format of YAML file, entry not a dict ({0:s})'.format( results_type)) self._ParseEntry(result) else: raise errors.WrongPlugin( 'Wrong format of YAML file, entry not a dict ({0:s})'.format( results_type))
self._filter_expression = filter_expression
[docs] def Match(self, event): """Determines if an event object matches the filter. Args: event (EventObject): event. Returns: bool: True if the filter matched. """ if not self.filters: return True for _, matcher, _ in self.filters: if matcher.Matches(event): return True
return False manager.FiltersManager.RegisterFilter(ObjectFilterList)