Source code for plaso.analysis.viper

# -*- coding: utf-8 -*-
"""Analysis plugin to look up files in Viper and tag events."""

from __future__ import unicode_literals

from plaso.analysis import interface
from plaso.analysis import logger
from plaso.analysis import manager
from plaso.containers import events
from plaso.lib import errors


[docs]class ViperAnalyzer(interface.HTTPHashAnalyzer): """Class that analyzes file hashes by consulting Viper. REST API reference: https://viper-framework.readthedocs.org/en/latest/usage/web.html#api """ SUPPORTED_HASHES = ['md5', 'sha256'] SUPPORTED_PROTOCOLS = ['http', 'https'] # pylint 1.9.3 wants a docstring for kwargs, but this is not useful to add. # pylint: disable=missing-param-doc def __init__(self, hash_queue, hash_analysis_queue, **kwargs): """Initializes a Viper hash analyzer. Args: hash_queue (Queue.queue): contains hashes to be analyzed. hash_analysis_queue (Queue.queue): that the analyzer will append HashAnalysis objects this queue. """ super(ViperAnalyzer, self).__init__( hash_queue, hash_analysis_queue, **kwargs) self._checked_for_old_python_version = False self._host = None self._port = None self._protocol = None self._url = None def _QueryHash(self, digest): """Queries the Viper Server for a specfic hash. Args: digest (str): hash to look up. Returns: dict[str, object]: JSON response or None on error. """ if not self._url: self._url = '{0:s}://{1:s}:{2:d}/file/find'.format( self._protocol, self._host, self._port) request_data = {self.lookup_hash: digest} try: json_response = self.MakeRequestAndDecodeJSON( self._url, 'POST', data=request_data) except errors.ConnectionError as exception: json_response = None logger.error('Unable to query Viper with error: {0!s}.'.format( exception)) return json_response
[docs] def Analyze(self, hashes): """Looks up hashes in Viper using the Viper HTTP API. Args: hashes (list[str]): hashes to look up. Returns: list[HashAnalysis]: hash analysis. Raises: RuntimeError: If no host has been set for Viper. """ hash_analyses = [] for digest in hashes: json_response = self._QueryHash(digest) hash_analysis = interface.HashAnalysis(digest, json_response) hash_analyses.append(hash_analysis)
return hash_analyses
[docs] def SetHost(self, host): """Sets the address or hostname of the server running Viper server. Args: host (str): IP address or hostname to query. """
self._host = host
[docs] def SetPort(self, port): """Sets the port where Viper server is listening. Args: port (int): port to query. """
self._port = port
[docs] def SetProtocol(self, protocol): """Sets the protocol that will be used to query Viper. Args: protocol (str): protocol to use to query Viper. Either 'http' or 'https'. Raises: ValueError: if the protocol is not supported. """ if protocol not in self.SUPPORTED_PROTOCOLS: raise ValueError('Unsupported protocol: {0!s}'.format(protocol))
self._protocol = protocol
[docs] def TestConnection(self): """Tests the connection to the Viper server. Returns: bool: True if the Viper server instance is reachable. """ url = '{0:s}://{1:s}:{2:d}/test'.format( self._protocol, self._host, self._port) try: json_response = self.MakeRequestAndDecodeJSON(url, 'GET') except errors.ConnectionError: json_response = None
return json_response is not None
[docs]class ViperAnalysisPlugin(interface.HashTaggingAnalysisPlugin): """An analysis plugin for looking up SHA256 hashes in Viper.""" # TODO: Check if there are other file types worth checking Viper for. DATA_TYPES = ['pe:compilation:compilation_time'] URLS = ['https://viper.li'] NAME = 'viper' def __init__(self): """Initializes a Viper analysis plugin.""" super(ViperAnalysisPlugin, self).__init__(ViperAnalyzer)
[docs] def GenerateLabels(self, hash_information): """Generates a list of strings that will be used in the event tag. Args: hash_information (dict[str, object]): JSON decoded contents of the result of a Viper lookup, as produced by the ViperAnalyzer. Returns: list[str]: list of labels to apply to events. """ if not hash_information: return ['viper_not_present'] projects = [] tags = [] for project, entries in iter(hash_information.items()): if not entries: continue projects.append(project) for entry in entries: if entry['tags']: tags.extend(entry['tags']) if not projects: return ['viper_not_present'] strings = ['viper_present'] for project_name in projects: label = events.EventTag.CopyTextToLabel( project_name, prefix='viper_project_') strings.append(label) for tag_name in tags: label = events.EventTag.CopyTextToLabel(tag_name, prefix='viper_tag_') strings.append(label)
return strings
[docs] def SetHost(self, host): """Sets the address or hostname of the server running Viper server. Args: host (str): IP address or hostname to query. """
self._analyzer.SetHost(host)
[docs] def SetPort(self, port): """Sets the port where Viper server is listening. Args: port (int): port to query. """
self._analyzer.SetPort(port)
[docs] def SetProtocol(self, protocol): """Sets the protocol that will be used to query Viper. Args: protocol (str): protocol to use to query Viper. Either 'http' or 'https'. Raises: ValueError: If an invalid protocol is selected. """ protocol = protocol.lower().strip() if protocol not in ['http', 'https']: raise ValueError('Invalid protocol specified for Viper lookup')
self._analyzer.SetProtocol(protocol)
[docs] def TestConnection(self): """Tests the connection to the Viper server. Returns: bool: True if the Viper server instance is reachable. """
return self._analyzer.TestConnection() manager.AnalysisPluginManager.RegisterPlugin(ViperAnalysisPlugin)