Source code for plaso.storage.sqlite.merge_reader

# -*- coding: utf-8 -*-
"""Merge reader for SQLite storage files."""

from __future__ import unicode_literals

import os
import sqlite3

from plaso.containers import errors
from plaso.containers import event_sources
from plaso.containers import events
from plaso.containers import reports
from plaso.containers import tasks
from plaso.storage import identifiers
from plaso.storage import interface


[docs]class SQLiteStorageMergeReader(interface.StorageFileMergeReader): """SQLite-based storage file reader for merging.""" _CONTAINER_TYPE_ANALYSIS_REPORT = reports.AnalysisReport.CONTAINER_TYPE _CONTAINER_TYPE_EVENT = events.EventObject.CONTAINER_TYPE _CONTAINER_TYPE_EVENT_DATA = events.EventData.CONTAINER_TYPE _CONTAINER_TYPE_EVENT_SOURCE = event_sources.EventSource.CONTAINER_TYPE _CONTAINER_TYPE_EVENT_TAG = events.EventTag.CONTAINER_TYPE _CONTAINER_TYPE_EXTRACTION_ERROR = errors.ExtractionError.CONTAINER_TYPE _CONTAINER_TYPE_TASK_COMPLETION = tasks.TaskCompletion.CONTAINER_TYPE _CONTAINER_TYPE_TASK_START = tasks.TaskStart.CONTAINER_TYPE _CONTAINER_TYPES = ( _CONTAINER_TYPE_EVENT_SOURCE, _CONTAINER_TYPE_EVENT_DATA, _CONTAINER_TYPE_EVENT, _CONTAINER_TYPE_EVENT_TAG, _CONTAINER_TYPE_EXTRACTION_ERROR, _CONTAINER_TYPE_ANALYSIS_REPORT) _TABLE_NAMES_QUERY = ( 'SELECT name FROM sqlite_master WHERE type = "table"') def __init__(self, storage_writer, path): """Initializes a storage merge reader. Args: storage_writer (StorageWriter): storage writer. path (str): path to the input file. Raises: IOError: if the input file cannot be opened. """ super(SQLiteStorageMergeReader, self).__init__(storage_writer) self._active_container_type = None self._active_cursor = None self._connection = None self._container_types = None self._cursor = None self._event_data_identifier_mappings = {} self._path = path def _AddAttributeContainer(self, attribute_container): """Adds a single attribute container to the storage writer. Args: attribute_container (AttributeContainer): container Raises: RuntimeError: if the attribute container type is not supported. """ container_type = attribute_container.CONTAINER_TYPE if container_type == self._CONTAINER_TYPE_EVENT_SOURCE: self._storage_writer.AddEventSource(attribute_container) elif container_type == self._CONTAINER_TYPE_EVENT_DATA: identifier = attribute_container.GetIdentifier() lookup_key = identifier.CopyToString() self._storage_writer.AddEventData(attribute_container) identifier = attribute_container.GetIdentifier() self._event_data_identifier_mappings[lookup_key] = identifier elif container_type == self._CONTAINER_TYPE_EVENT: if hasattr(attribute_container, 'event_data_row_identifier'): event_data_identifier = identifiers.SQLTableIdentifier( self._CONTAINER_TYPE_EVENT_DATA, attribute_container.event_data_row_identifier) lookup_key = event_data_identifier.CopyToString() event_data_identifier = self._event_data_identifier_mappings[lookup_key] attribute_container.SetEventDataIdentifier(event_data_identifier) # TODO: add event identifier mappings for event tags. self._storage_writer.AddEvent(attribute_container) elif container_type == self._CONTAINER_TYPE_EVENT_TAG: self._storage_writer.AddEventTag(attribute_container) elif container_type == self._CONTAINER_TYPE_EXTRACTION_ERROR: self._storage_writer.AddError(attribute_container) elif container_type == self._CONTAINER_TYPE_ANALYSIS_REPORT: self._storage_writer.AddAnalysisReport(attribute_container) elif container_type not in ( self._CONTAINER_TYPE_TASK_COMPLETION, self._CONTAINER_TYPE_TASK_START): raise RuntimeError('Unsupported container type: {0:s}'.format( container_type))
[docs] def MergeAttributeContainers( self, callback=None, maximum_number_of_containers=0): """Reads attribute containers from a task storage file into the writer. Args: callback (function[StorageWriter, AttributeContainer]): function to call after each attribute container is deserialized. maximum_number_of_containers (Optional[int]): maximum number of containers to merge, where 0 represent no limit. Returns: bool: True if the entire task storage file has been merged. Raises: OSError: if the task storage file cannot be deleted. """ if not self._cursor: self._connection = sqlite3.connect( self._path, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) self._cursor = self._connection.cursor() self._cursor.execute(self._TABLE_NAMES_QUERY) table_names = [row[0] for row in self._cursor.fetchall()] # Remove container types not stored in the storage file but keep # the container types list in order. self._container_types = list(self._CONTAINER_TYPES) for name in set(self._CONTAINER_TYPES).difference(table_names): self._container_types.remove(name) number_of_containers = 0 while self._active_cursor or self._container_types: if not self._active_cursor: self._active_container_type = self._container_types.pop(0) query = 'SELECT _identifier, _data FROM {0:s}'.format( self._active_container_type) self._cursor.execute(query) self._active_cursor = self._cursor if maximum_number_of_containers > 0: number_of_rows = maximum_number_of_containers - number_of_containers rows = self._active_cursor.fetchmany(size=number_of_rows) else: rows = self._active_cursor.fetchall() if not rows: self._active_cursor = None continue for row in rows: identifier = identifiers.SQLTableIdentifier( self._active_container_type, row[0]) serialized_data = row[1] attribute_container = self._DeserializeAttributeContainer( self._active_container_type, serialized_data) attribute_container.SetIdentifier(identifier) if self._active_container_type == self._CONTAINER_TYPE_EVENT_TAG: event_identifier = identifiers.SQLTableIdentifier( self._CONTAINER_TYPE_EVENT, attribute_container.event_row_identifier) attribute_container.SetEventIdentifier(event_identifier) del attribute_container.event_row_identifier if callback: callback(self._storage_writer, attribute_container) self._AddAttributeContainer(attribute_container) number_of_containers += 1 if (maximum_number_of_containers > 0 and number_of_containers >= maximum_number_of_containers): return False self._connection.close() self._connection = None self._cursor = None os.remove(self._path) return True