Source code for plaso.storage.interface

# -*- coding: utf-8 -*-
"""The storage interface classes."""

from __future__ import unicode_literals

import abc
import os
import shutil
import tempfile

from plaso.lib import definitions
from plaso.serializer import json_serializer


class SerializedAttributeContainerList(object):
  """Serialized attribute container list.

  The list is unsorted and pops attribute containers in the same order as
  pushed to preserve order.

  The GetAttributeContainerByIndex method should be used to read attribute
  containers from the list while it is being filled.

  Attributes:
    data_size (int): total data size of the serialized attribute containers
        on the list.
    next_sequence_number (int): next attribute container sequence number.
  """

  def __init__(self):
    """Initializes a serialized attribute container list."""
    super(SerializedAttributeContainerList, self).__init__()
    self._list = []
    self.data_size = 0
    self.next_sequence_number = 0

  @property
  def number_of_attribute_containers(self):
    """int: number of serialized attribute containers on the list."""
    return len(self._list)

  def Empty(self):
    """Empties the list."""
    self._list = []
    self.data_size = 0

  def GetAttributeContainerByIndex(self, index):
    """Retrieves a specific serialized attribute container from the list.

    Args:
      index (int): attribute container index.

    Returns:
      bytes: serialized attribute container data or None if not available.

    Raises:
      IndexError: if the index is less than zero.
    """
    if index < 0:
      raise IndexError(
          'Unsupported negative index value: {0:d}.'.format(index))

    if index < len(self._list):
      return self._list[index]

    return None

  def PopAttributeContainer(self):
    """Pops a serialized attribute container from the list.

    Returns:
      bytes: serialized attribute container data.
    """
    try:
      serialized_data = self._list.pop(0)
      self.data_size -= len(serialized_data)
      return serialized_data

    except IndexError:
      return None

  def PushAttributeContainer(self, serialized_data):
    """Pushes a serialized attribute container onto the list.

    Args:
      serialized_data (bytes): serialized attribute container data.
    """
    self._list.append(serialized_data)
    self.data_size += len(serialized_data)
    self.next_sequence_number += 1

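# Usage sketch (illustrative comment, not part of the interface): the list is
# FIFO, so PopAttributeContainer returns containers in the order they were
# pushed, while data_size tracks the total size of the queued data.
#
#   container_list = SerializedAttributeContainerList()
#   container_list.PushAttributeContainer(b'{"value": 1}')
#   container_list.PushAttributeContainer(b'{"value": 2}')
#   container_list.data_size  # 24, the combined size of both payloads.
#   container_list.PopAttributeContainer()  # b'{"value": 1}'
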
# pylint: disable=redundant-returns-doc,redundant-yields-doc
class BaseStore(object):
  """Storage interface."""

  def __init__(self):
    """Initializes a store."""
    super(BaseStore, self).__init__()
    self._serializers_profiler = None
    self._storage_profiler = None

  @abc.abstractmethod
  def AddAnalysisReport(self, analysis_report):
    """Adds an analysis report.

    Args:
      analysis_report (AnalysisReport): analysis report.
    """

  @abc.abstractmethod
  def AddError(self, error):
    """Adds an error.

    Args:
      error (ExtractionError): error.
    """

  @abc.abstractmethod
  def AddEvent(self, event):
    """Adds an event.

    Args:
      event (EventObject): event.
    """

  @abc.abstractmethod
  def AddEventSource(self, event_source):
    """Adds an event source.

    Args:
      event_source (EventSource): event source.
    """

  @abc.abstractmethod
  def AddEventTag(self, event_tag):
    """Adds an event tag.

    Args:
      event_tag (EventTag): event tag.
    """

  @abc.abstractmethod
  def Close(self):
    """Closes the storage."""

  @abc.abstractmethod
  def GetAnalysisReports(self):
    """Retrieves the analysis reports.

    Yields:
      AnalysisReport: analysis report.
    """

  @abc.abstractmethod
  def GetErrors(self):
    """Retrieves the errors.

    Yields:
      ExtractionError: error.
    """

  @abc.abstractmethod
  def GetEventData(self):
    """Retrieves the event data.

    Yields:
      EventData: event data.
    """

  @abc.abstractmethod
  def GetEventDataByIdentifier(self, identifier):
    """Retrieves specific event data.

    Args:
      identifier (AttributeContainerIdentifier): event data identifier.

    Returns:
      EventData: event data or None if not available.
    """

  @abc.abstractmethod
  def GetEvents(self):
    """Retrieves the events.

    Yields:
      EventObject: event.
    """

  @abc.abstractmethod
  def GetEventSources(self):
    """Retrieves the event sources.

    Yields:
      EventSource: event source.
    """

  @abc.abstractmethod
  def GetEventTagByIdentifier(self, identifier):
    """Retrieves a specific event tag.

    Args:
      identifier (AttributeContainerIdentifier): event tag identifier.

    Returns:
      EventTag: event tag or None if not available.
    """

  @abc.abstractmethod
  def GetEventTags(self):
    """Retrieves the event tags.

    Yields:
      EventTag: event tag.
    """

  @abc.abstractmethod
  def GetNumberOfEventSources(self):
    """Retrieves the number of event sources.

    Returns:
      int: number of event sources.
    """

  @abc.abstractmethod
  def GetSortedEvents(self, time_range=None):
    """Retrieves the events in increasing chronological order.

    This includes all events written to the storage including those pending
    being flushed (written) to the storage.

    Args:
      time_range (Optional[TimeRange]): time range used to filter events
          that fall in a specific period.

    Yields:
      EventObject: event.
    """

  @abc.abstractmethod
  def HasAnalysisReports(self):
    """Determines if a store contains analysis reports.

    Returns:
      bool: True if the store contains analysis reports.
    """

  @abc.abstractmethod
  def HasErrors(self):
    """Determines if a store contains extraction errors.

    Returns:
      bool: True if the store contains extraction errors.
    """

  @abc.abstractmethod
  def HasEventTags(self):
    """Determines if a store contains event tags.

    Returns:
      bool: True if the store contains event tags.
    """

  @abc.abstractmethod
  def Open(self, **kwargs):
    """Opens the storage."""

  @abc.abstractmethod
  def ReadPreprocessingInformation(self, knowledge_base):
    """Reads preprocessing information.

    The preprocessing information contains the system configuration, which
    includes information about various system specific configuration data,
    for example the user accounts.

    Args:
      knowledge_base (KnowledgeBase): is used to store the preprocessing
          information.
    """

  def SetSerializersProfiler(self, serializers_profiler):
    """Sets the serializers profiler.

    Args:
      serializers_profiler (SerializersProfiler): serializers profiler.
    """
    self._serializers_profiler = serializers_profiler

  def SetStorageProfiler(self, storage_profiler):
    """Sets the storage profiler.

    Args:
      storage_profiler (StorageProfiler): storage profiler.
    """
    self._storage_profiler = storage_profiler

  @abc.abstractmethod
  def WritePreprocessingInformation(self, knowledge_base):
    """Writes preprocessing information.

    Args:
      knowledge_base (KnowledgeBase): contains the preprocessing information.
    """

  @abc.abstractmethod
  def WriteSessionCompletion(self, session_completion):
    """Writes session completion information.

    Args:
      session_completion (SessionCompletion): session completion information.
    """

  @abc.abstractmethod
  def WriteSessionStart(self, session_start):
    """Writes session start information.

    Args:
      session_start (SessionStart): session start information.
    """

  @abc.abstractmethod
  def WriteTaskCompletion(self, task_completion):
    """Writes task completion information.

    Args:
      task_completion (TaskCompletion): task completion information.
    """

  @abc.abstractmethod
  def WriteTaskStart(self, task_start):
    """Writes task start information.

    Args:
      task_start (TaskStart): task start information.
    """

class BaseStorageFile(BaseStore):
  """Interface for file-based stores."""

  # pylint: disable=abstract-method

  def __init__(self):
    """Initializes a file-based store."""
    super(BaseStorageFile, self).__init__()
    self._is_open = False
    self._read_only = True
    self._serialized_attribute_containers = {}
    self._serializer = json_serializer.JSONAttributeContainerSerializer

  def _DeserializeAttributeContainer(self, container_type, serialized_data):
    """Deserializes an attribute container.

    Args:
      container_type (str): attribute container type.
      serialized_data (bytes): serialized attribute container data.

    Returns:
      AttributeContainer: attribute container or None.

    Raises:
      IOError: if the serialized data cannot be decoded.
    """
    if not serialized_data:
      return None

    if self._serializers_profiler:
      self._serializers_profiler.StartTiming(container_type)

    try:
      serialized_string = serialized_data.decode('utf-8')
    except UnicodeDecodeError as exception:
      raise IOError('Unable to decode serialized data: {0!s}'.format(
          exception))

    attribute_container = self._serializer.ReadSerialized(serialized_string)

    if self._serializers_profiler:
      self._serializers_profiler.StopTiming(container_type)

    return attribute_container

  def _GetNumberOfSerializedAttributeContainers(self, container_type):
    """Retrieves the number of serialized attribute containers.

    Args:
      container_type (str): attribute container type.

    Returns:
      int: number of serialized attribute containers.
    """
    container_list = self._GetSerializedAttributeContainerList(container_type)
    return container_list.number_of_attribute_containers

  def _GetSerializedAttributeContainerByIndex(self, container_type, index):
    """Retrieves a specific serialized attribute container.

    Args:
      container_type (str): attribute container type.
      index (int): attribute container index.

    Returns:
      bytes: serialized attribute container data or None if not available.
    """
    container_list = self._GetSerializedAttributeContainerList(container_type)
    return container_list.GetAttributeContainerByIndex(index)

  def _GetSerializedAttributeContainerList(self, container_type):
    """Retrieves a serialized attribute container list.

    Args:
      container_type (str): attribute container type.

    Returns:
      SerializedAttributeContainerList: serialized attribute container list.
    """
    container_list = self._serialized_attribute_containers.get(
        container_type, None)
    if not container_list:
      container_list = SerializedAttributeContainerList()
      self._serialized_attribute_containers[container_type] = container_list

    return container_list

  def _SerializeAttributeContainer(self, attribute_container):
    """Serializes an attribute container.

    Args:
      attribute_container (AttributeContainer): attribute container.

    Returns:
      bytes: serialized attribute container.

    Raises:
      IOError: if the attribute container cannot be serialized.
    """
    if self._serializers_profiler:
      self._serializers_profiler.StartTiming(
          attribute_container.CONTAINER_TYPE)

    try:
      attribute_container_data = self._serializer.WriteSerialized(
          attribute_container)
      if not attribute_container_data:
        raise IOError(
            'Unable to serialize attribute container: {0:s}.'.format(
                attribute_container.CONTAINER_TYPE))

      attribute_container_data = attribute_container_data.encode('utf-8')

    finally:
      if self._serializers_profiler:
        self._serializers_profiler.StopTiming(
            attribute_container.CONTAINER_TYPE)

    return attribute_container_data

  def _RaiseIfNotWritable(self):
    """Raises if the storage file is not writable.

    Raises:
      IOError: when the storage file is closed or read-only.
    """
    if not self._is_open:
      raise IOError('Unable to write to closed storage file.')

    if self._read_only:
      raise IOError('Unable to write to read-only storage file.')

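# Round-trip sketch (illustrative comment, assuming "store" is an instance of
# a concrete BaseStorageFile subclass and "event" is an EventObject): the
# protected helpers serialize attribute containers to UTF-8 encoded JSON and
# deserialize them back, with optional profiling around both operations.
#
#   serialized_data = store._SerializeAttributeContainer(event)
#   event_copy = store._DeserializeAttributeContainer(
#       event.CONTAINER_TYPE, serialized_data)
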
class StorageMergeReader(object):
  """Storage reader interface for merging."""

  def __init__(self, storage_writer):
    """Initializes a storage merge reader.

    Args:
      storage_writer (StorageWriter): storage writer.
    """
    super(StorageMergeReader, self).__init__()
    self._storage_writer = storage_writer

  @abc.abstractmethod
  def MergeAttributeContainers(
      self, callback=None, maximum_number_of_containers=0):
    """Reads attribute containers from a task storage file into the writer.

    Args:
      callback (function[StorageWriter, AttributeContainer]): function to
          call after each attribute container is deserialized.
      maximum_number_of_containers (Optional[int]): maximum number of
          containers to merge, where 0 represents no limit.

    Returns:
      bool: True if the entire task storage file has been merged.
    """

class StorageFileMergeReader(StorageMergeReader):
  """Storage reader interface for merging file-based stores."""

  # pylint: disable=abstract-method

  def __init__(self, storage_writer):
    """Initializes a storage merge reader.

    Args:
      storage_writer (StorageWriter): storage writer.
    """
    super(StorageFileMergeReader, self).__init__(storage_writer)
    self._serializer = json_serializer.JSONAttributeContainerSerializer
    self._serializers_profiler = None

  def _DeserializeAttributeContainer(self, container_type, serialized_data):
    """Deserializes an attribute container.

    Args:
      container_type (str): attribute container type.
      serialized_data (bytes): serialized attribute container data.

    Returns:
      AttributeContainer: attribute container or None.

    Raises:
      IOError: if the serialized data cannot be decoded.
    """
    if not serialized_data:
      return None

    if self._serializers_profiler:
      self._serializers_profiler.StartTiming(container_type)

    try:
      serialized_string = serialized_data.decode('utf-8')
    except UnicodeDecodeError as exception:
      raise IOError('Unable to decode serialized data: {0!s}'.format(
          exception))

    attribute_container = self._serializer.ReadSerialized(serialized_string)

    if self._serializers_profiler:
      self._serializers_profiler.StopTiming(container_type)

    return attribute_container

# pylint: disable=redundant-returns-doc,redundant-yields-doc
class StorageReader(object):
  """Storage reader interface."""

  def __enter__(self):
    """Make usable with "with" statement."""
    return self

  # pylint: disable=unused-argument
  def __exit__(self, exception_type, value, traceback):
    """Make usable with "with" statement."""
    self.Close()

  @abc.abstractmethod
  def Close(self):
    """Closes the storage reader."""

  @abc.abstractmethod
  def GetAnalysisReports(self):
    """Retrieves the analysis reports.

    Yields:
      AnalysisReport: analysis report.
    """

  @abc.abstractmethod
  def GetErrors(self):
    """Retrieves the errors.

    Yields:
      ExtractionError: error.
    """

  @abc.abstractmethod
  def GetEventData(self):
    """Retrieves the event data.

    Yields:
      EventData: event data.
    """

  @abc.abstractmethod
  def GetEventDataByIdentifier(self, identifier):
    """Retrieves specific event data.

    Args:
      identifier (AttributeContainerIdentifier): event data identifier.

    Returns:
      EventData: event data or None if not available.
    """

  @abc.abstractmethod
  def GetEvents(self):
    """Retrieves the events.

    Yields:
      EventObject: event.
    """

  @abc.abstractmethod
  def GetEventSources(self):
    """Retrieves the event sources.

    Yields:
      EventSource: event source.
    """

  @abc.abstractmethod
  def GetEventTagByIdentifier(self, identifier):
    """Retrieves a specific event tag.

    Args:
      identifier (AttributeContainerIdentifier): event tag identifier.

    Returns:
      EventTag: event tag or None if not available.
    """

  @abc.abstractmethod
  def GetEventTags(self):
    """Retrieves the event tags.

    Yields:
      EventTag: event tag.
    """

  @abc.abstractmethod
  def GetNumberOfAnalysisReports(self):
    """Retrieves the number of analysis reports.

    Returns:
      int: number of analysis reports.
    """

  @abc.abstractmethod
  def GetSortedEvents(self, time_range=None):
    """Retrieves the events in increasing chronological order.

    This includes all events written to the storage including those pending
    being flushed (written) to the storage.

    Args:
      time_range (Optional[TimeRange]): time range used to filter events
          that fall in a specific period.

    Yields:
      EventObject: event.
    """

  @abc.abstractmethod
  def ReadPreprocessingInformation(self, knowledge_base):
    """Reads preprocessing information.

    The preprocessing information contains the system configuration, which
    includes information about various system specific configuration data,
    for example the user accounts.

    Args:
      knowledge_base (KnowledgeBase): is used to store the preprocessing
          information.
    """

  @abc.abstractmethod
  def SetSerializersProfiler(self, serializers_profiler):
    """Sets the serializers profiler.

    Args:
      serializers_profiler (SerializersProfiler): serializers profiler.
    """

  @abc.abstractmethod
  def SetStorageProfiler(self, storage_profiler):
    """Sets the storage profiler.

    Args:
      storage_profiler (StorageProfiler): storage profiler.
    """

class StorageFileReader(StorageReader):
  """File-based storage reader interface."""

  def __init__(self, path):
    """Initializes a storage reader.

    Args:
      path (str): path to the input file.
    """
    super(StorageFileReader, self).__init__()
    self._path = path
    self._storage_file = None

  def Close(self):
    """Closes the storage reader."""
    if self._storage_file:
      self._storage_file.Close()
      self._storage_file = None

  def GetAnalysisReports(self):
    """Retrieves the analysis reports.

    Returns:
      generator(AnalysisReport): analysis report generator.
    """
    return self._storage_file.GetAnalysisReports()

  def GetErrors(self):
    """Retrieves the errors.

    Returns:
      generator(ExtractionError): error generator.
    """
    return self._storage_file.GetErrors()

  def GetEventData(self):
    """Retrieves the event data.

    Returns:
      generator(EventData): event data generator.
    """
    return self._storage_file.GetEventData()

  def GetEventDataByIdentifier(self, identifier):
    """Retrieves specific event data.

    Args:
      identifier (AttributeContainerIdentifier): event data identifier.

    Returns:
      EventData: event data or None if not available.
    """
    return self._storage_file.GetEventDataByIdentifier(identifier)

  def GetEvents(self):
    """Retrieves the events.

    Returns:
      generator(EventObject): event generator.
    """
    return self._storage_file.GetEvents()

  def GetEventSources(self):
    """Retrieves the event sources.

    Returns:
      generator(EventSource): event source generator.
    """
    return self._storage_file.GetEventSources()

  def GetEventTagByIdentifier(self, identifier):
    """Retrieves a specific event tag.

    Args:
      identifier (AttributeContainerIdentifier): event tag identifier.

    Returns:
      EventTag: event tag or None if not available.
    """
    return self._storage_file.GetEventTagByIdentifier(identifier)

  def GetEventTags(self):
    """Retrieves the event tags.

    Returns:
      generator(EventTag): event tag generator.
    """
    return self._storage_file.GetEventTags()

  def GetNumberOfAnalysisReports(self):
    """Retrieves the number of analysis reports.

    Returns:
      int: number of analysis reports.
    """
    return self._storage_file.GetNumberOfAnalysisReports()

  def GetSortedEvents(self, time_range=None):
    """Retrieves the events in increasing chronological order.

    This includes all events written to the storage including those pending
    being flushed (written) to the storage.

    Args:
      time_range (Optional[TimeRange]): time range used to filter events
          that fall in a specific period.

    Returns:
      generator(EventObject): event generator.
    """
    return self._storage_file.GetSortedEvents(time_range=time_range)

  def ReadPreprocessingInformation(self, knowledge_base):
    """Reads preprocessing information.

    The preprocessing information contains the system configuration, which
    includes information about various system specific configuration data,
    for example the user accounts.

    Args:
      knowledge_base (KnowledgeBase): is used to store the preprocessing
          information.
    """
    self._storage_file.ReadPreprocessingInformation(knowledge_base)

  def SetSerializersProfiler(self, serializers_profiler):
    """Sets the serializers profiler.

    Args:
      serializers_profiler (SerializersProfiler): serializers profiler.
    """
    self._storage_file.SetSerializersProfiler(serializers_profiler)

  def SetStorageProfiler(self, storage_profiler):
    """Sets the storage profiler.

    Args:
      storage_profiler (StorageProfiler): storage profiler.
    """
    self._storage_file.SetStorageProfiler(storage_profiler)

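# Usage sketch (illustrative comment; ConcreteStorageFileReader stands in for
# a hypothetical concrete subclass): StorageReader supports the "with"
# statement, which guarantees Close is called when the block exits.
#
#   with ConcreteStorageFileReader('timeline.plaso') as storage_reader:
#     for event in storage_reader.GetSortedEvents():
#       ...  # Process each event in chronological order.
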
# pylint: disable=redundant-returns-doc,redundant-yields-doc
class StorageWriter(object):
  """Storage writer interface.

  Attributes:
    number_of_analysis_reports (int): number of analysis reports written.
    number_of_errors (int): number of errors written.
    number_of_event_sources (int): number of event sources written.
    number_of_event_tags (int): number of event tags written.
    number_of_events (int): number of events written.
  """

  def __init__(
      self, session, storage_type=definitions.STORAGE_TYPE_SESSION,
      task=None):
    """Initializes a storage writer.

    Args:
      session (Session): session the storage changes are part of.
      storage_type (Optional[str]): storage type.
      task (Optional[Task]): task.
    """
    super(StorageWriter, self).__init__()
    self._first_written_event_source_index = 0
    self._serializers_profiler = None
    self._session = session
    self._storage_profiler = None
    self._storage_type = storage_type
    self._task = task
    self._written_event_source_index = 0
    self.number_of_analysis_reports = 0
    self.number_of_errors = 0
    self.number_of_event_sources = 0
    self.number_of_event_tags = 0
    self.number_of_events = 0

  @abc.abstractmethod
  def AddAnalysisReport(self, analysis_report):
    """Adds an analysis report.

    Args:
      analysis_report (AnalysisReport): a report.
    """

  @abc.abstractmethod
  def AddError(self, error):
    """Adds an error.

    Args:
      error (ExtractionError): an error.
    """

  @abc.abstractmethod
  def AddEvent(self, event):
    """Adds an event.

    Args:
      event (EventObject): an event.
    """

  @abc.abstractmethod
  def AddEventSource(self, event_source):
    """Adds an event source.

    Args:
      event_source (EventSource): an event source.
    """

  @abc.abstractmethod
  def AddEventTag(self, event_tag):
    """Adds an event tag.

    Args:
      event_tag (EventTag): an event tag.
    """

  @abc.abstractmethod
  def Close(self):
    """Closes the storage writer."""

  # pylint: disable=unused-argument
  def CreateTaskStorage(self, task):
    """Creates a task storage.

    Args:
      task (Task): task.

    Returns:
      StorageWriter: storage writer.

    Raises:
      NotImplementedError: since there is no implementation.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def GetEventDataByIdentifier(self, identifier):
    """Retrieves specific event data.

    Args:
      identifier (AttributeContainerIdentifier): event data identifier.

    Returns:
      EventData: event data or None if not available.
    """

  @abc.abstractmethod
  def GetEvents(self):
    """Retrieves the events.

    Yields:
      EventObject: event.
    """

  @abc.abstractmethod
  def GetFirstWrittenEventSource(self):
    """Retrieves the first event source that was written after open.

    Using GetFirstWrittenEventSource and GetNextWrittenEventSource newly
    added event sources can be retrieved in order of addition.

    Returns:
      EventSource: event source or None if there are no newly written ones.
    """

  @abc.abstractmethod
  def GetNextWrittenEventSource(self):
    """Retrieves the next event source that was written after open.

    Returns:
      EventSource: event source or None if there are no newly written ones.
    """

  @abc.abstractmethod
  def GetSortedEvents(self, time_range=None):
    """Retrieves the events in increasing chronological order.

    This includes all events written to the storage including those pending
    being flushed (written) to the storage.

    Args:
      time_range (Optional[TimeRange]): time range used to filter events
          that fall in a specific period.

    Yields:
      EventObject: event.
    """

  # pylint: disable=unused-argument
  def FinalizeTaskStorage(self, task):
    """Finalizes a processed task storage.

    Args:
      task (Task): task.

    Raises:
      NotImplementedError: since there is no implementation.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def Open(self):
    """Opens the storage writer."""

  # pylint: disable=unused-argument
  def PrepareMergeTaskStorage(self, task):
    """Prepares a task storage for merging.

    Args:
      task (Task): task.

    Raises:
      NotImplementedError: since there is no implementation.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def ReadPreprocessingInformation(self, knowledge_base):
    """Reads preprocessing information.

    The preprocessing information contains the system configuration, which
    includes information about various system specific configuration data,
    for example the user accounts.

    Args:
      knowledge_base (KnowledgeBase): is used to store the preprocessing
          information.
    """

  # pylint: disable=unused-argument
  def RemoveProcessedTaskStorage(self, task):
    """Removes a processed task storage.

    Args:
      task (Task): task.

    Raises:
      NotImplementedError: since there is no implementation.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def SetSerializersProfiler(self, serializers_profiler):
    """Sets the serializers profiler.

    Args:
      serializers_profiler (SerializersProfiler): serializers profiler.
    """

  @abc.abstractmethod
  def SetStorageProfiler(self, storage_profiler):
    """Sets the storage profiler.

    Args:
      storage_profiler (StorageProfiler): storage profiler.
    """

  @abc.abstractmethod
  def WritePreprocessingInformation(self, knowledge_base):
    """Writes preprocessing information.

    Args:
      knowledge_base (KnowledgeBase): contains the preprocessing information.
    """

  @abc.abstractmethod
  def WriteSessionCompletion(self, aborted=False):
    """Writes session completion information.

    Args:
      aborted (Optional[bool]): True if the session was aborted.
    """

  @abc.abstractmethod
  def WriteSessionStart(self):
    """Writes session start information."""

  @abc.abstractmethod
  def WriteTaskCompletion(self, aborted=False):
    """Writes task completion information.

    Args:
      aborted (Optional[bool]): True if the task was aborted.
    """

  @abc.abstractmethod
  def WriteTaskStart(self):
    """Writes task start information."""

class StorageFileWriter(StorageWriter):
  """Defines an interface for a file-backed storage writer."""

  def __init__(
      self, session, output_file,
      storage_type=definitions.STORAGE_TYPE_SESSION, task=None):
    """Initializes a storage writer.

    Args:
      session (Session): session the storage changes are part of.
      output_file (str): path to the output file.
      storage_type (Optional[str]): storage type.
      task (Optional[Task]): task.
    """
    super(StorageFileWriter, self).__init__(
        session, storage_type=storage_type, task=task)
    self._merge_task_storage_path = ''
    self._output_file = output_file
    self._processed_task_storage_path = ''
    self._storage_file = None
    self._task_storage_path = None

  @abc.abstractmethod
  def _CreateStorageFile(self):
    """Creates a storage file.

    Returns:
      BaseStorageFile: storage file.
    """

  @abc.abstractmethod
  def _CreateTaskStorageMergeReader(self, path):
    """Creates a task storage merge reader.

    Args:
      path (str): path to the task storage file that should be merged.

    Returns:
      StorageMergeReader: storage merge reader.
    """

  @abc.abstractmethod
  def _CreateTaskStorageWriter(self, path, task):
    """Creates a task storage writer.

    Args:
      path (str): path to the storage file.
      task (Task): task.

    Returns:
      StorageWriter: storage writer.
    """

  def _GetMergeTaskStorageFilePath(self, task):
    """Retrieves the path of a task storage file in the merge directory.

    Args:
      task (Task): task.

    Returns:
      str: path of a task storage file in the merge directory.
    """
    filename = '{0:s}.plaso'.format(task.identifier)
    return os.path.join(self._merge_task_storage_path, filename)

  def _GetProcessedStorageFilePath(self, task):
    """Retrieves the path of a task storage file in the processed directory.

    Args:
      task (Task): task.

    Returns:
      str: path of a task storage file in the processed directory.
    """
    filename = '{0:s}.plaso'.format(task.identifier)
    return os.path.join(self._processed_task_storage_path, filename)

  def _GetTaskStorageFilePath(self, task):
    """Retrieves the path of a task storage file in the temporary directory.

    Args:
      task (Task): task.

    Returns:
      str: path of a task storage file in the temporary directory.
    """
    filename = '{0:s}.plaso'.format(task.identifier)
    return os.path.join(self._task_storage_path, filename)

  def _UpdateCounters(self, event):
    """Updates the counters.

    Args:
      event (EventObject): event.
    """
    self._session.parsers_counter['total'] += 1

    # Here we want the name of the parser or plugin not the parser chain.
    parser_name = getattr(event, 'parser', '')
    _, _, parser_name = parser_name.rpartition('/')
    if not parser_name:
      parser_name = 'N/A'
    self._session.parsers_counter[parser_name] += 1

  def _RaiseIfNotWritable(self):
    """Raises if the storage writer is not writable.

    Raises:
      IOError: when the storage writer is closed.
    """
    if not self._storage_file:
      raise IOError('Unable to write to closed storage writer.')
  def AddAnalysisReport(self, analysis_report):
    """Adds an analysis report.

    Args:
      analysis_report (AnalysisReport): analysis report.

    Raises:
      IOError: when the storage writer is closed.
    """
    self._RaiseIfNotWritable()

    self._storage_file.AddAnalysisReport(analysis_report)

    report_identifier = analysis_report.plugin_name
    self._session.analysis_reports_counter['total'] += 1
    self._session.analysis_reports_counter[report_identifier] += 1
    self.number_of_analysis_reports += 1

  def AddError(self, error):
    """Adds an error.

    Args:
      error (AnalysisError|ExtractionError): an analysis or extraction error.

    Raises:
      IOError: when the storage writer is closed.
    """
    self._RaiseIfNotWritable()

    self._storage_file.AddError(error)
    self.number_of_errors += 1

  def AddEvent(self, event):
    """Adds an event.

    Args:
      event (EventObject): an event.

    Raises:
      IOError: when the storage writer is closed.
    """
    self._RaiseIfNotWritable()

    self._storage_file.AddEvent(event)
    self.number_of_events += 1

    self._UpdateCounters(event)

  def AddEventData(self, event_data):
    """Adds event data.

    Args:
      event_data (EventData): event data.

    Raises:
      IOError: when the storage writer is closed.
    """
    self._RaiseIfNotWritable()

    self._storage_file.AddEventData(event_data)

  def AddEventSource(self, event_source):
    """Adds an event source.

    Args:
      event_source (EventSource): an event source.

    Raises:
      IOError: when the storage writer is closed.
    """
    self._RaiseIfNotWritable()

    self._storage_file.AddEventSource(event_source)
    self.number_of_event_sources += 1

  def AddEventTag(self, event_tag):
    """Adds an event tag.

    Args:
      event_tag (EventTag): an event tag.

    Raises:
      IOError: when the storage writer is closed.
    """
    self._RaiseIfNotWritable()

    self._storage_file.AddEventTag(event_tag)

    self._session.event_labels_counter['total'] += 1
    for label in event_tag.labels:
      self._session.event_labels_counter[label] += 1
    self.number_of_event_tags += 1

  def CheckTaskReadyForMerge(self, task):
    """Checks if a task is ready for merging with this session storage.

    If the task is ready to be merged, this method also sets the task's
    storage file size.

    Args:
      task (Task): task.

    Returns:
      bool: True if the task is ready to be merged.

    Raises:
      IOError: if the storage type is not supported or if the temporary path
          for the task storage does not exist.
    """
    if self._storage_type != definitions.STORAGE_TYPE_SESSION:
      raise IOError('Unsupported storage type.')

    if not self._processed_task_storage_path:
      raise IOError('Missing processed task storage path.')

    processed_storage_file_path = self._GetProcessedStorageFilePath(task)

    try:
      stat_info = os.stat(processed_storage_file_path)
    except (IOError, OSError):
      return False

    task.storage_file_size = stat_info.st_size
    return True
  def Close(self):
    """Closes the storage writer.

    Raises:
      IOError: when the storage writer is closed.
    """
    self._RaiseIfNotWritable()

    self._storage_file.Close()
    self._storage_file = None

  def CreateTaskStorage(self, task):
    """Creates a task storage.

    The task storage is used to store attributes created by the task.

    Args:
      task (Task): task.

    Returns:
      StorageWriter: storage writer.

    Raises:
      IOError: if the storage type is not supported.
    """
    if self._storage_type != definitions.STORAGE_TYPE_SESSION:
      raise IOError('Unsupported storage type.')

    storage_file_path = self._GetTaskStorageFilePath(task)
    return self._CreateTaskStorageWriter(storage_file_path, task)

  def GetEventDataByIdentifier(self, identifier):
    """Retrieves specific event data.

    Args:
      identifier (AttributeContainerIdentifier): event data identifier.

    Returns:
      EventData: event data or None if not available.
    """
    return self._storage_file.GetEventDataByIdentifier(identifier)

  def GetEvents(self):
    """Retrieves the events.

    Returns:
      generator(EventObject): event generator.

    Raises:
      IOError: when the storage writer is closed.
    """
    return self._storage_file.GetEvents()

  def GetEventTagByIdentifier(self, identifier):
    """Retrieves a specific event tag.

    Args:
      identifier (AttributeContainerIdentifier): event tag identifier.

    Returns:
      EventTag: event tag or None if not available.
    """
    return self._storage_file.GetEventTagByIdentifier(identifier)

  def GetEventTags(self):
    """Retrieves the event tags.

    Returns:
      generator(EventTag): event tag generator.
    """
    return self._storage_file.GetEventTags()

  def GetFirstWrittenEventSource(self):
    """Retrieves the first event source that was written after open.

    Using GetFirstWrittenEventSource and GetNextWrittenEventSource newly
    added event sources can be retrieved in order of addition.

    Returns:
      EventSource: event source or None if there are no newly written ones.

    Raises:
      IOError: when the storage writer is closed.
    """
    if not self._storage_file:
      raise IOError('Unable to read from closed storage writer.')

    event_source = self._storage_file.GetEventSourceByIndex(
        self._first_written_event_source_index)

    if event_source:
      self._written_event_source_index = (
          self._first_written_event_source_index + 1)
    return event_source

  def GetNextWrittenEventSource(self):
    """Retrieves the next event source that was written after open.

    Returns:
      EventSource: event source or None if there are no newly written ones.

    Raises:
      IOError: when the storage writer is closed.
    """
    if not self._storage_file:
      raise IOError('Unable to read from closed storage writer.')

    event_source = self._storage_file.GetEventSourceByIndex(
        self._written_event_source_index)
    if event_source:
      self._written_event_source_index += 1
    return event_source
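
  # Iteration sketch (illustrative comment, assuming "storage_writer" is an
  # open StorageFileWriter): as described by GetFirstWrittenEventSource,
  # newly written event sources can be retrieved in order of addition.
  #
  #   event_source = storage_writer.GetFirstWrittenEventSource()
  #   while event_source:
  #     ...  # Process the event source here.
  #     event_source = storage_writer.GetNextWrittenEventSource()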
  def GetProcessedTaskIdentifiers(self):
    """Retrieves the identifiers of tasks that have been processed.

    Returns:
      list[str]: task identifiers that are processed.

    Raises:
      IOError: if the storage type is not supported or if the temporary path
          for the task storage does not exist.
    """
    if self._storage_type != definitions.STORAGE_TYPE_SESSION:
      raise IOError('Unsupported storage type.')

    if not self._processed_task_storage_path:
      raise IOError('Missing processed task storage path.')

    return [
        path.replace('.plaso', '')
        for path in os.listdir(self._processed_task_storage_path)]

  def GetSortedEvents(self, time_range=None):
    """Retrieves the events in increasing chronological order.

    This includes all events written to the storage including those pending
    being flushed (written) to the storage.

    Args:
      time_range (Optional[TimeRange]): time range used to filter events
          that fall in a specific period.

    Returns:
      generator(EventObject): event generator.

    Raises:
      IOError: when the storage writer is closed.
    """
    if not self._storage_file:
      raise IOError('Unable to read from closed storage writer.')

    return self._storage_file.GetSortedEvents(time_range=time_range)

  def FinalizeTaskStorage(self, task):
    """Finalizes a processed task storage.

    Moves the task storage file from its temporary directory to the
    processed directory.

    Args:
      task (Task): task.

    Raises:
      IOError: if the storage type is not supported or if the storage file
          cannot be renamed.
    """
    if self._storage_type != definitions.STORAGE_TYPE_SESSION:
      raise IOError('Unsupported storage type.')

    storage_file_path = self._GetTaskStorageFilePath(task)
    processed_storage_file_path = self._GetProcessedStorageFilePath(task)

    try:
      os.rename(storage_file_path, processed_storage_file_path)
    except OSError as exception:
      raise IOError((
          'Unable to rename task storage file: {0:s} with error: '
          '{1!s}').format(storage_file_path, exception))

  def Open(self):
    """Opens the storage writer.

    Raises:
      IOError: if the storage writer is already opened.
    """
    if self._storage_file:
      raise IOError('Storage writer already opened.')

    self._storage_file = self._CreateStorageFile()

    if self._serializers_profiler:
      self._storage_file.SetSerializersProfiler(self._serializers_profiler)

    if self._storage_profiler:
      self._storage_file.SetStorageProfiler(self._storage_profiler)

    self._storage_file.Open(path=self._output_file, read_only=False)

    self._first_written_event_source_index = (
        self._storage_file.GetNumberOfEventSources())
    self._written_event_source_index = self._first_written_event_source_index

  def PrepareMergeTaskStorage(self, task):
    """Prepares a task storage for merging.

    Moves the task storage file from the processed directory to the merge
    directory.

    Args:
      task (Task): task.

    Raises:
      IOError: if the storage type is not supported or if the storage file
          cannot be renamed.
    """
    if self._storage_type != definitions.STORAGE_TYPE_SESSION:
      raise IOError('Unsupported storage type.')

    merge_storage_file_path = self._GetMergeTaskStorageFilePath(task)
    processed_storage_file_path = self._GetProcessedStorageFilePath(task)

    task.storage_file_size = os.path.getsize(processed_storage_file_path)

    try:
      os.rename(processed_storage_file_path, merge_storage_file_path)
    except OSError as exception:
      raise IOError((
          'Unable to rename task storage file: {0:s} with error: '
          '{1!s}').format(processed_storage_file_path, exception))

  def ReadPreprocessingInformation(self, knowledge_base):
    """Reads preprocessing information.

    The preprocessing information contains the system configuration, which
    includes information about various system specific configuration data,
    for example the user accounts.

    Args:
      knowledge_base (KnowledgeBase): is used to store the preprocessing
          information.

    Raises:
      IOError: when the storage writer is closed.
    """
    if not self._storage_file:
      raise IOError('Unable to read from closed storage writer.')

    self._storage_file.ReadPreprocessingInformation(knowledge_base)

  def RemoveProcessedTaskStorage(self, task):
    """Removes a processed task storage.

    Args:
      task (Task): task.

    Raises:
      IOError: if the storage type is not supported or if the storage file
          cannot be removed.
    """
    if self._storage_type != definitions.STORAGE_TYPE_SESSION:
      raise IOError('Unsupported storage type.')

    processed_storage_file_path = self._GetProcessedStorageFilePath(task)

    try:
      os.remove(processed_storage_file_path)
    except OSError as exception:
      raise IOError((
          'Unable to remove task storage file: {0:s} with error: '
          '{1!s}').format(processed_storage_file_path, exception))

  def SetSerializersProfiler(self, serializers_profiler):
    """Sets the serializers profiler.

    Args:
      serializers_profiler (SerializersProfiler): serializers profiler.
    """
    self._serializers_profiler = serializers_profiler
    if self._storage_file:
      self._storage_file.SetSerializersProfiler(serializers_profiler)

  def SetStorageProfiler(self, storage_profiler):
    """Sets the storage profiler.

    Args:
      storage_profiler (StorageProfiler): storage profiler.
    """
    self._storage_profiler = storage_profiler
    if self._storage_file:
      self._storage_file.SetStorageProfiler(storage_profiler)

  def StartMergeTaskStorage(self, task):
    """Starts a merge of a task storage with the session storage.

    Args:
      task (Task): task.

    Returns:
      StorageMergeReader: storage merge reader of the task storage.

    Raises:
      IOError: if the storage file cannot be opened, if the storage type is
          not supported, if the temporary path for the task storage does not
          exist or if the temporary path for the task storage does not refer
          to a file.
    """
    if self._storage_type != definitions.STORAGE_TYPE_SESSION:
      raise IOError('Unsupported storage type.')

    if not self._merge_task_storage_path:
      raise IOError('Missing merge task storage path.')

    merge_storage_file_path = self._GetMergeTaskStorageFilePath(task)
    if not os.path.isfile(merge_storage_file_path):
      raise IOError('Merge task storage path is not a file.')

    return self._CreateTaskStorageMergeReader(merge_storage_file_path)

  def StartTaskStorage(self):
    """Creates a temporary path for the task storage.

    Raises:
      IOError: if the storage type is not supported or if the temporary path
          for the task storage already exists.
    """
    if self._storage_type != definitions.STORAGE_TYPE_SESSION:
      raise IOError('Unsupported storage type.')

    if self._task_storage_path:
      raise IOError('Task storage path already exists.')

    output_directory = os.path.dirname(self._output_file)
    self._task_storage_path = tempfile.mkdtemp(dir=output_directory)

    self._merge_task_storage_path = os.path.join(
        self._task_storage_path, 'merge')
    os.mkdir(self._merge_task_storage_path)

    self._processed_task_storage_path = os.path.join(
        self._task_storage_path, 'processed')
    os.mkdir(self._processed_task_storage_path)
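
  # Directory layout sketch (illustrative comment): StartTaskStorage creates
  # a temporary directory next to the output file; FinalizeTaskStorage then
  # moves a task storage file into "processed" and PrepareMergeTaskStorage
  # moves it into "merge" before StartMergeTaskStorage reads it.
  #
  #   <output directory>/<temporary directory>/<task identifier>.plaso
  #   <output directory>/<temporary directory>/processed/<task identifier>.plaso
  #   <output directory>/<temporary directory>/merge/<task identifier>.plaso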
  def StopTaskStorage(self, abort=False):
    """Removes the temporary path for the task storage.

    The results of tasks will be lost on abort.

    Args:
      abort (bool): True to indicate the stop is issued on abort.

    Raises:
      IOError: if the storage type is not supported.
    """
    if self._storage_type != definitions.STORAGE_TYPE_SESSION:
      raise IOError('Unsupported storage type.')

    if os.path.isdir(self._merge_task_storage_path):
      if abort:
        shutil.rmtree(self._merge_task_storage_path)
      else:
        os.rmdir(self._merge_task_storage_path)

    if os.path.isdir(self._processed_task_storage_path):
      if abort:
        shutil.rmtree(self._processed_task_storage_path)
      else:
        os.rmdir(self._processed_task_storage_path)

    if os.path.isdir(self._task_storage_path):
      if abort:
        shutil.rmtree(self._task_storage_path)
      else:
        os.rmdir(self._task_storage_path)

    self._merge_task_storage_path = None
    self._processed_task_storage_path = None
    self._task_storage_path = None

  def WritePreprocessingInformation(self, knowledge_base):
    """Writes preprocessing information.

    Args:
      knowledge_base (KnowledgeBase): contains the preprocessing information.

    Raises:
      IOError: if the storage type does not support writing preprocessing
          information or when the storage writer is closed.
    """
    self._RaiseIfNotWritable()

    if self._storage_type != definitions.STORAGE_TYPE_SESSION:
      raise IOError(
          'Preprocessing information not supported by storage type.')

    self._storage_file.WritePreprocessingInformation(knowledge_base)

  def WriteSessionCompletion(self, aborted=False):
    """Writes session completion information.

    Args:
      aborted (Optional[bool]): True if the session was aborted.

    Raises:
      IOError: if the storage type is not supported or when the storage
          writer is closed.
    """
    self._RaiseIfNotWritable()

    if self._storage_type != definitions.STORAGE_TYPE_SESSION:
      raise IOError('Unsupported storage type.')

    self._session.aborted = aborted
    session_completion = self._session.CreateSessionCompletion()
    self._storage_file.WriteSessionCompletion(session_completion)

  def WriteSessionStart(self):
    """Writes session start information.

    Raises:
      IOError: if the storage type is not supported or when the storage
          writer is closed.
    """
    self._RaiseIfNotWritable()

    if self._storage_type != definitions.STORAGE_TYPE_SESSION:
      raise IOError('Unsupported storage type.')

    session_start = self._session.CreateSessionStart()
    self._storage_file.WriteSessionStart(session_start)

  def WriteTaskCompletion(self, aborted=False):
    """Writes task completion information.

    Args:
      aborted (Optional[bool]): True if the task was aborted.

    Raises:
      IOError: if the storage type is not supported or when the storage
          writer is closed.
    """
    self._RaiseIfNotWritable()

    if self._storage_type != definitions.STORAGE_TYPE_TASK:
      raise IOError('Unsupported storage type.')

    self._task.aborted = aborted
    task_completion = self._task.CreateTaskCompletion()
    self._storage_file.WriteTaskCompletion(task_completion)

  def WriteTaskStart(self):
    """Writes task start information.

    Raises:
      IOError: if the storage type is not supported or when the storage
          writer is closed.
    """
    self._RaiseIfNotWritable()

    if self._storage_type != definitions.STORAGE_TYPE_TASK:
      raise IOError('Unsupported storage type.')

    task_start = self._task.CreateTaskStart()
    self._storage_file.WriteTaskStart(task_start)
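
# Lifecycle sketch (illustrative comment; ConcreteStorageFileWriter is a
# hypothetical subclass that implements the abstract _Create* methods): a
# session-based writer brackets the stored attribute containers with session
# start and completion records.
#
#   storage_writer = ConcreteStorageFileWriter(session, 'timeline.plaso')
#   storage_writer.Open()
#   storage_writer.WriteSessionStart()
#   storage_writer.AddEvent(event)
#   storage_writer.WriteSessionCompletion()
#   storage_writer.Close()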