Source code for plaso.parsers.sqlite_plugins.interface

# -*- coding: utf-8 -*-
"""The SQLite parser plugin interface."""

from __future__ import unicode_literals

# pylint: disable=wrong-import-order
try:
  from pysqlite2 import dbapi2 as sqlite3
except ImportError:
  import sqlite3

from plaso.parsers import logger
from plaso.parsers import plugins


[docs]class SQLitePlugin(plugins.BasePlugin): """SQLite parser plugin.""" NAME = 'sqlite' DESCRIPTION = 'Parser for SQLite database files.' # Queries to be executed. # Should be a list of tuples with two entries, SQLCommand and callback # function name. QUERIES = [] # List of tables that should be present in the database, for verification. REQUIRED_TABLES = frozenset([]) # Database schemas this plugin was originally designed for. # Should be a list of dictionaries with {table_name: SQLCommand} format. SCHEMAS = [] # Value to indicate the schema of the database must match one of the schemas # defined by the plugin. REQUIRES_SCHEMA_MATCH = False def __init__(self): """Initializes a SQLite parser plugin.""" super(SQLitePlugin, self).__init__() self._keys_per_query = {} def _GetRowValue(self, query_hash, row, value_name): """Retrieves a value from the row. Args: query_hash (int): hash of the query, that uniquely identifies the query that produced the row. row (sqlite3.Row): row. value_name (str): name of the value. Returns: object: value. """ keys_name_to_index_map = self._keys_per_query.get(query_hash, None) if not keys_name_to_index_map: keys_name_to_index_map = { name: index for index, name in enumerate(row.keys())} self._keys_per_query[query_hash] = keys_name_to_index_map value_index = keys_name_to_index_map.get(value_name) # Note that pysqlite does not accept a Unicode string in row['string'] and # will raise "IndexError: Index must be int or string". return row[value_index] @classmethod def _HashRow(cls, row): """Hashes the given row. Args: row (sqlite3.Row): row. Returns: int: hash value of the given row. """ values = [] for value in row: try: value = '{0!s}'.format(value) except UnicodeDecodeError: # In Python 2, blobs are "read-write buffer" and will cause a # UnicodeDecodeError exception if we try format it as a string. # Since Python 3 does not support the buffer type we cannot check # the type of value. value = repr(value) values.append(value) return hash(' '.join(values)) def _ParseQuery(self, parser_mediator, database, query, callback, cache): """Queries a database and parses the results. Args: parser_mediator (ParserMediator): parser mediator. database (SQLiteDatabase): database. query (str): query. callback (function): function to invoke to parse an individual row. cache (SQLiteCache): cache. """ row_cache = cache.GetRowCache(query) try: rows = database.Query(query) except sqlite3.DatabaseError as exception: parser_mediator.ProduceExtractionError( 'unable to run query: {0:s} on database with error: {1!s}'.format( query, exception)) return for index, row in enumerate(rows): if parser_mediator.abort: break row_hash = self._HashRow(row) if row_hash in row_cache: continue try: callback(parser_mediator, query, row, cache=cache, database=database) except Exception as exception: # pylint: disable=broad-except parser_mediator.ProduceExtractionError(( 'unable to parse row: {0:d} with callback: {1:s} on database ' 'with error: {2!s}').format( index, callback.__name__, exception)) # TODO: consider removing return. return row_cache.add(row_hash)
[docs] def CheckSchema(self, database): """Checks the schema of a database with that defined in the plugin. Args: database (SQLiteDatabase): database. Returns: bool: True if the schema of the database matches that defined by the plugin, or False if the schemas do not match or no schema is defined by the plugin. """ schema_match = False if self.SCHEMAS: for schema in self.SCHEMAS: if database and database.schema == schema: schema_match = True
return schema_match # pylint 1.9.3 wants a docstring for kwargs, but this is not useful to add. # pylint: disable=missing-param-doc,arguments-differ
[docs] def Process( self, parser_mediator, cache=None, database=None, **unused_kwargs): """Determine if this is the right plugin for this database. This function takes a SQLiteDatabase object and compares the list of required tables against the available tables in the database. If all the tables defined in REQUIRED_TABLES are present in the database then this plugin is considered to be the correct plugin and the function will return back a generator that yields event objects. Args: parser_mediator (ParserMediator): parser mediator. cache (Optional[SQLiteCache]): cache. database (Optional[SQLiteDatabase]): database. Raises: ValueError: If the database or cache value are missing. """ if cache is None: raise ValueError('Missing cache value.') if database is None: raise ValueError('Missing database value.') # This will raise if unhandled keyword arguments are passed. super(SQLitePlugin, self).Process(parser_mediator) for query, callback_method in self.QUERIES: if parser_mediator.abort: break callback = getattr(self, callback_method, None) if callback is None: logger.warning( '[{0:s}] missing callback method: {1:s} for query: {2:s}'.format( self.NAME, callback_method, query)) continue
self._ParseQuery(parser_mediator, database, query, callback, cache)