# Source code for plaso.cli.image_export_tool
# -*- coding: utf-8 -*-
"""The image export CLI tool."""
from __future__ import unicode_literals
import argparse
import codecs
import io
import os
import textwrap
from dfvfs.helpers import file_system_searcher
from dfvfs.lib import errors as dfvfs_errors
from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import context
from dfvfs.resolver import resolver as path_spec_resolver
from plaso.analyzers.hashers import manager as hashers_manager
from plaso.cli import logger
from plaso.cli import storage_media_tool
from plaso.cli.helpers import manager as helpers_manager
from plaso.engine import engine
from plaso.engine import extractors
from plaso.engine import knowledge_base
from plaso.engine import path_helper
from plaso.filters import file_entry as file_entry_filters
from plaso.lib import errors
from plaso.lib import loggers
from plaso.lib import specification
from plaso.preprocessors import manager as preprocess_manager
class ImageExportTool(storage_media_tool.StorageMediaTool):
  """CLI tool that exports files from a storage media image.

  Attributes:
    has_filters (bool): True if filters have been specified via the options.
    list_signature_identifiers (bool): True if information about the signature
        identifiers should be shown.
  """

  NAME = 'image_export'

  DESCRIPTION = (
      'This is a simple collector designed to export files inside an '
      'image, both within a regular RAW image as well as inside a VSS. '
      'The tool uses a collection filter that uses the same syntax as a '
      'targeted plaso filter.')

  EPILOG = 'And that is how you export files, plaso style.'

  # Number of bytes read per iteration when copying a data stream to disk.
  _COPY_BUFFER_SIZE = 32768

  # Number of bytes read per iteration when hashing a data stream.
  _READ_BUFFER_SIZE = 4096

  # Characters that are replaced by an underscore when a source path is
  # turned into a destination path: the ASCII control characters, the path
  # segment separator of the host and a set of punctuation characters.
  _DIRTY_CHARACTERS = frozenset([
      '\x00', '\x01', '\x02', '\x03', '\x04', '\x05', '\x06', '\x07',
      '\x08', '\x09', '\x0a', '\x0b', '\x0c', '\x0d', '\x0e', '\x0f',
      '\x10', '\x11', '\x12', '\x13', '\x14', '\x15', '\x16', '\x17',
      '\x18', '\x19', '\x1a', '\x1b', '\x1c', '\x1d', '\x1e', '\x1f',
      os.path.sep, '!', '$', '%', '&', '*', '+', ':', ';', '<', '>',
      '?', '@', '|', '~', '\x7f'])

  # Encoding used to read the format specification (signatures) file.
  _SPECIFICATION_FILE_ENCODING = 'utf-8'

  # TODO: remove this redirect.
  _SOURCE_OPTION = 'image'
def __init__(self, input_reader=None, output_writer=None):
  """Initializes the image export CLI tool.

  Args:
    input_reader (Optional[InputReader]): input reader, where None indicates
        that the stdin input reader should be used.
    output_writer (Optional[OutputWriter]): output writer, where None
        indicates that the stdout output writer should be used.
  """
  super(ImageExportTool, self).__init__(
      input_reader=input_reader, output_writer=output_writer)

  # Public state, documented on the class.
  self.has_filters = False
  self.list_signature_identifiers = False

  # Helper objects used during extraction.
  self._filter_collection = file_entry_filters.FileEntryFilterCollection()
  self._knowledge_base = knowledge_base.KnowledgeBase()
  self._path_spec_extractor = extractors.PathSpecExtractor()
  self._resolver_context = context.Context()

  # Values that are filled in while the options are parsed.
  self._artifact_definitions_path = None
  self._artifact_filters = None
  self._artifacts_registry = None
  self._custom_artifacts_path = None
  self._destination_path = None
  self._filter_file = None
  self._process_memory_limit = None

  # Run-time state: SHA-256 digests of the data streams seen so far mapped
  # to the display name of the file entry they were first seen in.
  self._abort = False
  self._digests = {}
  self._skip_duplicates = True
def _CalculateDigestHash(self, file_entry, data_stream_name):
"""Calculates a SHA-256 digest of the contents of the file entry.
Args:
file_entry (dfvfs.FileEntry): file entry whose content will be hashed.
data_stream_name (str): name of the data stream whose content is to be
hashed.
Returns:
str: hexadecimal representation of the SHA-256 hash or None if the digest
cannot be determined.
"""
file_object = file_entry.GetFileObject(data_stream_name=data_stream_name)
if not file_object:
return None
try:
file_object.seek(0, os.SEEK_SET)
hasher_object = hashers_manager.HashersManager.GetHasher('sha256')
data = file_object.read(self._READ_BUFFER_SIZE)
while data:
hasher_object.Update(data)
data = file_object.read(self._READ_BUFFER_SIZE)
finally:
file_object.close()
return hasher_object.GetStringDigest()
def _CreateSanitizedDestination(
self, source_file_entry, source_path_spec, destination_path):
"""Creates a sanitized path of both destination directory and filename.
This function replaces non-printable and other characters defined in
_DIRTY_CHARACTERS with an underscore "_".
Args:
source_file_entry (dfvfs.FileEntry): file entry of the source file.
source_path_spec (dfvfs.PathSpec): path specification of the source file.
destination_path (str): path of the destination directory.
Returns:
tuple[str, str]: sanitized paths of both destination directory and
filename.
"""
file_system = source_file_entry.GetFileSystem()
path = getattr(source_path_spec, 'location', None)
path_segments = file_system.SplitPath(path)
# Sanitize each path segment.
for index, path_segment in enumerate(path_segments):
path_segments[index] = ''.join([
character if character not in self._DIRTY_CHARACTERS else '_'
for character in path_segment])
return (
os.path.join(destination_path, *path_segments[:-1]), path_segments[-1])
# TODO: merge with collector and/or engine.
def _Extract(
self, source_path_specs, destination_path, output_writer,
skip_duplicates=True):
"""Extracts files.
Args:
source_path_specs (list[dfvfs.PathSpec]): path specifications to extract.
destination_path (str): path where the extracted files should be stored.
output_writer (CLIOutputWriter): output writer.
skip_duplicates (Optional[bool]): True if files with duplicate content
should be skipped.
"""
output_writer.Write('Extracting file entries.\n')
path_spec_generator = self._path_spec_extractor.ExtractPathSpecs(
source_path_specs, resolver_context=self._resolver_context)
for path_spec in path_spec_generator:
self._ExtractFileEntry(
path_spec, destination_path, output_writer,
skip_duplicates=skip_duplicates)
def _ExtractDataStream(
self, file_entry, data_stream_name, destination_path, output_writer,
skip_duplicates=True):
"""Extracts a data stream.
Args:
file_entry (dfvfs.FileEntry): file entry containing the data stream.
data_stream_name (str): name of the data stream.
destination_path (str): path where the extracted files should be stored.
output_writer (CLIOutputWriter): output writer.
skip_duplicates (Optional[bool]): True if files with duplicate content
should be skipped.
"""
if not data_stream_name and not file_entry.IsFile():
return
display_name = path_helper.PathHelper.GetDisplayNameForPathSpec(
file_entry.path_spec)
if skip_duplicates:
try:
digest = self._CalculateDigestHash(file_entry, data_stream_name)
except (IOError, dfvfs_errors.BackEndError) as exception:
output_writer.Write((
'[skipping] unable to read content of file entry: {0:s} '
'with error: {1!s}\n').format(display_name, exception))
return
if not digest:
output_writer.Write(
'[skipping] unable to read content of file entry: {0:s}\n'.format(
display_name))
return
duplicate_display_name = self._digests.get(digest, None)
if duplicate_display_name:
output_writer.Write((
'[skipping] file entry: {0:s} is a duplicate of: {1:s} with '
'digest: {2:s}\n').format(
display_name, duplicate_display_name, digest))
return
self._digests[digest] = display_name
target_directory, target_filename = self._CreateSanitizedDestination(
file_entry, file_entry.path_spec, destination_path)
parent_path_spec = getattr(file_entry.path_spec, 'parent', None)
if parent_path_spec:
vss_store_number = getattr(parent_path_spec, 'store_index', None)
if vss_store_number is not None:
target_filename = 'vss{0:d}_{1:s}'.format(
vss_store_number + 1, target_filename)
if data_stream_name:
target_filename = '{0:s}_{1:s}'.format(target_filename, data_stream_name)
if not target_directory:
target_directory = destination_path
elif not os.path.isdir(target_directory):
os.makedirs(target_directory)
target_path = os.path.join(target_directory, target_filename)
if os.path.exists(target_path):
output_writer.Write((
'[skipping] unable to export contents of file entry: {0:s} '
'because exported file: {1:s} already exists.\n').format(
display_name, target_path))
return
try:
self._WriteFileEntry(file_entry, data_stream_name, target_path)
except (IOError, dfvfs_errors.BackEndError) as exception:
output_writer.Write((
'[skipping] unable to export contents of file entry: {0:s} '
'with error: {1!s}\n').format(display_name, exception))
try:
os.remove(target_path)
except (IOError, OSError):
pass
def _ExtractFileEntry(
    self, path_spec, destination_path, output_writer, skip_duplicates=True):
  """Extracts a file entry.

  Every data stream of the file entry is extracted. If the file entry does
  not expose any data stream, an extraction of the default (nameless) data
  stream is attempted instead.

  Args:
    path_spec (dfvfs.PathSpec): path specification of the source file.
    destination_path (str): path where the extracted files should be stored.
    output_writer (CLIOutputWriter): output writer.
    skip_duplicates (Optional[bool]): True if files with duplicate content
        should be skipped.
  """
  file_entry = path_spec_resolver.Resolver.OpenFileEntry(path_spec)

  # The path specification might not resolve to a file entry, in which case
  # there is nothing to match against the filters or to extract.
  if not file_entry:
    logger.warning(
        'Unable to open file entry for path specification: {0:s}'.format(
            path_helper.PathHelper.GetDisplayNameForPathSpec(path_spec)))
    return

  if not self._filter_collection.Matches(file_entry):
    return

  file_entry_processed = False
  for data_stream in file_entry.data_streams:
    if self._abort:
      break

    self._ExtractDataStream(
        file_entry, data_stream.name, destination_path, output_writer,
        skip_duplicates=skip_duplicates)

    file_entry_processed = True

  if not file_entry_processed:
    self._ExtractDataStream(
        file_entry, '', destination_path, output_writer,
        skip_duplicates=skip_duplicates)
# TODO: merge with collector and/or engine.
def _ExtractWithFilter(
self, source_path_specs, destination_path, output_writer,
artifact_filters, filter_file, artifact_definitions_path,
custom_artifacts_path, skip_duplicates=True):
"""Extracts files using a filter expression.
This method runs the file extraction process on the image and
potentially on every VSS if that is wanted.
Args:
source_path_specs (list[dfvfs.PathSpec]): path specifications to extract.
destination_path (str): path where the extracted files should be stored.
output_writer (CLIOutputWriter): output writer.
artifact_definitions_path (str): path to artifact definitions file.
custom_artifacts_path (str): path to custom artifact definitions file.
artifact_filters (list[str]): names of artifact definitions that are
used for filtering file system and Windows Registry key paths.
filter_file (str): path of the file that contains the filter file path
filters.
skip_duplicates (Optional[bool]): True if files with duplicate content
should be skipped.
"""
for source_path_spec in source_path_specs:
file_system, mount_point = self._GetSourceFileSystem(
source_path_spec, resolver_context=self._resolver_context)
if self._knowledge_base is None:
self._Preprocess(file_system, mount_point)
display_name = path_helper.PathHelper.GetDisplayNameForPathSpec(
source_path_spec)
output_writer.Write(
'Extracting file entries from: {0:s}\n'.format(display_name))
filter_find_specs = engine.BaseEngine.BuildFilterFindSpecs(
artifact_definitions_path, custom_artifacts_path,
self._knowledge_base, artifact_filters, filter_file)
searcher = file_system_searcher.FileSystemSearcher(
file_system, mount_point)
for path_spec in searcher.Find(find_specs=filter_find_specs):
self._ExtractFileEntry(
path_spec, destination_path, output_writer,
skip_duplicates=skip_duplicates)
file_system.Close()
# TODO: refactor, this is a duplicate of the function in engine.
def _GetSourceFileSystem(self, source_path_spec, resolver_context=None):
"""Retrieves the file system of the source.
Args:
source_path_spec (dfvfs.PathSpec): source path specification of the file
system.
resolver_context (dfvfs.Context): resolver context.
Returns:
tuple: containing:
dfvfs.FileSystem: file system.
dfvfs.PathSpec: mount point path specification that refers
to the base location of the file system.
Raises:
RuntimeError: if source path specification is not set.
"""
if not source_path_spec:
raise RuntimeError('Missing source.')
file_system = path_spec_resolver.Resolver.OpenFileSystem(
source_path_spec, resolver_context=resolver_context)
type_indicator = source_path_spec.type_indicator
if path_spec_factory.Factory.IsSystemLevelTypeIndicator(type_indicator):
mount_point = source_path_spec
else:
mount_point = source_path_spec.parent
return file_system, mount_point
def _ParseExtensionsString(self, extensions_string):
"""Parses the extensions string.
Args:
extensions_string (str): comma separated extensions to filter.
"""
if not extensions_string:
return
extensions_string = extensions_string.lower()
extensions = [
extension.strip() for extension in extensions_string.split(',')]
file_entry_filter = file_entry_filters.ExtensionsFileEntryFilter(extensions)
self._filter_collection.AddFilter(file_entry_filter)
def _ParseNamesString(self, names_string):
"""Parses the name string.
Args:
names_string (str): comma separated filenames to filter.
"""
if not names_string:
return
names_string = names_string.lower()
names = [name.strip() for name in names_string.split(',')]
file_entry_filter = file_entry_filters.NamesFileEntryFilter(names)
self._filter_collection.AddFilter(file_entry_filter)
def _ParseFilterOptions(self, options):
  """Parses the filter options.

  Args:
    options (argparse.Namespace): command line arguments.

  Raises:
    BadConfigOption: if the options are invalid.
  """
  names = ['artifact_filters', 'date_filters', 'filter_file']
  helpers_manager.ArgumentHelperManager.ParseOptions(
      options, self, names=names)

  # All string options are read via ParseStringOption so that they are
  # handled the same way as the extensions string.
  extensions_string = self.ParseStringOption(options, 'extensions_string')
  self._ParseExtensionsString(extensions_string)

  names_string = self.ParseStringOption(options, 'names_string')
  self._ParseNamesString(names_string)

  signature_identifiers = self.ParseStringOption(
      options, 'signature_identifiers')
  try:
    self._ParseSignatureIdentifiers(
        self._data_location, signature_identifiers)
  except (IOError, ValueError) as exception:
    raise errors.BadConfigOption(exception)

  # Artifact and file based filters are not part of the filter collection,
  # hence they are checked separately.
  if self._artifact_filters or self._filter_file:
    self.has_filters = True
  else:
    self.has_filters = self._filter_collection.HasFilters()
def _ParseSignatureIdentifiers(self, data_location, signature_identifiers):
"""Parses the signature identifiers.
Args:
data_location (str): location of the format specification file, for
example, "signatures.conf".
signature_identifiers (str): comma separated signature identifiers.
Raises:
IOError: if the format specification file could not be read from
the specified data location.
ValueError: if no data location was specified.
"""
if not signature_identifiers:
return
if not data_location:
raise ValueError('Missing data location.')
path = os.path.join(data_location, 'signatures.conf')
if not os.path.exists(path):
raise IOError(
'No such format specification file: {0:s}'.format(path))
try:
specification_store = self._ReadSpecificationFile(path)
except IOError as exception:
raise IOError((
'Unable to read format specification file: {0:s} with error: '
'{1!s}').format(path, exception))
signature_identifiers = signature_identifiers.lower()
signature_identifiers = [
identifier.strip() for identifier in signature_identifiers.split(',')]
file_entry_filter = file_entry_filters.SignaturesFileEntryFilter(
specification_store, signature_identifiers)
self._filter_collection.AddFilter(file_entry_filter)
def _Preprocess(self, file_system, mount_point):
  """Runs the preprocessing plugins on the file system.

  An IOError raised by the plugins is logged and does not propagate.

  Args:
    file_system (dfvfs.FileSystem): file system to be preprocessed.
    mount_point (dfvfs.PathSpec): mount point path specification that refers
        to the base location of the file system.
  """
  logger.debug('Starting preprocessing.')

  plugins_manager = preprocess_manager.PreprocessPluginsManager
  try:
    plugins_manager.RunPlugins(
        self._artifacts_registry, file_system, mount_point,
        self._knowledge_base)

  except IOError as exception:
    error_message = 'Unable to preprocess with error: {0!s}'.format(exception)
    logger.error(error_message)

  logger.debug('Preprocessing done.')
def _ReadSpecificationFile(self, path):
  """Reads the format specification file.

  Every line that is not empty and does not start with "#" is expected to
  consist of 3 whitespace separated values: an identifier, a decimal offset
  and a pattern that can contain escape sequences. Lines that do not match
  are logged and skipped.

  Args:
    path (str): path of the format specification file.

  Returns:
    FormatSpecificationStore: format specification store.
  """
  specification_store = specification.FormatSpecificationStore()

  with io.open(
      path, 'rt', encoding=self._SPECIFICATION_FILE_ENCODING) as file_object:
    for raw_line in file_object:
      line = raw_line.strip()
      if not line or line.startswith('#'):
        continue

      values = line.split()
      if len(values) != 3:
        logger.error('[skipping] invalid line: {0:s}'.format(line))
        continue

      identifier, offset_string, escaped_pattern = values

      try:
        offset = int(offset_string, 10)
      except ValueError:
        logger.error('[skipping] invalid offset in line: {0:s}'.format(line))
        continue

      try:
        # TODO: find another way to do this that doesn't use an undocumented
        # API.
        pattern = codecs.escape_decode(escaped_pattern)[0]
      # ValueError is raised e.g. when the patterns contains "\xg1".
      except ValueError:
        logger.error(
            '[skipping] invalid pattern in line: {0:s}'.format(line))
        continue

      format_specification = specification.FormatSpecification(identifier)
      format_specification.AddNewSignature(pattern, offset=offset)
      specification_store.AddSpecification(format_specification)

  return specification_store
def _WriteFileEntry(self, file_entry, data_stream_name, destination_file):
"""Writes the contents of the source file entry to a destination file.
Note that this function will overwrite an existing file.
Args:
file_entry (dfvfs.FileEntry): file entry whose content is to be written.
data_stream_name (str): name of the data stream whose content is to be
written.
destination_file (str): path of the destination file.
"""
source_file_object = file_entry.GetFileObject(
data_stream_name=data_stream_name)
if not source_file_object:
return
try:
with open(destination_file, 'wb') as destination_file_object:
source_file_object.seek(0, os.SEEK_SET)
data = source_file_object.read(self._COPY_BUFFER_SIZE)
while data:
destination_file_object.write(data)
data = source_file_object.read(self._COPY_BUFFER_SIZE)
finally:
source_file_object.close()
def AddFilterOptions(self, argument_group):
  """Adds the filter options to the argument group.

  Besides the options of the artifact, date and file filter helpers this
  adds the extensions, names and signatures options.

  Args:
    argument_group (argparse._ArgumentGroup): argparse argument group.
  """
  names = ['artifact_filters', 'date_filters', 'filter_file']
  helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
      argument_group, names=names)

  argument_group.add_argument(
      '-x', '--extensions', dest='extensions_string', action='store',
      type=str, metavar='EXTENSIONS', help=(
          'Filter on file name extensions. This option accepts '
          'multiple comma separated values e.g. "csv,docx,pst".'))

  # The example in the help text uses the option itself, not "-x" which is
  # the short form of the extensions option.
  argument_group.add_argument(
      '--names', dest='names_string', action='store',
      type=str, metavar='NAMES', help=(
          'Filter on file names. This option accepts a comma separated '
          'string denoting all file names, e.g. --names '
          '"NTUSER.DAT,UsrClass.dat".'))

  argument_group.add_argument(
      '--signatures', dest='signature_identifiers', action='store',
      type=str, metavar='IDENTIFIERS', help=(
          'Filter on file format signature identifiers. This option '
          'accepts multiple comma separated values e.g. "esedb,lnk". '
          'Use "list" to show an overview of the supported file format '
          'signatures.'))
def ListSignatureIdentifiers(self):
  """Writes the available signature identifiers to the output writer.

  The identifiers are read from the "signatures.conf" file in the data
  location and are written sorted, comma separated and wrapped at 79
  characters.

  Raises:
    BadConfigOption: if the data location is invalid.
  """
  data_location = self._data_location
  if not data_location:
    raise errors.BadConfigOption('Missing data location.')

  path = os.path.join(data_location, 'signatures.conf')
  if not os.path.exists(path):
    raise errors.BadConfigOption(
        'No such format specification file: {0:s}'.format(path))

  try:
    specification_store = self._ReadSpecificationFile(path)
  except IOError as exception:
    error_message = (
        'Unable to read format specification file: {0:s} with error: '
        '{1!s}').format(path, exception)
    raise errors.BadConfigOption(error_message)

  identifiers = [
      format_specification.identifier
      for format_specification in specification_store.specifications]

  self._output_writer.Write('Available signature identifiers:\n')

  wrapped_lines = textwrap.wrap(', '.join(sorted(identifiers)), 79)
  self._output_writer.Write('\n'.join(wrapped_lines))
  self._output_writer.Write('\n\n')
def ParseArguments(self):
  """Parses the command line arguments.

  Logging is configured with the defaults first and a second time after
  the options have been parsed successfully.

  Returns:
    bool: True if the arguments were successfully parsed.
  """
  loggers.ConfigureLogging()

  parser = argparse.ArgumentParser(
      description=self.DESCRIPTION, epilog=self.EPILOG, add_help=False,
      formatter_class=argparse.RawDescriptionHelpFormatter)

  self.AddBasicOptions(parser)
  self.AddInformationalOptions(parser)

  # The process resources options are only offered when the memory limit
  # can be enforced.
  helper_names = ['artifact_definitions', 'data_location']
  if self._CanEnforceProcessMemoryLimit():
    helper_names.append('process_resources')
  helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
      parser, names=helper_names)

  self.AddLogFileOptions(parser)

  self.AddStorageMediaImageOptions(parser)
  self.AddVSSProcessingOptions(parser)

  self.AddFilterOptions(parser)

  write_help = 'The directory in which extracted files should be stored.'
  parser.add_argument(
      '-w', '--write', action='store', dest='path', type=str,
      metavar='PATH', default='export', help=write_help)

  include_duplicates_help = (
      'If extraction from VSS is enabled, by default a digest hash '
      'is calculated for each file. These hashes are compared to the '
      'previously exported files and duplicates are skipped. Use '
      'this option to include duplicate files in the export.')
  parser.add_argument(
      '--include_duplicates', dest='include_duplicates',
      action='store_true', default=False, help=include_duplicates_help)

  source_help = (
      'The full path to the image file that we are about to extract '
      'files from, it should be a raw image or another image that '
      'plaso supports.')
  parser.add_argument(
      self._SOURCE_OPTION, nargs='?', action='store', metavar='IMAGE',
      default=None, type=str, help=source_help)

  try:
    options = parser.parse_args()
  except UnicodeEncodeError:
    # If we get here we are attempting to print help in a non-Unicode
    # terminal.
    self._output_writer.Write('')
    self._output_writer.Write(parser.format_help())
    return False

  try:
    self.ParseOptions(options)
  except errors.BadConfigOption as exception:
    self._output_writer.Write('ERROR: {0!s}\n'.format(exception))
    self._output_writer.Write('')
    self._output_writer.Write(parser.format_usage())
    return False

  # Reconfigure logging now that the debug, log file and quiet options
  # are known.
  loggers.ConfigureLogging(
      debug_output=self._debug_mode, filename=self._log_file,
      quiet_mode=self._quiet_mode)

  return True
def ParseOptions(self, options):
  """Parses the options and initializes the front-end.

  When the signature identifiers option is "list", only the data location
  is parsed and list_signature_identifiers is set.

  Args:
    options (argparse.Namespace): command line arguments.

  Raises:
    BadConfigOption: if the options are invalid.
  """
  # The data location is required to list signatures.
  helpers_manager.ArgumentHelperManager.ParseOptions(
      options, self, names=['data_location'])

  # Check the list options first otherwise required options will raise.
  requested_signatures = self.ParseStringOption(
      options, 'signature_identifiers')
  if requested_signatures == 'list':
    self.list_signature_identifiers = True

  if self.list_signature_identifiers:
    return

  self._ParseInformationalOptions(options)
  self._ParseLogFileOptions(options)

  self._ParseStorageMediaOptions(options)

  self._destination_path = self.ParseStringOption(
      options, 'path', default_value='export')

  if not self._data_location:
    logger.warning('Unable to automatically determine data location.')

  helpers_manager.ArgumentHelperManager.ParseOptions(
      options, self, names=['artifact_definitions', 'process_resources'])

  self._ParseFilterOptions(options)

  # Duplicates are kept when VSS processing is turned off or when they
  # were asked for explicitly.
  keep_duplicates = (
      getattr(options, 'no_vss', False) or
      getattr(options, 'include_duplicates', False))
  if keep_duplicates:
    self._skip_duplicates = False

  self._EnforceProcessMemoryLimit(self._process_memory_limit)

  self._filter_collection.Print(self._output_writer)
def ProcessSources(self):
  """Scans the source and exports the matching files to the destination.

  The destination directory is created if it does not exist. The filter
  based extraction is used when artifact filters or a filter file were
  specified.

  Raises:
    SourceScannerError: if the source scanner could not find a supported
        file system.
    UserAbort: if the user initiated an abort.
  """
  self.ScanSource(self._source_path)

  self._output_writer.Write('Export started.\n')

  destination_path = self._destination_path
  if not os.path.isdir(destination_path):
    os.makedirs(destination_path)

  use_find_specs = self._artifact_filters or self._filter_file
  if not use_find_specs:
    self._Extract(
        self._source_path_specs, destination_path, self._output_writer,
        skip_duplicates=self._skip_duplicates)
  else:
    self._ExtractWithFilter(
        self._source_path_specs, destination_path, self._output_writer,
        self._artifact_filters, self._filter_file,
        self._artifact_definitions_path, self._custom_artifacts_path,
        skip_duplicates=self._skip_duplicates)

  for text in ('Export completed.\n', '\n'):
    self._output_writer.Write(text)