# -*- coding: utf-8 -*-
"""The storage interface classes."""
from __future__ import unicode_literals
import abc
from plaso.lib import definitions
# pylint: disable=redundant-returns-doc,redundant-yields-doc
class BaseStore(object):
  """Storage interface.

  Defines the methods a concrete store is expected to provide. Note that this
  class does not declare abc.ABCMeta as its metaclass, hence the
  abc.abstractmethod decorators document the interface but do not prevent
  instantiation of a class that leaves methods unimplemented.

  Attributes:
    format_version (int): storage format version.
    serialization_format (str): serialization format.
    storage_type (str): storage type.
  """

  def __init__(self):
    """Initializes a store."""
    super(BaseStore, self).__init__()
    self.format_version = None
    self.serialization_format = None
    self.storage_type = None
    # Profilers are optional and remain None until set via
    # SetSerializersProfiler and SetStorageProfiler.
    self._serializers_profiler = None
    self._storage_profiler = None

  @abc.abstractmethod
  def AddAnalysisReport(self, analysis_report):
    """Adds an analysis report.

    Args:
      analysis_report (AnalysisReport): analysis report.
    """

  @abc.abstractmethod
  def AddEvent(self, event):
    """Adds an event.

    Args:
      event (EventObject): event.
    """

  @abc.abstractmethod
  def AddEventSource(self, event_source):
    """Adds an event source.

    Args:
      event_source (EventSource): event source.
    """

  @abc.abstractmethod
  def AddEventTag(self, event_tag):
    """Adds an event tag.

    Args:
      event_tag (EventTag): event tag.
    """

  @abc.abstractmethod
  def AddWarning(self, warning):
    """Adds a warning.

    Args:
      warning (ExtractionWarning): warning.
    """

  @abc.abstractmethod
  def Close(self):
    """Closes the storage."""

  @abc.abstractmethod
  def GetAnalysisReports(self):
    """Retrieves the analysis reports.

    Yields:
      AnalysisReport: analysis report.
    """

  @abc.abstractmethod
  def GetEventData(self):
    """Retrieves the event data.

    Yields:
      EventData: event data.
    """

  @abc.abstractmethod
  def GetEventDataByIdentifier(self, identifier):
    """Retrieves specific event data.

    Args:
      identifier (AttributeContainerIdentifier): event data identifier.

    Returns:
      EventData: event data or None if not available.
    """

  @abc.abstractmethod
  def GetEvents(self):
    """Retrieves the events.

    Yields:
      EventObject: event.
    """

  @abc.abstractmethod
  def GetEventSources(self):
    """Retrieves the event sources.

    Yields:
      EventSource: event source.
    """

  @abc.abstractmethod
  def GetEventTagByIdentifier(self, identifier):
    """Retrieves a specific event tag.

    Args:
      identifier (AttributeContainerIdentifier): event tag identifier.

    Returns:
      EventTag: event tag or None if not available.
    """

  @abc.abstractmethod
  def GetNumberOfEventSources(self):
    """Retrieves the number of event sources.

    Returns:
      int: number of event sources.
    """

  @abc.abstractmethod
  def GetSessions(self):
    """Retrieves the sessions.

    Yields:
      Session: session.
    """

  @abc.abstractmethod
  def GetSortedEvents(self, time_range=None):
    """Retrieves the events in increasing chronological order.

    This includes all events written to the storage including those pending
    being flushed (written) to the storage.

    Args:
      time_range (Optional[TimeRange]): time range used to filter events
          that fall in a specific period.

    Yields:
      EventObject: event.
    """

  @abc.abstractmethod
  def GetWarnings(self):
    """Retrieves the warnings.

    Yields:
      ExtractionWarning: warning.
    """

  @abc.abstractmethod
  def HasAnalysisReports(self):
    """Determines if a store contains analysis reports.

    Returns:
      bool: True if the store contains analysis reports.
    """

  @abc.abstractmethod
  def HasWarnings(self):
    """Determines if a store contains extraction warnings.

    Returns:
      bool: True if the store contains extraction warnings.
    """

  @abc.abstractmethod
  def Open(self, **kwargs):
    """Opens the storage.

    Args:
      kwargs (dict[str, object]): keyword arguments, which are interpreted
          by the concrete store implementation.
    """

  def SetSerializersProfiler(self, serializers_profiler):
    """Sets the serializers profiler.

    Args:
      serializers_profiler (SerializersProfiler): serializers profiler.
    """
    self._serializers_profiler = serializers_profiler

  def SetStorageProfiler(self, storage_profiler):
    """Sets the storage profiler.

    Args:
      storage_profiler (StorageProfiler): storage profiler.
    """
    self._storage_profiler = storage_profiler

  @abc.abstractmethod
  def WriteSessionCompletion(self, session_completion):
    """Writes session completion information.

    Args:
      session_completion (SessionCompletion): session completion information.
    """

  @abc.abstractmethod
  def WriteSessionStart(self, session_start):
    """Writes session start information.

    Args:
      session_start (SessionStart): session start information.
    """

  @abc.abstractmethod
  def WriteTaskCompletion(self, task_completion):
    """Writes task completion information.

    Args:
      task_completion (TaskCompletion): task completion information.
    """

  @abc.abstractmethod
  def WriteTaskStart(self, task_start):
    """Writes task start information.

    Args:
      task_start (TaskStart): task start information.
    """
class StorageMergeReader(object):
  """Storage reader interface for merging."""

  def __init__(self, storage_writer):
    """Initializes a storage merge reader.

    Args:
      storage_writer (StorageWriter): storage writer.
    """
    super(StorageMergeReader, self).__init__()
    self._storage_writer = storage_writer

  @abc.abstractmethod
  def MergeAttributeContainers(
      self, callback=None, maximum_number_of_containers=0):
    """Reads attribute containers from a task storage file into the writer.

    Args:
      callback (Optional[function[StorageWriter, AttributeContainer]]):
          function to call after each attribute container is deserialized.
      maximum_number_of_containers (Optional[int]): maximum number of
          containers to merge, where 0 represents no limit.

    Returns:
      bool: True if the entire task storage file has been merged.
    """
# pylint: disable=redundant-returns-doc,redundant-yields-doc
class StorageReader(object):
  """Storage reader interface.

  The reader can be used as a context manager, in which case Close is called
  when the "with" block is left.
  """

  def __enter__(self):
    """Make usable with "with" statement.

    Returns:
      StorageReader: the storage reader itself.
    """
    return self

  # pylint: disable=unused-argument
  def __exit__(self, exception_type, value, traceback):
    """Make usable with "with" statement.

    Closes the storage reader. The exception details are not used and since
    no true value is returned an exception raised inside the "with" block is
    not suppressed.

    Args:
      exception_type (type): type of the exception raised inside the "with"
          block or None.
      value (BaseException): exception raised inside the "with" block or
          None.
      traceback (traceback): traceback of the exception or None.
    """
    self.Close()

  # NOTE(review): abc.abstractproperty is deprecated since Python 3.3 in
  # favor of stacking property and abc.abstractmethod; presumably kept for
  # Python 2 compatibility - confirm before changing.
  @abc.abstractproperty
  def format_version(self):
    """int: format version."""

  @abc.abstractproperty
  def serialization_format(self):
    """str: serialization format."""

  @abc.abstractproperty
  def storage_type(self):
    """str: storage type."""

  @abc.abstractmethod
  def Close(self):
    """Closes the storage reader."""

  @abc.abstractmethod
  def GetAnalysisReports(self):
    """Retrieves the analysis reports.

    Yields:
      AnalysisReport: analysis report.
    """

  @abc.abstractmethod
  def GetWarnings(self):
    """Retrieves the warnings.

    Yields:
      ExtractionWarning: warning.
    """

  @abc.abstractmethod
  def GetEventData(self):
    """Retrieves the event data.

    Yields:
      EventData: event data.
    """

  @abc.abstractmethod
  def GetEventDataByIdentifier(self, identifier):
    """Retrieves specific event data.

    Args:
      identifier (AttributeContainerIdentifier): event data identifier.

    Returns:
      EventData: event data or None if not available.
    """

  @abc.abstractmethod
  def GetEvents(self):
    """Retrieves the events.

    Yields:
      EventObject: event.
    """

  @abc.abstractmethod
  def GetEventSources(self):
    """Retrieves the event sources.

    Yields:
      EventSource: event source.
    """

  @abc.abstractmethod
  def GetEventTagByIdentifier(self, identifier):
    """Retrieves a specific event tag.

    Args:
      identifier (AttributeContainerIdentifier): event tag identifier.

    Returns:
      EventTag: event tag or None if not available.
    """

  @abc.abstractmethod
  def GetNumberOfAnalysisReports(self):
    """Retrieves the number of analysis reports.

    Returns:
      int: number of analysis reports.
    """

  @abc.abstractmethod
  def GetNumberOfEventSources(self):
    """Retrieves the number of event sources.

    Returns:
      int: number of event sources.
    """

  @abc.abstractmethod
  def GetSessions(self):
    """Retrieves the sessions.

    Yields:
      Session: session.
    """

  @abc.abstractmethod
  def GetSortedEvents(self, time_range=None):
    """Retrieves the events in increasing chronological order.

    This includes all events written to the storage including those pending
    being flushed (written) to the storage.

    Args:
      time_range (Optional[TimeRange]): time range used to filter events
          that fall in a specific period.

    Yields:
      EventObject: event.
    """

  @abc.abstractmethod
  def HasAnalysisReports(self):
    """Determines if a store contains analysis reports.

    Returns:
      bool: True if the store contains analysis reports.
    """

  @abc.abstractmethod
  def HasWarnings(self):
    """Determines if a store contains extraction warnings.

    Returns:
      bool: True if the store contains extraction warnings.
    """

  @abc.abstractmethod
  def SetSerializersProfiler(self, serializers_profiler):
    """Sets the serializers profiler.

    Args:
      serializers_profiler (SerializersProfiler): serializers profiler.
    """

  @abc.abstractmethod
  def SetStorageProfiler(self, storage_profiler):
    """Sets the storage profiler.

    Args:
      storage_profiler (StorageProfiler): storage profiler.
    """
# pylint: disable=redundant-returns-doc,redundant-yields-doc
class StorageWriter(object):
  """Storage writer interface.

  Attributes:
    number_of_analysis_reports (int): number of analysis reports written.
    number_of_event_sources (int): number of event sources written.
    number_of_event_tags (int): number of event tags written.
    number_of_events (int): number of events written.
    number_of_warnings (int): number of warnings written.
  """

  def __init__(
      self, session, storage_type=definitions.STORAGE_TYPE_SESSION, task=None):
    """Initializes a storage writer.

    Args:
      session (Session): session the storage changes are part of.
      storage_type (Optional[str]): storage type.
      task (Optional[Task]): task.
    """
    super(StorageWriter, self).__init__()
    self._first_written_event_source_index = 0
    self._serializers_profiler = None
    self._session = session
    self._storage_profiler = None
    self._storage_type = storage_type
    self._task = task
    self._written_event_source_index = 0
    # All counters start at zero; this base class never changes them.
    self.number_of_analysis_reports = 0
    self.number_of_event_sources = 0
    self.number_of_event_tags = 0
    self.number_of_events = 0
    self.number_of_warnings = 0

  @abc.abstractmethod
  def AddAnalysisReport(self, analysis_report):
    """Adds an analysis report.

    Args:
      analysis_report (AnalysisReport): a report.
    """

  @abc.abstractmethod
  def AddEvent(self, event):
    """Adds an event.

    Args:
      event (EventObject): an event.
    """

  @abc.abstractmethod
  def AddEventSource(self, event_source):
    """Adds an event source.

    Args:
      event_source (EventSource): an event source.
    """

  @abc.abstractmethod
  def AddEventTag(self, event_tag):
    """Adds an event tag.

    Args:
      event_tag (EventTag): an event tag.
    """

  @abc.abstractmethod
  def AddWarning(self, warning):
    """Adds a warning.

    Args:
      warning (ExtractionWarning): a warning.
    """

  @abc.abstractmethod
  def Close(self):
    """Closes the storage writer."""

  # pylint: disable=unused-argument
  def CreateTaskStorage(self, task):
    """Creates a task storage.

    Args:
      task (Task): task.

    Returns:
      StorageWriter: storage writer.

    Raises:
      NotImplementedError: since there is no implementation.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def GetEventDataByIdentifier(self, identifier):
    """Retrieves specific event data.

    Args:
      identifier (AttributeContainerIdentifier): event data identifier.

    Returns:
      EventData: event data or None if not available.
    """

  @abc.abstractmethod
  def GetEvents(self):
    """Retrieves the events.

    Yields:
      EventObject: event.
    """

  @abc.abstractmethod
  def GetFirstWrittenEventSource(self):
    """Retrieves the first event source that was written after open.

    Using GetFirstWrittenEventSource and GetNextWrittenEventSource newly
    added event sources can be retrieved in order of addition.

    Returns:
      EventSource: event source or None if there are no newly written ones.
    """

  @abc.abstractmethod
  def GetNextWrittenEventSource(self):
    """Retrieves the next event source that was written after open.

    Returns:
      EventSource: event source or None if there are no newly written ones.
    """

  @abc.abstractmethod
  def GetSortedEvents(self, time_range=None):
    """Retrieves the events in increasing chronological order.

    This includes all events written to the storage including those pending
    being flushed (written) to the storage.

    Args:
      time_range (Optional[TimeRange]): time range used to filter events
          that fall in a specific period.

    Yields:
      EventObject: event.
    """

  # pylint: disable=unused-argument
  def FinalizeTaskStorage(self, task):
    """Finalizes a processed task storage.

    Args:
      task (Task): task.

    Raises:
      NotImplementedError: since there is no implementation.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def Open(self):
    """Opens the storage writer."""

  # pylint: disable=unused-argument
  def PrepareMergeTaskStorage(self, task):
    """Prepares a task storage for merging.

    Args:
      task (Task): task.

    Raises:
      NotImplementedError: since there is no implementation.
    """
    raise NotImplementedError()

  # pylint: disable=unused-argument
  def RemoveProcessedTaskStorage(self, task):
    """Removes a processed task storage.

    Args:
      task (Task): task.

    Raises:
      NotImplementedError: since there is no implementation.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def SetSerializersProfiler(self, serializers_profiler):
    """Sets the serializers profiler.

    Args:
      serializers_profiler (SerializersProfiler): serializers profiler.
    """

  @abc.abstractmethod
  def SetStorageProfiler(self, storage_profiler):
    """Sets the storage profiler.

    Args:
      storage_profiler (StorageProfiler): storage profiler.
    """

  @abc.abstractmethod
  def WriteSessionCompletion(self, aborted=False):
    """Writes session completion information.

    Args:
      aborted (Optional[bool]): True if the session was aborted.
    """

  @abc.abstractmethod
  def WriteSessionStart(self):
    """Writes session start information."""

  @abc.abstractmethod
  def WriteTaskCompletion(self, aborted=False):
    """Writes task completion information.

    Args:
      aborted (Optional[bool]): True if the task was aborted.
    """

  @abc.abstractmethod
  def WriteTaskStart(self):
    """Writes task start information."""