Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Artifact definitions filter helper #1883

Merged
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 201 additions & 0 deletions plaso/engine/artifact_filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
# -*- coding: utf-8 -*-
"""Helper to create filters based on forensic artifact definitions."""

from __future__ import unicode_literals

from artifacts import definitions as artifact_types

from dfvfs.helpers import file_system_searcher
from dfwinreg import registry_searcher
from plaso.engine import logger
from plaso.engine import path_helper

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • white line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


class ArtifactDefinitionsFilterHelper(object):
"""Helper to create filters based on artifact definitions.

Builds extraction filters from forensic artifact definitions.

For more information about Forensic Artifacts see:
https://github.com/ForensicArtifacts/artifacts/blob/master/docs/Artifacts%20definition%20format%20and%20style%20guide.asciidoc
"""

_KNOWLEDGE_BASE_VALUE = 'ARTIFACT_FILTERS'

_COMPATIBLE_REGISTRY_KEY_PATH_PREFIXES = ['HKEY_LOCAL_MACHINE']

def __init__(self, artifacts_registry, artifact_definitions, knowledge_base):
"""Initializes an artifact definitions filter helper.

Args:
artifacts_registry (artifacts.ArtifactDefinitionsRegistry]): artifact
definitions registry.
artifact_definitions (list[str]): artifact definition names to filter.
path (str): path to a file that contains one or more artifact definitions.
knowledge_base (KnowledgeBase): contains information from the source
data needed for filtering.
"""
super(ArtifactDefinitionsFilterHelper, self).__init__()
self._artifacts = artifact_definitions
self._artifacts_registry = artifacts_registry
self._knowledge_base = knowledge_base

def _CheckKeyCompatibility(self, key_path):
"""Checks if a Windows Registry key path is supported by dfWinReg.

Args:
key_path (str): path of the Windows Registry key.

Returns:
bool: True if key is compatible or False if not.
"""
for key_path_prefix in self._COMPATIBLE_REGISTRY_KEY_PATH_PREFIXES:
if key_path.startswith(key_path_prefix):
return True

logger.warning(
'Prefix of key "{0:s}" is currently not supported'.format(key_path))
return False

def BuildFindSpecs(self, environment_variables=None):
    """Builds find specifications from artifact definitions.

    The resulting find specifications are set in the knowledge base under
    the value name _KNOWLEDGE_BASE_VALUE, as a dictionary that maps the file
    and Windows Registry key source type indicators to lists of find
    specifications.

    Artifact definition names for which the registry returns no definition
    are skipped. Windows Registry value sources are reduced to their key
    paths, and any other source type is reported with a warning and ignored.

    Args:
      environment_variables (Optional[list[EnvironmentVariableArtifact]]):
          environment variables.
    """
    file_find_specs = []
    registry_find_specs = []
    find_specs_per_source_type = {
        artifact_types.TYPE_INDICATOR_FILE: file_find_specs,
        artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY: registry_find_specs}

    for name in self._artifacts:
        definition = self._artifacts_registry.GetDefinitionByName(name)
        if not definition:
            continue

        for source in definition.sources:
            if source.type_indicator == artifact_types.TYPE_INDICATOR_FILE:
                # TODO: move source.paths iteration into
                # BuildFindSpecsFromFileArtifact.
                for path_entry in set(source.paths):
                    find_specs = self.BuildFindSpecsFromFileArtifact(
                        path_entry, source.separator, environment_variables,
                        self._knowledge_base.user_accounts)
                    file_find_specs.extend(find_specs)
                continue

            if (source.type_indicator ==
                    artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY):
                # TODO: move source.keys iteration into
                # BuildFindSpecsFromRegistryArtifact.
                key_paths = set(source.keys)

            elif (source.type_indicator ==
                  artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_VALUE):
                # TODO: Handle Registry Values Once Supported in dfwinreg.
                # https://github.com/log2timeline/dfwinreg/issues/98
                logger.warning((
                    'Windows Registry values are not supported, extracting '
                    'key: "{0!s}"').format(source.key_value_pairs))

                # TODO: move source.key_value_pairs iteration into
                # BuildFindSpecsFromRegistryArtifact.
                # NOTE(review): the artifact definition format describes
                # key_value_pairs entries as dictionaries with "key" and
                # "value" items; unpacking such a dictionary would yield the
                # literal strings "key" and "value" instead of the key path.
                # Both dictionaries and (key path, value name) pairs are
                # accepted here - confirm against the artifacts library.
                key_paths = set()
                for key_value_pair in source.key_value_pairs:
                    if isinstance(key_value_pair, dict):
                        key_path = key_value_pair.get('key', None)
                    else:
                        key_path, _ = key_value_pair
                    if key_path:
                        key_paths.add(key_path)

            else:
                logger.warning(
                    'Unsupported artifact definition source type: '
                    '"{0:s}"'.format(source.type_indicator))
                continue

            # Only the key paths are used for both Registry source types.
            for key_path in key_paths:
                if self._CheckKeyCompatibility(key_path):
                    find_specs = self.BuildFindSpecsFromRegistryArtifact(
                        key_path)
                    registry_find_specs.extend(find_specs)

    self._knowledge_base.SetValue(
        self._KNOWLEDGE_BASE_VALUE, find_specs_per_source_type)

def BuildFindSpecsFromFileArtifact(
        self, source_path, path_separator, environment_variables,
        user_accounts):
    """Builds find specifications from a file source type.

    The source path is expanded in three steps: recursive globs, the users
    home directory variable and, when the path contains a "%", Windows
    environment variables. Every resulting absolute path is split into path
    segments that are used as the location glob of a find specification.

    Args:
      source_path (str): file system path defined by the source.
      path_separator (str): file system path segment separator.
      environment_variables (list[EnvironmentVariableArtifact]): environment
          variables used to expand the path.
      user_accounts (list[UserAccountArtifact]): identified user accounts
          stored in the knowledge base.

    Returns:
      list[dfvfs.FindSpec]: find specifications for the file source type.
    """
    helper_class = path_helper.PathHelper
    results = []

    for expanded_glob in helper_class.ExpandRecursiveGlobs(
            source_path, path_separator):
        for location in helper_class.ExpandUsersHomeDirectoryPath(
                expanded_glob, user_accounts):
            if '%' in location:
                location = helper_class.ExpandWindowsPath(
                    location, environment_variables)

            if not location.startswith(path_separator):
                logger.warning((
                    'The path filter must be defined as an absolute path: '
                    '"{0:s}"').format(location))
                continue

            # Split into path segments; the first segment is the (empty)
            # root entry and is dropped.
            segments = location.split(path_separator)[1:]

            # A trailing separator leaves an empty last segment behind.
            if not segments[-1]:
                logger.warning(
                    'Empty last path segment in path filter: "{0:s}"'.format(
                        location))
                del segments[-1]

            try:
                find_spec = file_system_searcher.FindSpec(
                    location_glob=segments, case_sensitive=False)
            except ValueError as exception:
                logger.error((
                    'Unable to build find specification for path: "{0:s}" with '
                    'error: {1!s}').format(location, exception))
            else:
                results.append(find_spec)

    return results

def BuildFindSpecsFromRegistryArtifact(self, source_key_path):
    """Builds find specifications from a Windows Registry source type.

    Recursive globs in the key path are expanded first. Expanded key paths
    that still contain a "%%" variable cannot be resolved; they are reported
    as an error and skipped.

    Args:
      source_key_path (str): Windows Registry key path defined by the source.

    Returns:
      list[dfwinreg.FindSpec]: find specifications for the Windows Registry
          source type.
    """
    expanded_key_paths = path_helper.PathHelper.ExpandRecursiveGlobs(
        source_key_path, '\\')

    results = []
    for expanded_key_path in expanded_key_paths:
        if '%%' not in expanded_key_path:
            results.append(
                registry_searcher.FindSpec(key_path_glob=expanded_key_path))
        else:
            logger.error(
                'Unable to expand key path: "{0:s}"'.format(expanded_key_path))

    return results
122 changes: 121 additions & 1 deletion plaso/engine/path_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,121 @@

from __future__ import unicode_literals

import re

from dfvfs.lib import definitions as dfvfs_definitions

from plaso.engine import logger
from plaso.lib import py2to3


class PathHelper(object):
"""Class that implements the path helper."""

_RECURSIVE_GLOB_LIMIT = 10

@classmethod
def AppendPathEntries(cls, path, path_separator, count, skip_first):
"""Appends wildcard entries to end of path.

Will append wildcard * to given path building a list of strings for "count"
iterations, skipping the first directory if skip_first is true.

Args:
path (str): Path to append wildcards to.
path_separator (str): path segment separator.
count (int): Number of entries to be appended.
skip_first (bool): Whether or not to skip first entry to append.

Returns:
list[str]: Paths that were expanded from the path with wildcards.
"""
paths = []
replacement = '{0:s}*'.format(path_separator)

iteration = 0
while iteration < count:
if skip_first and iteration == 0:
path += replacement
else:
path += replacement
paths.append(path)
iteration += 1

return paths

@classmethod
def ExpandRecursiveGlobs(cls, path, path_separator):
"""Expands recursive like globs present in an artifact path.

If a path ends in '**', with up to two optional digits such as '**10',
the '**' will recursively match all files and zero or more directories
from the specified path. The optional digits indicate the recursion depth.
By default recursion depth is 10 directories.

If the glob is followed by the specified path segment separator, only
directories and subdirectories will be matched.

Args:
path (str): path to be expanded.
path_separator (str): path segment separator.

Returns:
list[str]: String path expanded for each glob.
"""
glob_regex = r'(.*)?{0}\*\*(\d{{1,2}})?({0})?$'.format(
re.escape(path_separator))

match = re.search(glob_regex, path)
if not match:
return [path]

skip_first = False
if match.group(3):
skip_first = True
if match.group(2):
iterations = int(match.group(2))
else:
iterations = cls._RECURSIVE_GLOB_LIMIT
logger.warning((
'Path "{0:s}" contains fully recursive glob, limiting to 10 '
'levels').format(path))

return cls.AppendPathEntries(
match.group(1), path_separator, iterations, skip_first)

@classmethod
def ExpandUsersHomeDirectoryPath(cls, path, user_accounts):
"""Expands a path to contain all users home or profile directories.

Expands the GRR artifacts path variable "%%users.homedir%%".

Args:
path (str): Windows path with environment variables.
user_accounts (list[UserAccountArtifact]): user accounts.

Returns:
list [str]: paths returned for user accounts without a drive letter.
"""
path_upper_case = path.upper()
if not path_upper_case.startswith('%%USERS.HOMEDIR%%'):
user_paths = [path]
else:
regex = re.compile(re.escape('%%users.homedir%%'))

user_paths = []
for user_account in user_accounts:
user_path = regex.sub(user_account.user_directory, path, re.IGNORECASE)
user_paths.append(user_path)

# Remove the drive letter, if it exists.
for path_index, user_path in enumerate(user_paths):
if len(user_path) > 2 and user_path[1] == ':':
_, _, user_path = user_path.rpartition(':')
user_paths[path_index] = user_path

return user_paths

@classmethod
def ExpandWindowsPath(cls, path, environment_variables):
"""Expands a Windows path containing environment variables.
Expand All @@ -23,6 +130,8 @@ def ExpandWindowsPath(cls, path, environment_variables):
Returns:
str: expanded Windows path.
"""
# TODO: Add support for items such as %%users.localappdata%%

if environment_variables is None:
environment_variables = []

Expand All @@ -42,9 +151,20 @@ def ExpandWindowsPath(cls, path, environment_variables):
not path_segment.endswith('%')):
continue

lookup_key = path_segment.upper()[1:-1]
check_for_drive_letter = False
path_segment_upper_case = path_segment.upper()
if path_segment_upper_case.startswith('%%ENVIRON_'):
lookup_key = path_segment_upper_case[10:-2]
check_for_drive_letter = True
else:
lookup_key = path_segment_upper_case[1:-1]
path_segments[index] = lookup_table.get(lookup_key, path_segment)

if check_for_drive_letter:
# Remove the drive letter.
if len(path_segments[index]) >= 2 and path_segments[index][1] == ':':
_, _, path_segments[index] = path_segments[index].rpartition(':')

return '\\'.join(path_segments)

@classmethod
Expand Down
Loading