Source code for plaso.formatters.interface

# -*- coding: utf-8 -*-
"""This file contains the event formatters interface classes.

The l2t_csv and other formats are dependent on a message field,
referred to as description_long and description_short in l2t_csv.

Plaso no longer stores these fields explicitly.

A formatter, with a format string definition, is used to convert
the event object values into a formatted string that is similar
to the description_long and description_short fields.
"""

from __future__ import unicode_literals

import re

from plaso.formatters import logger
from plaso.lib import errors
from plaso.lib import py2to3


class EventFormatter(object):
  """Base class to format event type specific data using a format string.

  Define the (long) format string and the short format string by defining
  FORMAT_STRING and FORMAT_STRING_SHORT. The syntax of the format strings
  is similar to that of format() where the place holder for a certain
  event object attribute is defined as {attribute_name}.
  """

  # The data type is a unique identifier for the event data. The current
  # approach is to define it as human readable string in the format
  # root:branch: ... :leaf, e.g. a page visited entry inside a Chrome History
  # database is defined as: chrome:history:page_visited.
  DATA_TYPE = 'internal'

  # The format string.
  FORMAT_STRING = ''
  FORMAT_STRING_SHORT = ''

  # The source short and long strings.
  SOURCE_SHORT = 'LOG'
  SOURCE_LONG = ''

  # The format string can be defined as:
  # {name}, {name:format}, {name!conversion}, {name!conversion:format}
  # The capture group holds the attribute name.
  _FORMAT_STRING_ATTRIBUTE_NAME_RE = re.compile(
      '{([a-z][a-zA-Z0-9_]*)[!]?[^:}]*[:]?[^}]*}')

  def __init__(self):
    """Initializes an event formatter object."""
    super(EventFormatter, self).__init__()
    # Lazily filled by GetFormatStringAttributeNames.
    self._format_string_attribute_names = None

  def _LogFormattingError(self, event_values, error_message):
    """Logs a formatting error with the values that identify the event.

    Args:
      event_values (dict[str, object]): event values.
      error_message (str): description of what went wrong.
    """
    # The "!s" conversion is used instead of the "s" format code since the
    # event values are not guaranteed to be strings and a failure here would
    # replace the logged error with an unhandled exception.
    logger.error((
        'Event: {0!s} data type: {1!s} display name: {2!s} '
        'parser chain: {3!s} with error: {4!s}').format(
            event_values.get('uuid', 'N/A'),
            event_values.get('data_type', 'N/A'),
            event_values.get('display_name', 'N/A'),
            event_values.get('parser', 'N/A'),
            error_message))

  def _FormatMessage(self, format_string, event_values):
    """Determines the formatted message string.

    If an attribute required by the format string is missing the error is
    logged and the message falls back to a space separated list of
    "name: value" pairs of all event values. If a Unicode decode error
    occurs the error is logged and the message is an empty string.

    Args:
      format_string (str): message format string.
      event_values (dict[str, object]): event values.

    Returns:
      str: formatted message string, without carriage returns and linefeeds.
    """
    if not isinstance(format_string, py2to3.UNICODE_TYPE):
      # Plaso code files should be in UTF-8 and thus binary strings are
      # assumed UTF-8. If this is not the case this should be fixed.
      # The string is decoded before it is logged since formatting a byte
      # string with the "s" format code raises TypeError on Python 3.
      format_string = format_string.decode('utf-8', errors='ignore')
      logger.warning('Format string: {0:s} is non-Unicode.'.format(
          format_string))

    try:
      message_string = format_string.format(**event_values)

    except KeyError as exception:
      self._LogFormattingError(event_values, (
          'unable to format string: "{0:s}" event object is missing required '
          'attributes: {1!s}').format(format_string, exception))

      message_string = ' '.join(
          '{0!s}: {1!s}'.format(attribute, value)
          for attribute, value in event_values.items())

    except UnicodeDecodeError as exception:
      self._LogFormattingError(
          event_values, 'Unicode decode error: {0!s}'.format(exception))

      message_string = ''

    # Strip carriage return and linefeed from the message strings.
    # Using replace function here because it is faster than re.sub() or
    # string.strip().
    return message_string.replace('\r', '').replace('\n', '')

  def _FormatMessages(self, format_string, short_format_string, event_values):
    """Determines the formatted message strings.

    Args:
      format_string (str): message format string.
      short_format_string (str): short message format string. If empty the
          short message string is derived from the message string.
      event_values (dict[str, object]): event values.

    Returns:
      tuple(str, str): formatted message string and short message string.
    """
    message_string = self._FormatMessage(format_string, event_values)

    if short_format_string:
      short_message_string = self._FormatMessage(
          short_format_string, event_values)
    else:
      short_message_string = message_string

    # Truncate the short message string if necessary: at most 80 characters
    # are kept, 77 of the message followed by an ellipsis.
    if len(short_message_string) > 80:
      short_message_string = '{0:s}...'.format(short_message_string[:77])

    return message_string, short_message_string

  def GetFormatStringAttributeNames(self):
    """Retrieves the attribute names in the format string.

    Returns:
      set(str): attribute names.
    """
    if self._format_string_attribute_names is None:
      self._format_string_attribute_names = (
          self._FORMAT_STRING_ATTRIBUTE_NAME_RE.findall(
              self.FORMAT_STRING))

    # A new set is returned so callers cannot alter the cached names.
    return set(self._format_string_attribute_names)

  # pylint: disable=unused-argument
  def GetMessages(self, formatter_mediator, event):
    """Determines the formatted message strings for an event object.

    Args:
      formatter_mediator (FormatterMediator): mediates the interactions
          between formatters and other components, such as storage and Windows
          EventLog resources.
      event (EventObject): event.

    Returns:
      tuple(str, str): formatted message string and short message string.

    Raises:
      WrongFormatter: if the event object cannot be formatted by the formatter.
    """
    if self.DATA_TYPE != event.data_type:
      # "!s" so that a non-string data type still raises WrongFormatter.
      raise errors.WrongFormatter('Unsupported data type: {0!s}.'.format(
          event.data_type))

    event_values = event.CopyToDict()

    return self._FormatMessages(
        self.FORMAT_STRING, self.FORMAT_STRING_SHORT, event_values)

  def GetSources(self, event):
    """Determines the short and long source for an event object.

    Args:
      event (EventObject): event.

    Returns:
      tuple(str, str): short and long source string.

    Raises:
      WrongFormatter: if the event object cannot be formatted by the formatter.
    """
    if self.DATA_TYPE != event.data_type:
      # "!s" so that a non-string data type still raises WrongFormatter.
      raise errors.WrongFormatter('Unsupported data type: {0!s}.'.format(
          event.data_type))

    return self.SOURCE_SHORT, self.SOURCE_LONG
class ConditionalEventFormatter(EventFormatter):
  """Base class to conditionally format event data using format string pieces.

  Define the (long) format string and the short format string by defining
  FORMAT_STRING_PIECES and FORMAT_STRING_SHORT_PIECES. The syntax of the
  format strings pieces is similar to of the event formatter
  (EventFormatter). Every format string piece should contain a single
  attribute name or none.

  FORMAT_STRING_SEPARATOR is used to control the string which the separate
  string pieces should be joined. It contains a space by default.
  """

  # The format string pieces.
  FORMAT_STRING_PIECES = ['']
  FORMAT_STRING_SHORT_PIECES = ['']

  # The separator used to join the string pieces.
  FORMAT_STRING_SEPARATOR = ' '

  def __init__(self):
    """Initializes the conditional formatter.

    A map is build of the string pieces and their corresponding attribute
    name to optimize conditional string formatting.

    Raises:
      RuntimeError: when an invalid format string piece is encountered.
    """
    super(ConditionalEventFormatter, self).__init__()

    # The format string pieces map is a list containing the attribute name
    # per format string piece. E.g. ["Description: {description}"] would be
    # mapped to: [0] = "description". If the string piece does not contain
    # an attribute name it is treated as text that does not needs formatting.
    self._format_string_pieces_map = self._BuildFormatStringPiecesMap(
        self.FORMAT_STRING_PIECES, 'format string piece')

    self._format_string_short_pieces_map = self._BuildFormatStringPiecesMap(
        self.FORMAT_STRING_SHORT_PIECES, 'short format string piece')

  def _BuildFormatStringPiecesMap(self, format_string_pieces, description):
    """Maps every format string piece to the attribute name it contains.

    Args:
      format_string_pieces (list[str]): format string pieces.
      description (str): description of the kind of piece, used in the
          error message.

    Returns:
      list[str]: attribute name per format string piece, in the same order,
          where an empty string indicates a piece without an attribute name.

    Raises:
      RuntimeError: when a format string piece contains more than 1
          attribute name.
    """
    pieces_map = []
    for format_string_piece in format_string_pieces:
      # The inherited regular expression captures the attribute name of every
      # {name}, {name:format}, {name!conversion} or {name!conversion:format}
      # place holder, hence findall returns the attribute names directly.
      attribute_names = self._FORMAT_STRING_ATTRIBUTE_NAME_RE.findall(
          format_string_piece)

      if len(attribute_names) > 1:
        raise RuntimeError((
            'Invalid {0:s}: [{1:s}] contains more than 1 attribute '
            'name.').format(description, format_string_piece))

      # A text-only piece is stored as an empty map entry to keep the index
      # in the map equal to that of the format string pieces.
      pieces_map.append(attribute_names[0] if attribute_names else '')

    return pieces_map

  def _ConditionalFormatMessages(self, event_values):
    """Determines the conditional formatted message strings.

    Args:
      event_values (dict[str, object]): event values.

    Returns:
      tuple(str, str): formatted message string and short message string.
    """
    string_pieces = []
    for map_index, attribute_name in enumerate(self._format_string_pieces_map):
      if attribute_name:
        if attribute_name not in event_values:
          continue

        attribute = event_values[attribute_name]

        # If an attribute is a number (or boolean), yet has zero value we
        # want to include that in the format string, since that is still
        # potentially valid information. Other empty values, such as None
        # or an empty string, are skipped.
        is_number = (
            isinstance(attribute, (bool, float)) or
            isinstance(attribute, py2to3.INTEGER_TYPES))
        if not is_number and not attribute:
          continue

      string_pieces.append(self.FORMAT_STRING_PIECES[map_index])

    format_string = self.FORMAT_STRING_SEPARATOR.join(string_pieces)

    # NOTE(review): unlike the long message, the short message drops a piece
    # whenever its value is falsy, including 0 and False. Confirm whether
    # this difference is intended.
    string_pieces = []
    for map_index, attribute_name in enumerate(
        self._format_string_short_pieces_map):
      if not attribute_name or event_values.get(attribute_name, None):
        string_pieces.append(self.FORMAT_STRING_SHORT_PIECES[map_index])

    short_format_string = self.FORMAT_STRING_SEPARATOR.join(string_pieces)

    return self._FormatMessages(
        format_string, short_format_string, event_values)

  def GetFormatStringAttributeNames(self):
    """Retrieves the attribute names in the format string.

    Returns:
      set(str): attribute names found in FORMAT_STRING_PIECES.
    """
    if self._format_string_attribute_names is None:
      self._format_string_attribute_names = []
      for format_string_piece in self.FORMAT_STRING_PIECES:
        self._format_string_attribute_names.extend(
            self._FORMAT_STRING_ATTRIBUTE_NAME_RE.findall(
                format_string_piece))

    # A new set is returned so callers cannot alter the cached names.
    return set(self._format_string_attribute_names)

  # pylint: disable=unused-argument
  def GetMessages(self, formatter_mediator, event):
    """Determines the formatted message strings for an event object.

    Args:
      formatter_mediator (FormatterMediator): mediates the interactions
          between formatters and other components, such as storage and Windows
          EventLog resources.
      event (EventObject): event.

    Returns:
      tuple(str, str): formatted message string and short message string.

    Raises:
      WrongFormatter: if the event object cannot be formatted by the formatter.
    """
    if self.DATA_TYPE != event.data_type:
      # "!s" so that a non-string data type still raises WrongFormatter.
      raise errors.WrongFormatter('Unsupported data type: {0!s}.'.format(
          event.data_type))

    event_values = event.CopyToDict()

    return self._ConditionalFormatMessages(event_values)