Source code for plaso.storage.interface

# -*- coding: utf-8 -*-
"""The storage interface classes."""

from __future__ import unicode_literals

import abc
from plaso.lib import definitions


# pylint: disable=redundant-returns-doc,redundant-yields-doc
[docs]class BaseStore(object): """Storage interface. Attributes: format_version (int): storage format version. serialization_format (str): serialization format. storage_type (str): storage type. """ def __init__(self): """Initializes a store.""" super(BaseStore, self).__init__() self.format_version = None self.serialization_format = None self.storage_type = None self._serializers_profiler = None self._storage_profiler = None
[docs] @abc.abstractmethod def AddAnalysisReport(self, analysis_report): """Adds an analysis report. Args: analysis_report (AnalysisReport): analysis report. """
[docs] @abc.abstractmethod def AddEvent(self, event): """Adds an event. Args: event (EventObject): event. """
[docs] @abc.abstractmethod def AddEventSource(self, event_source): """Adds an event source. Args: event_source (EventSource): event source. """
[docs] @abc.abstractmethod def AddEventTag(self, event_tag): """Adds an event tag. Args: event_tag (EventTag): event tag. """
[docs] @abc.abstractmethod def AddWarning(self, warning): """Adds a warning. Args: warning (ExtractionWarning): warning. """
[docs] @abc.abstractmethod def Close(self): """Closes the storage."""
[docs] @abc.abstractmethod def GetAnalysisReports(self): """Retrieves the analysis reports. Yields: AnalysisReport: analysis report. """
[docs] @abc.abstractmethod def GetEventData(self): """Retrieves the event data. Yields: EventData: event data. """
[docs] @abc.abstractmethod def GetEventDataByIdentifier(self, identifier): """Retrieves specific event data. Args: identifier (AttributeContainerIdentifier): event data identifier. Returns: EventData: event data or None if not available. """
[docs] @abc.abstractmethod def GetEvents(self): """Retrieves the events. Yields: EventObject: event. """
[docs] @abc.abstractmethod def GetEventSources(self): """Retrieves the event sources. Yields: EventSource: event source. """
[docs] @abc.abstractmethod def GetEventTagByIdentifier(self, identifier): """Retrieves a specific event tag. Args: identifier (AttributeContainerIdentifier): event tag identifier. Returns: EventTag: event tag or None if not available. """
[docs] @abc.abstractmethod def GetEventTags(self): """Retrieves the event tags. Yields: EventTag: event tag. """
[docs] @abc.abstractmethod def GetNumberOfEventSources(self): """Retrieves the number event sources. Returns: int: number of event sources. """
[docs] @abc.abstractmethod def GetSessions(self): """Retrieves the sessions. Yields: Session: session. """
[docs] @abc.abstractmethod def GetSortedEvents(self, time_range=None): """Retrieves the events in increasing chronological order. This includes all events written to the storage including those pending being flushed (written) to the storage. Args: time_range (Optional[TimeRange]): time range used to filter events that fall in a specific period. Yields: EventObject: event. """
[docs] @abc.abstractmethod def GetWarnings(self): """Retrieves the warnings. Yields: ExtractionWarning: warning. """
[docs] @abc.abstractmethod def HasAnalysisReports(self): """Determines if a store contains analysis reports. Returns: bool: True if the store contains analysis reports. """
[docs] @abc.abstractmethod def HasWarnings(self): """Determines if a store contains extraction warnings. Returns: bool: True if the store contains extraction warnings. """
[docs] @abc.abstractmethod def HasEventTags(self): """Determines if a store contains event tags. Returns: bool: True if the store contains event tags. """
[docs] @abc.abstractmethod def Open(self, **kwargs): """Opens the storage."""
[docs] @abc.abstractmethod def ReadPreprocessingInformation(self, knowledge_base): """Reads preprocessing information. The preprocessing information contains the system configuration which contains information about various system specific configuration data, for example the user accounts. Args: knowledge_base (KnowledgeBase): is used to store the preprocessing information. """
[docs] def SetSerializersProfiler(self, serializers_profiler): """Sets the serializers profiler. Args: serializers_profiler (SerializersProfiler): serializers profiler. """ self._serializers_profiler = serializers_profiler
[docs] def SetStorageProfiler(self, storage_profiler): """Sets the storage profiler. Args: storage_profiler (StorageProfiler): storage profiler. """ self._storage_profiler = storage_profiler
[docs] @abc.abstractmethod def WritePreprocessingInformation(self, knowledge_base): """Writes preprocessing information. Args: knowledge_base (KnowledgeBase): contains the preprocessing information. """
[docs] @abc.abstractmethod def WriteSessionCompletion(self, session_completion): """Writes session completion information. Args: session_completion (SessionCompletion): session completion information. """
[docs] @abc.abstractmethod def WriteSessionStart(self, session_start): """Writes session start information. Args: session_start (SessionStart): session start information. """
[docs] @abc.abstractmethod def WriteTaskCompletion(self, task_completion): """Writes task completion information. Args: task_completion (TaskCompletion): task completion information. """
[docs] @abc.abstractmethod def WriteTaskStart(self, task_start): """Writes task start information. Args: task_start (TaskStart): task start information. """
[docs]class StorageMergeReader(object): """Storage reader interface for merging.""" def __init__(self, storage_writer): """Initializes a storage merge reader. Args: storage_writer (StorageWriter): storage writer. """ super(StorageMergeReader, self).__init__() self._storage_writer = storage_writer
[docs] @abc.abstractmethod def MergeAttributeContainers( self, callback=None, maximum_number_of_containers=0): """Reads attribute containers from a task storage file into the writer. Args: callback (function[StorageWriter, AttributeContainer]): function to call after each attribute container is deserialized. maximum_number_of_containers (Optional[int]): maximum number of containers to merge, where 0 represent no limit. Returns: bool: True if the entire task storage file has been merged. """
# pylint: disable=redundant-returns-doc,redundant-yields-doc
[docs]class StorageReader(object): """Storage reader interface."""
[docs] def __enter__(self): """Make usable with "with" statement.""" return self
# pylint: disable=unused-argument
[docs] def __exit__(self, exception_type, value, traceback): """Make usable with "with" statement.""" self.Close()
@abc.abstractproperty def format_version(self): """int: format version""" @abc.abstractproperty def serialization_format(self): """str: serialization format.""" @abc.abstractproperty def storage_type(self): """str: storage type."""
[docs] @abc.abstractmethod def Close(self): """Closes the storage reader."""
[docs] @abc.abstractmethod def GetAnalysisReports(self): """Retrieves the analysis reports. Yields: AnalysisReport: analysis report. """
[docs] @abc.abstractmethod def GetWarnings(self): """Retrieves the warnings. Yields: ExtractionWarning: warning. """
[docs] @abc.abstractmethod def GetEventData(self): """Retrieves the event data. Yields: EventData: event data. """
[docs] @abc.abstractmethod def GetEventDataByIdentifier(self, identifier): """Retrieves specific event data. Args: identifier (AttributeContainerIdentifier): event data identifier. Returns: EventData: event data or None if not available. """
[docs] @abc.abstractmethod def GetEvents(self): """Retrieves the events. Yields: EventObject: event. """
[docs] @abc.abstractmethod def GetEventSources(self): """Retrieves event sources. Yields: EventSourceObject: event source. """
[docs] @abc.abstractmethod def GetEventTagByIdentifier(self, identifier): """Retrieves a specific event tag. Args: identifier (AttributeContainerIdentifier): event tag identifier. Returns: EventTag: event tag or None if not available. """
[docs] @abc.abstractmethod def GetEventTags(self): """Retrieves the event tags. Yields: EventTag: event tag. """
[docs] @abc.abstractmethod def GetNumberOfAnalysisReports(self): """Retrieves the number analysis reports. Returns: int: number of analysis reports. """
[docs] @abc.abstractmethod def GetNumberOfEventSources(self): """Retrieves the number event sources. Returns: int: number of event sources. """
[docs] @abc.abstractmethod def GetSessions(self): """Retrieves the sessions. Yields: Session: session. """
[docs] @abc.abstractmethod def GetSortedEvents(self, time_range=None): """Retrieves the events in increasing chronological order. This includes all events written to the storage including those pending being flushed (written) to the storage. Args: time_range (Optional[TimeRange]): time range used to filter events that fall in a specific period. Yields: EventObject: event. """
[docs] @abc.abstractmethod def HasAnalysisReports(self): """Determines if a store contains analysis reports. Returns: bool: True if the store contains analysis reports. """
[docs] @abc.abstractmethod def HasEventTags(self): """Determines if a store contains event tags. Returns: bool: True if the store contains event tags. """
[docs] @abc.abstractmethod def HasWarnings(self): """Determines if a store contains extraction warnings. Returns: bool: True if the store contains extraction warnings. """
[docs] @abc.abstractmethod def ReadPreprocessingInformation(self, knowledge_base): """Reads preprocessing information. The preprocessing information contains the system configuration which contains information about various system specific configuration data, for example the user accounts. Args: knowledge_base (KnowledgeBase): is used to store the preprocessing information. """
[docs] @abc.abstractmethod def SetSerializersProfiler(self, serializers_profiler): """Sets the serializers profiler. Args: serializers_profiler (SerializersProfiler): serializers profiler. """
[docs] @abc.abstractmethod def SetStorageProfiler(self, storage_profiler): """Sets the storage profiler. Args: storage_profiler (StorageProfiler): storage profile. """
# pylint: disable=redundant-returns-doc,redundant-yields-doc
[docs]class StorageWriter(object): """Storage writer interface. Attributes: number_of_analysis_reports (int): number of analysis reports written. number_of_event_sources (int): number of event sources written. number_of_event_tags (int): number of event tags written. number_of_events (int): number of events written. number_of_warnings (int): number of warnings written. """ def __init__( self, session, storage_type=definitions.STORAGE_TYPE_SESSION, task=None): """Initializes a storage writer. Args: session (Session): session the storage changes are part of. storage_type (Optional[str]): storage type. task(Optional[Task]): task. """ super(StorageWriter, self).__init__() self._first_written_event_source_index = 0 self._serializers_profiler = None self._session = session self._storage_profiler = None self._storage_type = storage_type self._task = task self._written_event_source_index = 0 self.number_of_analysis_reports = 0 self.number_of_event_sources = 0 self.number_of_event_tags = 0 self.number_of_events = 0 self.number_of_warnings = 0
[docs] @abc.abstractmethod def AddAnalysisReport(self, analysis_report): """Adds an analysis report. Args: analysis_report (AnalysisReport): a report. """
[docs] @abc.abstractmethod def AddEvent(self, event): """Adds an event. Args: event(EventObject): an event. """
[docs] @abc.abstractmethod def AddEventSource(self, event_source): """Adds an event source. Args: event_source (EventSource): an event source. """
[docs] @abc.abstractmethod def AddEventTag(self, event_tag): """Adds an event tag. Args: event_tag (EventTag): an event tag. """
[docs] @abc.abstractmethod def AddWarning(self, warning): """Adds an warning. Args: warning (ExtractionWarning): a warning. """
[docs] @abc.abstractmethod def Close(self): """Closes the storage writer."""
# pylint: disable=unused-argument
[docs] def CreateTaskStorage(self, task): """Creates a task storage. Args: task (Task): task. Returns: StorageWriter: storage writer. Raises: NotImplementedError: since there is no implementation. """ raise NotImplementedError()
[docs] @abc.abstractmethod def GetEventDataByIdentifier(self, identifier): """Retrieves specific event data. Args: identifier (AttributeContainerIdentifier): event data identifier. Returns: EventData: event data or None if not available. """
[docs] @abc.abstractmethod def GetEvents(self): """Retrieves the events. Yields: EventObject: event. """
[docs] @abc.abstractmethod def GetFirstWrittenEventSource(self): """Retrieves the first event source that was written after open. Using GetFirstWrittenEventSource and GetNextWrittenEventSource newly added event sources can be retrieved in order of addition. Returns: EventSource: event source or None if there are no newly written ones. """
[docs] @abc.abstractmethod def GetNextWrittenEventSource(self): """Retrieves the next event source that was written after open. Returns: EventSource: event source or None if there are no newly written ones. """
[docs] @abc.abstractmethod def GetSortedEvents(self, time_range=None): """Retrieves the events in increasing chronological order. This includes all events written to the storage including those pending being flushed (written) to the storage. Args: time_range (Optional[TimeRange]): time range used to filter events that fall in a specific period. Yields: EventObject: event. """
# pylint: disable=unused-argument
[docs] def FinalizeTaskStorage(self, task): """Finalizes a processed task storage. Args: task (Task): task. Raises: NotImplementedError: since there is no implementation. """ raise NotImplementedError()
[docs] @abc.abstractmethod def Open(self): """Opens the storage writer."""
# pylint: disable=unused-argument
[docs] def PrepareMergeTaskStorage(self, task): """Prepares a task storage for merging. Args: task (Task): task. Raises: NotImplementedError: since there is no implementation. """ raise NotImplementedError()
[docs] @abc.abstractmethod def ReadPreprocessingInformation(self, knowledge_base): """Reads preprocessing information. The preprocessing information contains the system configuration which contains information about various system specific configuration data, for example the user accounts. Args: knowledge_base (KnowledgeBase): is used to store the preprocessing information. """
# pylint: disable=unused-argument
[docs] def RemoveProcessedTaskStorage(self, task): """Removes a processed task storage. Args: task (Task): task. Raises: NotImplementedError: since there is no implementation. """ raise NotImplementedError()
[docs] @abc.abstractmethod def SetSerializersProfiler(self, serializers_profiler): """Sets the serializers profiler. Args: serializers_profiler (SerializersProfiler): serializers profiler. """
[docs] @abc.abstractmethod def SetStorageProfiler(self, storage_profiler): """Sets the storage profiler. Args: storage_profiler (StorageProfiler): storage profiler. """
[docs] @abc.abstractmethod def WritePreprocessingInformation(self, knowledge_base): """Writes preprocessing information. Args: knowledge_base (KnowledgeBase): contains the preprocessing information. """
[docs] @abc.abstractmethod def WriteSessionCompletion(self, aborted=False): """Writes session completion information. Args: aborted (Optional[bool]): True if the session was aborted. """
[docs] @abc.abstractmethod def WriteSessionStart(self): """Writes session start information."""
[docs] @abc.abstractmethod def WriteTaskCompletion(self, aborted=False): """Writes task completion information. Args: aborted (Optional[bool]): True if the session was aborted. """
[docs] @abc.abstractmethod def WriteTaskStart(self): """Writes task start information."""