Source code for plaso.analyzers.yara_analyzer

# -*- coding: utf-8 -*-
"""Analyzer that matches Yara rules."""

from __future__ import unicode_literals

import yara

from plaso.analyzers import interface
from plaso.analyzers import logger
from plaso.analyzers import manager
from plaso.containers import analyzer_result
from plaso.lib import definitions


class YaraAnalyzer(interface.BaseAnalyzer):
  """Analyzer that matches Yara rules.

  Rules have to be provided with SetRules() before Analyze() can produce any
  matches; while no rules are set Analyze() does nothing.
  """

  # pylint: disable=no-member

  NAME = 'yara'
  DESCRIPTION = 'Matches Yara rules over input data.'

  PROCESSING_STATUS_HINT = definitions.STATUS_INDICATOR_YARA_SCAN

  INCREMENTAL_ANALYZER = False

  # Name of the attribute under which the matches are reported.
  _ATTRIBUTE_NAME = 'yara_match'

  # Timeout, in seconds, passed to the Yara match call.
  _MATCH_TIMEOUT = 60

  def __init__(self):
    """Initializes the Yara analyzer."""
    super(YaraAnalyzer, self).__init__()
    self._matches = []
    self._rules = None

  def Analyze(self, data):
    """Analyzes a block of data, attempting to match Yara rules to it.

    Matching errors and timeouts are logged, not raised; in that case the
    analyzer holds no matches for this block of data.

    Args:
      data(bytes): a block of data.
    """
    # Drop the matches of any previous call first, so that a timeout or error
    # below cannot cause GetResults() to report matches of older data.
    self._matches = []

    if not self._rules:
      return

    try:
      self._matches = self._rules.match(data=data, timeout=self._MATCH_TIMEOUT)
    except yara.YaraTimeoutError:
      logger.error('Could not process file within timeout: {0:d}'.format(
          self._MATCH_TIMEOUT))
    except yara.YaraError as exception:
      logger.error('Error processing file with Yara: {0!s}.'.format(
          exception))

  def GetResults(self):
    """Retrieves results of the most recent analysis.

    A single result is always returned; its attribute value is the
    comma-separated list of matching rule names, which is an empty string
    when nothing matched.

    Returns:
      list[AnalyzerResult]: results.
    """
    result = analyzer_result.AnalyzerResult()
    result.analyzer_name = self.NAME
    result.attribute_name = self._ATTRIBUTE_NAME
    rule_names = [match.rule for match in self._matches]
    result.attribute_value = ','.join(rule_names)
    return [result]

  def Reset(self):
    """Resets the internal state of the analyzer.

    Only the matches are discarded, the compiled rules are kept.
    """
    self._matches = []

  def SetRules(self, rules_string):
    """Sets the rules that the Yara analyzer will use.

    Args:
      rules_string(str): Yara rule definitions
    """
    self._rules = yara.compile(source=rules_string)
# Register the Yara analyzer class with the analyzers manager when this module
# is imported.
manager.AnalyzersManager.RegisterAnalyzer(YaraAnalyzer)