# -*- coding: utf-8 -*-
"""An LL(1) lexer. This lexer is very tolerant of errors and can resync.
This lexer is originally copied from the GRR project:
https://code.google.com/p/grr
"""
from __future__ import unicode_literals
import logging
import re
from plaso.lib import errors
# TODO: Update docstrings in this file.
# pylint: disable=missing-type-doc,missing-return-type-doc,missing-return-doc
# pylint: disable=missing-param-doc
[docs]class Token(object):
"""A token action."""
def __init__(self, state_regex, regex, actions, next_state, flags=re.I):
"""Initializes the token object.
Args:
state_regex: If this regular expression matches the current state this
rule is considered.
regex: A regular expression to try and match from the current point.
actions: A command separated list of method names in the Lexer to call.
next_state: The next state we transition to if this Token matches.
flags: re flags.
"""
self.state_regex = re.compile(
state_regex, re.DOTALL | re.M | re.S | re.U | flags)
self.regex = re.compile(regex, re.DOTALL | re.M | re.S | re.U | flags)
self.re_str = regex
self.actions = []
if actions:
self.actions = actions.split(',')
self.next_state = next_state
[docs]class Lexer(object):
"""A generic feed lexer."""
_CONTINUE_STATE = 'CONTINUE'
_INITIAL_STATE = 'INITIAL'
_ERROR_TOKEN = 'Error'
# A list of Token() instances.
tokens = []
def __init__(self, data=''):
"""Initializes the lexer object.
Args:
data: optional initial data to be processed by the lexer.
"""
super(Lexer, self).__init__()
self.buffer = data
self.error = 0
self.flags = 0
self.processed = 0
self.processed_buffer = ''
self.state = self._INITIAL_STATE
self.state_stack = []
self.verbose = 0
[docs] def NextToken(self):
"""Fetch the next token by trying to match any of the regexes in order."""
current_state = self.state
for token in self.tokens:
# Does the rule apply to us?
if not token.state_regex.match(current_state):
continue
# Try to match the rule
m = token.regex.match(self.buffer)
if not m:
continue
# The match consumes the data off the buffer (the handler can put it back
# if it likes)
# TODO: using joins might be more efficient here.
self.processed_buffer += self.buffer[:m.end()]
self.buffer = self.buffer[m.end():]
self.processed += m.end()
next_state = token.next_state
for action in token.actions:
# Is there a callback to handle this action?
callback = getattr(self, action, self.Default)
# Allow a callback to skip other callbacks.
try:
possible_next_state = callback(string=m.group(0), match=m)
if possible_next_state == self._CONTINUE_STATE:
continue
# Override the state from the Token
elif possible_next_state:
next_state = possible_next_state
except errors.ParseError as exception:
self.Error(exception)
# Update the next state
if next_state:
self.state = next_state
return token
# Check that we are making progress - if we are too full, we assume we are
# stuck.
self.Error('Expected {0:s}'.format(self.state))
self.processed_buffer += self.buffer[:1]
self.buffer = self.buffer[1:]
return self._ERROR_TOKEN
[docs] def Close(self):
"""A convenience function to force us to parse all the data."""
while self.NextToken():
if not self.buffer:
return
[docs] def Default(self, **kwarg):
"""The default callback handler."""
logging.debug('Default handler: {0:s}'.format(kwarg))
[docs] def Empty(self):
"""Returns a boolean indicating if the buffer is empty."""
return not self.buffer
[docs] def Error(self, message=None, weight=1):
"""Log an error down.
Args:
message: optional error message.
weight: optional error weight.
"""
logging.debug('Error({0:d}): {1:s}'.format(weight, message))
# Keep a count of errors
self.error += weight
[docs] def PushState(self, **unused_kwargs):
"""Push the current state on the state stack."""
logging.debug('Storing state {0:s}'.format(repr(self.state)))
self.state_stack.append(self.state)
[docs] def PopState(self, **unused_kwargs):
"""Pop the previous state from the stack."""
try:
self.state = self.state_stack.pop()
logging.debug('Returned state to {0:s}'.format(self.state))
return self.state
except IndexError:
self.Error(
'Tried to pop the state but failed - possible recursion error')
[docs] def Feed(self, data):
"""Feed the buffer with data.
Args:
data: data to be processed by the lexer.
"""
self.buffer = ''.join([self.buffer, data])
[docs] def PushBack(self, string='', **unused_kwargs):
"""Push the match back on the stream.
Args:
string: optional data.
"""
self.buffer = string + self.buffer
self.processed_buffer = self.processed_buffer[:-len(string)]
[docs]class SelfFeederMixIn(Lexer):
"""This mixin is used to make a lexer which feeds itself.
Note that self.file_object must be the file object we read from.
"""
# TODO: fix this, file object either needs to be set or not passed here.
def __init__(self, file_object=None):
"""Initializes the lexer feeder min object.
Args:
file_object: Optional file-like object.
"""
super(SelfFeederMixIn, self).__init__()
self.file_object = file_object
# pylint: disable=arguments-differ
[docs] def Feed(self, size=512):
"""Feed data into the buffer.
Args:
size: optional data size to read form the file-like object.
"""
data = self.file_object.read(size)
Lexer.Feed(self, data)
return len(data)
[docs] def NextToken(self):
"""Retrieves the next token.
Returns:
The next token (instance of Token) or None.
"""
# If we don't have enough data - feed ourselves: We assume
# that we must have at least one sector in our buffer.
if len(self.buffer) < 512:
if self.Feed() == 0 and not self.buffer:
return None
return Lexer.NextToken(self)
[docs]class Expression(object):
"""A class representing an expression."""
attribute = None
args = None
operator = None
# The expected number of args
number_of_args = 1
def __init__(self):
"""Initializes the expression object."""
self.args = []
[docs] def __str__(self):
"""Return a string representation of the expression."""
return 'Expression: ({0:s}) ({1:s}) {2:s}'.format(
self.attribute, self.operator, self.args)
[docs] def AddArg(self, arg):
"""Adds a new arg to this expression.
Args:
arg: The argument to add (string).
Returns:
True if this arg is the last arg, False otherwise.
Raises:
ParseError: If there are too many args.
"""
self.args.append(arg)
if len(self.args) > self.number_of_args:
raise errors.ParseError('Too many args for this expression.')
elif len(self.args) == self.number_of_args:
return True
return False
[docs] def Compile(self, unused_filter_implementation):
"""Given a filter implementation, compile this expression."""
raise NotImplementedError(
'{0:s} does not implement Compile.'.format(self.__class__.__name__))
# TODO: rename this function to GetTreeAsString or equivalent.
[docs] def PrintTree(self, depth=''):
"""Print the tree."""
return '{0:s} {1:s}'.format(depth, self)
[docs] def SetAttribute(self, attribute):
"""Set the attribute."""
self.attribute = attribute
[docs] def SetOperator(self, operator):
"""Set the operator."""
self.operator = operator
[docs]class BinaryExpression(Expression):
"""An expression which takes two other expressions."""
def __init__(self, operator='', part=None):
"""Initializes the expression object."""
self.operator = operator
self.args = []
if part:
self.args.append(part)
super(BinaryExpression, self).__init__()
[docs] def __str__(self):
"""Return a string representation of the binary expression."""
return 'Binary Expression: {0:s} {1:s}'.format(
self.operator, [str(x) for x in self.args])
[docs] def AddOperands(self, lhs, rhs):
"""Add an operand."""
if isinstance(lhs, Expression) and isinstance(rhs, Expression):
self.args = [lhs, rhs]
else:
raise errors.ParseError(
'Expected expression, got {0:s} {1:s} {2:s}'.format(
lhs, self.operator, rhs))
# TODO: rename this function to GetTreeAsString or equivalent.
[docs] def PrintTree(self, depth=''):
"""Print the tree."""
result = '{0:s}{1:s}\n'.format(depth, self.operator)
for part in self.args:
result += '{0:s}-{1:s}\n'.format(depth, part.PrintTree(depth + ' '))
return result
[docs] def Compile(self, filter_implementation):
"""Compile the binary expression into a filter object."""
operator = self.operator.lower()
if operator == 'and' or operator == '&&':
method = 'AndFilter'
elif operator == 'or' or operator == '||':
method = 'OrFilter'
else:
raise errors.ParseError(
'Invalid binary operator {0:s}'.format(operator))
args = [x.Compile(filter_implementation) for x in self.args]
return getattr(filter_implementation, method)(*args)
[docs]class IdentityExpression(Expression):
"""An Expression which always evaluates to True."""
[docs] def Compile(self, filter_implementation):
"""Compile the expression."""
return filter_implementation.IdentityFilter()
[docs]class SearchParser(Lexer):
"""This parser can parse the mini query language and build an AST.
Examples of valid syntax:
filename contains "foo" and (size > 100k or date before "2011-10")
date between 2011 and 2010
files older than 1 year
"""
expression_cls = Expression
binary_expression_cls = BinaryExpression
tokens = [
# Double quoted string
Token('STRING', '"', 'PopState,StringFinish', None),
Token('STRING', r'\\(.)', 'StringEscape', None),
Token('STRING', r'[^\\"]+', 'StringInsert', None),
# Single quoted string
Token('SQ_STRING', '\'', 'PopState,StringFinish', None),
Token('SQ_STRING', r'\\(.)', 'StringEscape', None),
Token('SQ_STRING', r'[^\\\']+', 'StringInsert', None),
# TODO: Implement a unary not operator.
# The first thing we see in the initial state takes up to the ATTRIBUTE
Token('INITIAL', r'(and|or|\&\&|\|\|)', 'BinaryOperator', None),
Token('INITIAL', r'[^\s\(\)]', 'PushState,PushBack', 'ATTRIBUTE'),
Token('INITIAL', r'\(', 'BracketOpen', None),
Token('INITIAL', r'\)', 'BracketClose', None),
Token('ATTRIBUTE', r'[\w._0-9]+', 'StoreAttribute', 'OPERATOR'),
Token('OPERATOR', r'[a-z0-9<>=\-\+\!\^\&%]+', 'StoreOperator',
'ARG_LIST'),
Token('OPERATOR', r'(!=|[<>=])', 'StoreSpecialOperator', 'ARG_LIST'),
Token('ARG_LIST', r'[^\s\'"]+', 'InsertArg', None),
# Start a string.
Token('.', '"', 'PushState,StringStart', 'STRING'),
Token('.', '\'', 'PushState,StringStart', 'SQ_STRING'),
# Skip whitespace.
Token('.', r'\s+', None, None),
]
def __init__(self, data):
"""Initializes the search parser object."""
# Holds expression
self.current_expression = self.expression_cls()
self.filter_string = data
# The token stack
self.stack = []
self.string = None
Lexer.__init__(self, data)
[docs] def BinaryOperator(self, string=None, **unused_kwargs):
"""Set the binary operator."""
self.stack.append(self.binary_expression_cls(string))
[docs] def BracketOpen(self, **unused_kwargs):
"""Define an open bracket."""
self.stack.append('(')
[docs] def BracketClose(self, **unused_kwargs):
"""Close the bracket."""
self.stack.append(')')
[docs] def StringStart(self, **unused_kwargs):
"""Initialize the string."""
self.string = ''
[docs] def StringEscape(self, string, match, **unused_kwargs):
"""Escape backslashes found inside a string quote.
Backslashes followed by anything other than ['"rnbt] will just be included
in the string.
Args:
string: The string that matched.
match: the match object (instance of re.MatchObject).
Where match.group(1) contains the escaped code.
"""
if match.group(1) in '\'"rnbt':
self.string += string.decode('unicode_escape')
else:
self.string += string
[docs] def StringInsert(self, string='', **unused_kwargs):
"""Add to the string."""
self.string += string
[docs] def StringFinish(self, **unused_kwargs):
"""Finish the string operation."""
if self.state == 'ATTRIBUTE':
return self.StoreAttribute(string=self.string)
elif self.state == 'ARG_LIST':
return self.InsertArg(string=self.string)
return None
[docs] def StoreAttribute(self, string='', **unused_kwargs):
"""Store the attribute."""
logging.debug('Storing attribute {0:s}'.format(repr(string)))
# TODO: Update the expected number_of_args
try:
self.current_expression.SetAttribute(string)
except AttributeError:
raise errors.ParseError('Invalid attribute \'{0:s}\''.format(string))
return 'OPERATOR'
[docs] def StoreOperator(self, string='', **unused_kwargs):
"""Store the operator."""
logging.debug('Storing operator {0:s}'.format(repr(string)))
self.current_expression.SetOperator(string)
[docs] def InsertArg(self, string='', **unused_kwargs):
"""Insert an arg to the current expression."""
logging.debug('Storing Argument {0:s}'.format(string))
# This expression is complete
if self.current_expression.AddArg(string):
self.stack.append(self.current_expression)
self.current_expression = self.expression_cls()
return self.PopState()
return None
def _CombineBinaryExpressions(self, operator):
"""Combine binary expressions."""
for i in range(1, len(self.stack)-1):
item = self.stack[i]
if (isinstance(item, BinaryExpression) and item.operator == operator and
isinstance(self.stack[i-1], Expression) and
isinstance(self.stack[i+1], Expression)):
lhs = self.stack[i-1]
rhs = self.stack[i+1]
self.stack[i].AddOperands(lhs, rhs)
self.stack[i-1] = None
self.stack[i+1] = None
self.stack = list(filter(None, self.stack))
def _CombineParenthesis(self):
"""Combine parenthesis."""
for i in range(len(self.stack)-2):
if (self.stack[i] == '(' and self.stack[i+2] == ')' and
isinstance(self.stack[i+1], Expression)):
self.stack[i] = None
self.stack[i+2] = None
self.stack = list(filter(None, self.stack))
[docs] def Reduce(self):
"""Reduce the token stack into an AST."""
# Check for sanity
if self.state != 'INITIAL':
self.Error('Premature end of expression')
length = len(self.stack)
while length > 1:
# Precedence order
self._CombineParenthesis()
self._CombineBinaryExpressions('and')
self._CombineBinaryExpressions('or')
# No change
if len(self.stack) == length:
break
length = len(self.stack)
if length != 1:
self.Error('Illegal query expression')
return self.stack[0]
[docs] def Error(self, message=None, unused_weight=1):
"""Raise an error message."""
raise errors.ParseError(
'{0:s} in position {1:s}: {2:s} <----> {3:s} )'.format(
message, len(self.processed_buffer), self.processed_buffer,
self.buffer))
[docs] def Parse(self):
"""Parse."""
if not self.filter_string:
return IdentityExpression()
self.Close()
return self.Reduce()