# Source code for plaso.analysis.tagging

# -*- coding: utf-8 -*-
"""A plugin to tag events according to rules in a tagging file."""

from __future__ import unicode_literals

import logging
import io
import re
import os

from efilter import ast as efilter_ast
from efilter import api as efilter_api
from efilter import errors as efilter_errors
from efilter import query as efilter_query

from plaso.analysis import interface
from plaso.analysis import manager
from plaso.containers import reports
from plaso.lib import errors


class TaggingAnalysisPlugin(interface.AnalysisPlugin):
  """Analysis plugin that tags events according to rules in a tag file.

  A tagging file consists of label lines and rule lines:

  * a line that starts (without indentation) with word characters defines
    a label;
  * an indented line defines one rule (an objectfilter or dottysql
    expression) for the most recently defined label.

  Lines matching neither form, such as unindented comment lines, are
  ignored. An event is tagged with a label when at least one of the rules
  of that label matches the event.
  """

  NAME = 'tagging'

  ENABLE_IN_EXTRACTION = True

  _EVENT_TAG_COMMENT = 'Tag applied by tagging analysis plugin.'

  # Maps a lower case operating system name to the name of the tagging file
  # that is used when no tagging file was set explicitly.
  _OS_TAG_FILES = {
      'macos': 'tag_macos.txt',
      'windows': 'tag_windows.txt'}

  # A line with no indent is a tag name.
  _TAG_LABEL_LINE = re.compile(r'^(\w+)')

  # A line with leading indent is one of the rules for the preceding tag.
  _TAG_RULE_LINE = re.compile(r'^\s+(.+)')

  # If any of these words are in the query then it's probably objectfilter.
  _OBJECTFILTER_WORDS = re.compile(
      r'\s(is|isnot|equals|notequals|inset|notinset|contains|notcontains)\s')

  def __init__(self):
    """Initializes a tagging analysis plugin."""
    super(TaggingAnalysisPlugin, self).__init__()
    self._autodetect_tag_file_attempt = False
    self._number_of_event_tags = 0
    self._tag_rules = None
    self._tagging_file_name = None

  def _AttemptAutoDetectTagFile(self, analysis_mediator):
    """Detects which tag file is most appropriate.

    The detection is attempted based on the operating system reported by
    the analysis mediator. On success the tagging file is loaded.

    Args:
      analysis_mediator (AnalysisMediator): analysis mediator.

    Returns:
      bool: True if a tag file is autodetected.
    """
    # Remember the attempt so that ExamineEvent does not retry the detection
    # for every event.
    self._autodetect_tag_file_attempt = True
    if not analysis_mediator.data_location:
      return False

    # Guard against a missing (None or empty) operating system value instead
    # of failing on the call to lower().
    operating_system = analysis_mediator.operating_system
    if not operating_system:
      return False

    filename = self._OS_TAG_FILES.get(operating_system.lower(), None)
    if not filename:
      return False

    logging.info('Using auto detected tag file: {0:s}'.format(filename))
    tag_file_path = os.path.join(analysis_mediator.data_location, filename)
    self.SetAndLoadTagFile(tag_file_path)
    return True

  def _ParseDefinitions(self, tagging_file_path):
    """Parses the tag file and yields tuples of label name, list of queries.

    Labels that have no rules are not yielded.

    Args:
      tagging_file_path (str): path to the tagging file.

    Yields:
      tuple: contains:

        str: label name.
        list[efilter.query.Query]: efilter queries.

    Raises:
      TaggingFileError: when a rule cannot be parsed or when a rule is
          defined before the first label.
    """
    queries = None
    label_name = None
    with io.open(tagging_file_path, 'r', encoding='utf-8') as tagging_file:
      # Iterate over the file object directly, there is no need to read all
      # the lines into memory first.
      for line_number, line in enumerate(tagging_file, 1):
        label_match = self._TAG_LABEL_LINE.match(line)
        if label_match:
          # A new label starts, hence the previous label is complete.
          if label_name and queries:
            yield label_name, queries
          queries = []
          label_name = label_match.group(1)
          continue

        rule_match = self._TAG_RULE_LINE.match(line)
        if not rule_match:
          continue

        event_tagging_expression = rule_match.group(1)
        # The rule pattern also matches lines that consist solely of
        # whitespace, these do not contain an expression and are skipped.
        if not event_tagging_expression.strip():
          continue

        if queries is None:
          # No label has been defined yet, so there is nothing the rule
          # could belong to.
          raise errors.TaggingFileError((
              'Event tagging rule on line: {0:d} of tagging file: {1!s} '
              'is not preceded by a label.').format(
                  line_number, tagging_file_path))

        tagging_rule = self._ParseEventTaggingRule(event_tagging_expression)
        queries.append(tagging_rule)

    # Yield any remaining tags once we reach the end of the file.
    if label_name and queries:
      yield label_name, queries

  def _ParseEventTaggingRule(self, event_tagging_expression):
    """Parses an event tagging expression.

    This method attempts to detect whether the event tagging expression is
    valid objectfilter or dottysql syntax.

    Example:
      _ParseEventTaggingRule('5 + 5')
      # Returns Sum(Literal(5), Literal(5))

    Args:
      event_tagging_expression (str): event tagging expression either in
          objectfilter or dottysql syntax.

    Returns:
      efilter.query.Query: efilter query of the event tagging expression.

    Raises:
      TaggingFileError: when the tagging file cannot be correctly parsed.
    """
    # The syntax is selected heuristically: objectfilter is assumed when one
    # of its operator words occurs surrounded by whitespace.
    if self._OBJECTFILTER_WORDS.search(event_tagging_expression):
      syntax = 'objectfilter'
    else:
      syntax = 'dottysql'

    try:
      return efilter_query.Query(event_tagging_expression, syntax=syntax)

    except efilter_errors.EfilterParseError as exception:
      stripped_expression = event_tagging_expression.rstrip()
      raise errors.TaggingFileError((
          'Unable to parse event tagging expression: "{0:s}" with error: '
          '{1!s}').format(stripped_expression, exception))

  def _ParseTaggingFile(self, tag_file_path):
    """Parses tag definitions from the source.

    Args:
      tag_file_path (str): path to the tag file.

    Returns:
      efilter.ast.Expression: efilter abstract syntax tree (AST), containing
          the tagging rules.

    Raises:
      TaggingFileError: when the tagging file cannot be correctly parsed.
    """
    tags = []
    for label_name, rules in self._ParseDefinitions(tag_file_path):
      if not rules:
        continue

      tag = efilter_ast.IfElse(
          # Union will be true if any of the 'rules' match.
          efilter_ast.Union(*[rule.root for rule in rules]),
          # If so then evaluate to a string with the name of the tag.
          efilter_ast.Literal(label_name),
          # Otherwise don't return anything.
          efilter_ast.Literal(None))
      tags.append(tag)

    # Generate a repeated value with all the tags (None will be skipped).
    return efilter_ast.Repeat(*tags)

  def CompileReport(self, mediator):
    """Compiles an analysis report.

    The counter of produced event tags is reset after the report text has
    been generated.

    Args:
      mediator (AnalysisMediator): mediates interactions between analysis
          plugins and other components, such as storage and dfvfs.

    Returns:
      AnalysisReport: analysis report.
    """
    report_text = 'Tagging plugin produced {0:d} tags.\n'.format(
        self._number_of_event_tags)
    self._number_of_event_tags = 0
    return reports.AnalysisReport(plugin_name=self.NAME, text=report_text)

  def ExamineEvent(self, mediator, event):
    """Analyzes an EventObject and tags it according to rules in the tag file.

    If no tagging file was loaded, a single attempt is made to autodetect
    one. An event tag is produced when at least one label matches.

    Args:
      mediator (AnalysisMediator): mediates interactions between analysis
          plugins and other components, such as storage and dfvfs.
      event (EventObject): event to examine.
    """
    if self._tag_rules is None:
      if self._autodetect_tag_file_attempt:
        # There's nothing to tag with, and we've already tried to find a good
        # tag file, so there's nothing we can do with this event (or any
        # other).
        return

      if not self._AttemptAutoDetectTagFile(mediator):
        logging.info(
            'No tag definition file specified, and plaso was not able to '
            'autoselect a tagging file. As no definitions were specified, '
            'no events will be tagged.')
        return

    try:
      matched_labels = efilter_api.apply(self._tag_rules, vars=event)
    except efilter_errors.EfilterTypeError as exception:
      # A rule that cannot be evaluated for this event is not fatal, the
      # event is left untagged.
      logging.warning('Unable to apply efilter query with error: {0!s}'.format(
          exception))
      matched_labels = None

    if not matched_labels:
      return

    labels = list(efilter_api.getvalues(matched_labels))
    event_tag = self._CreateEventTag(event, self._EVENT_TAG_COMMENT, labels)

    mediator.ProduceEventTag(event_tag)
    self._number_of_event_tags += 1

  def SetAndLoadTagFile(self, tagging_file_path):
    """Sets the tag file to be used by the plugin.

    The tagging file is parsed immediately.

    Args:
      tagging_file_path (str): path of the tagging file.

    Raises:
      TaggingFileError: when the tagging file cannot be correctly parsed.
    """
    self._tag_rules = self._ParseTaggingFile(tagging_file_path)
    self._tagging_file_name = tagging_file_path
# Register the plugin class with the analysis plugin manager when this module
# is imported.
manager.AnalysisPluginManager.RegisterPlugin(TaggingAnalysisPlugin)