Source code for plaso.filters.path_filter

# -*- coding: utf-8 -*-
"""A scan tree-based path filter implementation.

The scan tree is a tree based on multiple paths that contains the path
segments per node. The most significant path segment is at the root and
therefore compared first. More information can be found here:
https://github.com/libyal/libsigscan/wiki/Internals
#scanning-tree-based-signature-scanning

The scan tree is used in the filter to filter provided paths.
"""

from __future__ import unicode_literals

from plaso.lib import py2to3


# TODO: Update docstrings in this file.
# pylint: disable=missing-type-doc,missing-return-type-doc
class _PathFilterTable(object):
  """Class that implements a path filter table.

  The path filter table is used to construct a scan tree.
  """

  def __init__(self, paths, ignore_list, path_segment_separator='/'):
    """Initializes and builds the path filter table from a list of paths.

    Args:
      paths: a list of strings containing the paths.
      ignore_list: a list of path segment indexes to ignore, where 0 is the
                   index of the first path segment relative from the root.
      path_segment_separator: optional string containing the path segment
                              separator.
    """
    super(_PathFilterTable, self).__init__()
    self._path_segment_separator = path_segment_separator
    self.path_segments_per_index = {}
    self.paths = list(paths)

    for path in self.paths:
      self._AddPathSegments(path, ignore_list)

  def _AddPathSegments(self, path, ignore_list):
    """Adds the path segments to the table.

    Args:
      path: a string containing the path.
      ignore_list: a list of path segment indexes to ignore, where 0 is the
                   index of the first path segment relative from the root.
    """
    path_segments = path.split(self._path_segment_separator)
    for path_segment_index, path_segment in enumerate(path_segments):
      if path_segment_index not in self.path_segments_per_index:
        self.path_segments_per_index[path_segment_index] = {}

      if path_segment_index not in ignore_list:
        path_segments = self.path_segments_per_index[path_segment_index]

        if path_segment not in path_segments:
          path_segments[path_segment] = []

        paths_per_segment_list = path_segments[path_segment]
        paths_per_segment_list.append(path)

  def GetPathIndexesList(self, path_segment_index, path_segment):
    """Retrieves the path indexes list.

    Args:
      path_segment_index: an integer containing the path segment index.
      path_segment: a string containing the path segment.

    Returns:
      A list of integers containing the path indexes.
    """
    return self.path_segments_per_index[path_segment_index][path_segment]

  def GetPathSegments(self, path_segment_index):
    """Retrieves the path segments for a specific path segment index.

    Args:
      path_segment_index: an integer containing the path segment index.

    Returns:
      A dictionary containing the paths list per path segment.
    """
    return self.path_segments_per_index[path_segment_index]

  def ToDebugString(self):
    """Converts the path filter table into a debug string."""
    text_parts = ['Path segment index\tPath segments(s)']
    for index, path_segments in self.path_segments_per_index.items():
      text_parts.append('{0:d}\t\t\t[{1:s}]'.format(
          index, ', '.join(path_segments)))

    text_parts.append('')

    return '\n'.join(text_parts)


class _PathSegmentWeights(object):
  """Class that implements path segment weights.

  The path segment weights are used to determine the most significant
  path segment per index. The path segment weights are used to construct
  a scan tree.
  """

  def __init__(self):
    """Initializes the path segment weights."""
    super(_PathSegmentWeights, self).__init__()
    self._indexes_per_weight = {}
    self._weight_per_index = {}

  def AddIndex(self, path_segment_index):
    """Adds a path segment index and sets its weight to 0.

    Args:
      path_segment_index: an integer containing the path segment index.

    Raises:
      ValueError: if the path segment weights already contains
                  the path segment index.
    """
    if path_segment_index in self._weight_per_index:
      raise ValueError('Path segment index already set.')

    self._weight_per_index[path_segment_index] = 0

  def AddWeight(self, path_segment_index, weight):
    """Adds a weight for a specific path segment index.

    Args:
      path_segment_index: an integer containing the path segment index.
      weight: an integer containing the weight.

    Raises:
      ValueError: if the path segment weights do not contain
                  the path segment index.
    """
    if path_segment_index not in self._weight_per_index:
      raise ValueError('Path segment index not set.')

    self._weight_per_index[path_segment_index] += weight

    if weight not in self._indexes_per_weight:
      self._indexes_per_weight[weight] = []

    self._indexes_per_weight[weight].append(path_segment_index)

  def GetFirstAvailableIndex(self):
    """Retrieves the first available path segment index.

    Returns:
      An integer containing the first available path segment index
      or None if not available.
    """
    path_segment_indexes = sorted(self._weight_per_index.keys())
    if not path_segment_indexes:
      return None

    return path_segment_indexes[0]

  def GetLargestWeight(self):
    """Retrieves the largest weight or 0 if none."""
    if not self._indexes_per_weight:
      return 0

    return max(self._indexes_per_weight)

  def GetIndexesForWeight(self, weight):
    """Retrieves the list of path segment indexes for a specific weight."""
    return self._indexes_per_weight[weight]

  def GetWeightForIndex(self, path_segment_index):
    """Retrieves the weight for a specific path segment index."""
    return self._weight_per_index[path_segment_index]

  def SetWeight(self, path_segment_index, weight):
    """Sets a weight for a specific path segment index.

    Args:
      path_segment_index: an integer containing the path segment index.
      weight: an integer containing the weight.

    Raises:
      ValueError: if the path segment weights do not contain
                  the path segment index.
    """
    if path_segment_index not in self._weight_per_index:
      raise ValueError('Path segment index not set.')

    self._weight_per_index[path_segment_index] = weight

    if weight not in self._indexes_per_weight:
      self._indexes_per_weight[weight] = []

    self._indexes_per_weight[weight].append(path_segment_index)

  def ToDebugString(self):
    """Converts the path segment weights into a debug string."""
    text_parts = ['Path segment index\tWeight']
    for path_segment_index, weight in self._weight_per_index.items():
      text_parts.append('{0:d}\t\t\t{1:d}'.format(
          path_segment_index, weight))
    text_parts.append('')

    text_parts.append('Weight\t\t\tPath segment index(es)')
    for weight, path_segment_indexes in self._indexes_per_weight.items():
      text_parts.append('{0:d}\t\t\t{1!s}'.format(
          weight, path_segment_indexes))
    text_parts.append('')

    return '\n'.join(text_parts)


[docs]class PathFilterScanTree(object): """Class that implements a path filter scan tree.""" def __init__(self, paths, case_sensitive=True, path_segment_separator='/'): """Initializes and builds a path filter scan tree. Args: paths: a list of strings containing the paths. case_sensitive: optional boolean value to indicate string matches should be case sensitive. path_segment_separator: optional string containing the path segment separator. """ super(PathFilterScanTree, self).__init__() self._case_sensitive = case_sensitive self._path_segment_separator = path_segment_separator self._root_node = None if not self._case_sensitive: paths = [path.lower() for path in paths] path_filter_table = _PathFilterTable( paths, [], path_segment_separator=self._path_segment_separator) if path_filter_table.paths: self._root_node = self._BuildScanTreeNode(path_filter_table, []) def _BuildScanTreeNode(self, path_filter_table, ignore_list): """Builds a scan tree node. Args: path_filter_table: a path filter table object (instance of _PathFilterTable). ignore_list: a list of path segment indexes to ignore, where 0 is the index of the first path segment relative from the root. Returns: A scan tree node (instance of PathFilterScanTreeNode). Raises: ValueError: if the path segment index value or the number of paths segments value is out of bounds. """ # Make a copy of the lists because the function is going to alter them # and the changes must remain in scope of the function. paths_list = list(path_filter_table.paths) ignore_list = list(ignore_list) similarity_weights = _PathSegmentWeights() occurrence_weights = _PathSegmentWeights() value_weights = _PathSegmentWeights() for path_segment_index in path_filter_table.path_segments_per_index.keys(): # Skip a path segment index for which no path segments are defined # in the path filter table. if not path_filter_table.path_segments_per_index[path_segment_index]: continue similarity_weights.AddIndex(path_segment_index) occurrence_weights.AddIndex(path_segment_index) value_weights.AddIndex(path_segment_index) path_segments = path_filter_table.GetPathSegments(path_segment_index) number_of_path_segments = len(path_segments.keys()) if number_of_path_segments > 1: occurrence_weights.SetWeight( path_segment_index, number_of_path_segments) for paths_per_segment_list in path_segments.values(): path_segment_weight = len(paths_per_segment_list) if path_segment_weight > 1: similarity_weights.AddWeight(path_segment_index, path_segment_weight) path_segment_index = self._GetMostSignificantPathSegmentIndex( paths_list, similarity_weights, occurrence_weights, value_weights) ignore_list.append(path_segment_index) if path_segment_index < 0: raise ValueError('Invalid path segment index value out of bounds.') scan_tree_node = PathFilterScanTreeNode(path_segment_index) path_segments = path_filter_table.GetPathSegments(path_segment_index) for path_segment, paths_per_segment_list in path_segments.items(): if not paths_per_segment_list: raise ValueError('Invalid number of paths value out of bounds.') if len(paths_per_segment_list) == 1: for path in paths_per_segment_list: scan_tree_node.AddPathSegment(path_segment, path) else: sub_path_filter_table = _PathFilterTable( paths_per_segment_list, ignore_list, path_segment_separator=self._path_segment_separator) scan_sub_node = self._BuildScanTreeNode( sub_path_filter_table, ignore_list) scan_tree_node.AddPathSegment(path_segment, scan_sub_node) for path in paths_per_segment_list: paths_list.remove(path) number_of_paths = len(paths_list) if number_of_paths == 1: scan_tree_node.SetDefaultValue(paths_list[0]) elif number_of_paths > 1: path_filter_table = _PathFilterTable( paths_list, ignore_list, path_segment_separator=self._path_segment_separator) scan_sub_node = self._BuildScanTreeNode(path_filter_table, ignore_list) scan_tree_node.SetDefaultValue(scan_sub_node) return scan_tree_node def _GetMostSignificantPathSegmentIndex( self, paths, similarity_weights, occurrence_weights, value_weights): """Retrieves the index of the most significant path segment. Args: paths: a list of strings containing the paths. similarity_weights: the similarity weights object (instance of _PathSegmentWeights). occurrence_weights: the occurrence weights object (instance of _PathSegmentWeights). value_weights: the value weights object (instance of _PathSegmentWeights). Returns: An integer containing the path segment index. Raises: ValueError: when paths is an empty list. """ if not paths: raise ValueError('Missing paths.') number_of_paths = len(paths) path_segment_index = None if number_of_paths == 1: path_segment_index = self._GetPathSegmentIndexForValueWeights( value_weights) elif number_of_paths == 2: path_segment_index = self._GetPathSegmentIndexForOccurrenceWeights( occurrence_weights, value_weights) elif number_of_paths > 2: path_segment_index = self._GetPathSegmentIndexForSimilarityWeights( similarity_weights, occurrence_weights, value_weights) return path_segment_index def _GetPathSegmentIndexForOccurrenceWeights( self, occurrence_weights, value_weights): """Retrieves the index of the path segment based on occurrence weights. Args: occurrence_weights: the occurrence weights object (instance of _PathSegmentWeights). value_weights: the value weights object (instance of _PathSegmentWeights). Returns: An integer containing the path segment index. """ largest_weight = occurrence_weights.GetLargestWeight() if largest_weight > 0: occurrence_weight_indexes = occurrence_weights.GetIndexesForWeight( largest_weight) number_of_occurrence_indexes = len(occurrence_weight_indexes) else: number_of_occurrence_indexes = 0 path_segment_index = None if number_of_occurrence_indexes == 0: path_segment_index = self._GetPathSegmentIndexForValueWeights( value_weights) elif number_of_occurrence_indexes == 1: path_segment_index = occurrence_weight_indexes[0] else: largest_weight = 0 for occurrence_index in occurrence_weight_indexes: value_weight = value_weights.GetWeightForIndex(occurrence_index) if not path_segment_index or largest_weight < value_weight: largest_weight = value_weight path_segment_index = occurrence_index return path_segment_index def _GetPathSegmentIndexForSimilarityWeights( self, similarity_weights, occurrence_weights, value_weights): """Retrieves the index of the path segment based on similarity weights. Args: similarity_weights: the similarity weights object (instance of _PathSegmentWeights). occurrence_weights: the occurrence weights object (instance of _PathSegmentWeights). value_weights: the value weights object (instance of _PathSegmentWeights). Returns: An integer containing the path segment index. """ largest_weight = similarity_weights.GetLargestWeight() if largest_weight > 0: similarity_weight_indexes = similarity_weights.GetIndexesForWeight( largest_weight) number_of_similarity_indexes = len(similarity_weight_indexes) else: number_of_similarity_indexes = 0 path_segment_index = None if number_of_similarity_indexes == 0: path_segment_index = self._GetPathSegmentIndexForOccurrenceWeights( occurrence_weights, value_weights) elif number_of_similarity_indexes == 1: path_segment_index = similarity_weight_indexes[0] else: largest_weight = 0 largest_value_weight = 0 for similarity_index in similarity_weight_indexes: occurrence_weight = occurrence_weights.GetWeightForIndex( similarity_index) if largest_weight > 0 and largest_weight == occurrence_weight: value_weight = value_weights.GetWeightForIndex(similarity_index) if largest_value_weight < value_weight: largest_weight = 0 if not path_segment_index or largest_weight < occurrence_weight: largest_weight = occurrence_weight path_segment_index = similarity_index largest_value_weight = value_weights.GetWeightForIndex( similarity_index) return path_segment_index def _GetPathSegmentIndexForValueWeights(self, value_weights): """Retrieves the index of the path segment based on value weights. Args: value_weights: the value weights object (instance of _PathSegmentWeights). Returns: An integer containing the path segment index. Raises: RuntimeError: is no path segment index can be found. """ largest_weight = value_weights.GetLargestWeight() if largest_weight > 0: value_weight_indexes = value_weights.GetIndexesForWeight(largest_weight) else: value_weight_indexes = [] if value_weight_indexes: path_segment_index = value_weight_indexes[0] else: path_segment_index = value_weights.GetFirstAvailableIndex() if path_segment_index is None: raise RuntimeError('No path segment index found.') return path_segment_index
[docs] def CheckPath(self, path, path_segment_separator=None): """Checks if a path matches the scan tree-based path filter. Args: path: a string containing the path. path_segment_separator: optional string containing the path segment separator. None defaults to the path segment separator that was set when the path filter scan tree was initialized. Returns: A boolean indicating if the path matches the filter. """ if not self._case_sensitive: path = path.lower() if path_segment_separator is None: path_segment_separator = self._path_segment_separator path_segments = path.split(path_segment_separator) number_of_path_segments = len(path_segments) scan_object = self._root_node while scan_object: if isinstance(scan_object, py2to3.STRING_TYPES): break if scan_object.path_segment_index >= number_of_path_segments: scan_object = scan_object.default_value continue path_segment = path_segments[scan_object.path_segment_index] scan_object = scan_object.GetScanObject(path_segment) if not isinstance(scan_object, py2to3.STRING_TYPES): return False filter_path_segments = scan_object.split(self._path_segment_separator)
return filter_path_segments == path_segments
[docs]class PathFilterScanTreeNode(object): """Class that implements a path filter scan tree node. The path filter scan tree node defines the path segments for a specific path segment index to filter. Each path segment will point to a scan object that indicates the next part of the path filter. A default value indicates the scan object to use next when there was no match. Attributes: default_value: the default scan object, either a scan tree sub node (instance of PathFilterScanTreeNode) or a string containing a path. parent: the parent path filter scan tree node or None. path_segment_index: an integer containing the path segment index represented by the node. """ def __init__(self, path_segment_index): """Initializes a path filter scan tree node. Args: path_segment_index: an integer containing the path segment index. """ super(PathFilterScanTreeNode, self).__init__() self._path_segments = {} self.default_value = None self.parent = None self.path_segment_index = path_segment_index @property def path_segments(self): """A list of strings containing the path segments.""" return self._path_segments.keys()
[docs] def AddPathSegment(self, path_segment, scan_object): """Adds a path segment. Args: path_segment: a string containing the path segment. scan_object: a scan object, either a scan tree sub node (instance of PathFilterScanTreeNode) or a string containing a path. Raises: ValueError: if the node already contains a scan object for the path segment. """ if path_segment in self._path_segments: raise ValueError('Path segment already set.') if isinstance(scan_object, PathFilterScanTreeNode): scan_object.parent = self
self._path_segments[path_segment] = scan_object
[docs] def GetScanObject(self, path_segment): """Retrieves the scan object for a specific path segment. Args: path_segment: a string containing the path segment. Returns: A scan object, which can be a scan tree sub node (instance of PathFilterScanTreeNode), a path or the default value. """
return self._path_segments.get(path_segment, self.default_value)
[docs] def SetDefaultValue(self, scan_object): """Sets the default (non-match) value. Args: scan_object: a scan object, either a scan tree sub node (instance of PathFilterScanTreeNode) or a string containing a path. Raises: TypeError: if the scan object is of an unsupported type. ValueError: if the default value is already set. """ if (not isinstance(scan_object, PathFilterScanTreeNode) and not isinstance(scan_object, py2to3.STRING_TYPES)): raise TypeError('Unsupported scan object type.') if self.default_value: raise ValueError('Default value already set.')
self.default_value = scan_object
[docs] def ToDebugString(self, indentation_level=1): """Converts the path filter scan tree node into a debug string. Args: indentation_level: an integer containing the text indentation level. Returns: A string containing a debug representation of the path filter scan tree node. """ indentation = ' ' * indentation_level text_parts = ['{0:s}path segment index: {1:d}\n'.format( indentation, self.path_segment_index)] for path_segment, scan_object in self._path_segments.items(): text_parts.append('{0:s}path segment: {1:s}\n'.format( indentation, path_segment)) if isinstance(scan_object, PathFilterScanTreeNode): text_parts.append('{0:s}scan tree node:\n'.format(indentation)) text_parts.append(scan_object.ToDebugString(indentation_level + 1)) elif isinstance(scan_object, py2to3.STRING_TYPES): text_parts.append('{0:s}path: {1:s}\n'.format( indentation, scan_object)) text_parts.append('{0:s}default value:\n'.format(indentation)) if isinstance(self.default_value, PathFilterScanTreeNode): text_parts.append('{0:s}scan tree node:\n'.format(indentation)) text_parts.append(self.default_value.ToDebugString(indentation_level + 1)) elif isinstance(self.default_value, py2to3.STRING_TYPES): text_parts.append('{0:s}pattern: {1:s}\n'.format( indentation, self.default_value)) text_parts.append('\n')
return ''.join(text_parts)