From 99d22dfa79c5fa1707cef1ce1c1c0c37e48bdcda Mon Sep 17 00:00:00 2001 From: James Nettesheim Date: Fri, 4 May 2018 00:24:21 -0700 Subject: [PATCH 01/17] Initial commit artifact artifact definitions filter helper --- plaso/engine/artifact_filters.py | 259 +++++++++++++++++++++ test_data/artifacts/artifacts_filters.yaml | 66 ++++++ tests/engine/artifact_filters.py | 164 +++++++++++++ 3 files changed, 489 insertions(+) create mode 100644 plaso/engine/artifact_filters.py create mode 100644 test_data/artifacts/artifacts_filters.yaml create mode 100644 tests/engine/artifact_filters.py diff --git a/plaso/engine/artifact_filters.py b/plaso/engine/artifact_filters.py new file mode 100644 index 0000000000..b7422ad082 --- /dev/null +++ b/plaso/engine/artifact_filters.py @@ -0,0 +1,259 @@ +# -*- coding: utf-8 -*- +"""Helper to create filters based on forensic artifact definitions.""" + +from __future__ import unicode_literals + +import logging +import re + +from artifacts import definitions as artifact_types + +from dfvfs.helpers import file_system_searcher +from dfwinreg import registry_searcher + +from plaso.engine import path_helper + +class ArtifactDefinitionsFilterHelper(object): + """Helper to create filters based on artifact defintions. + + Builds extraction and parsing filters from forensic artifact definitions. + + For more information about Forensic Artifacts see: + https://github.com/ForensicArtifacts/artifacts/blob/master/docs/Artifacts%20definition%20format%20and%20style%20guide.asciidoc + """ + + ARTIFACT_FILTERS = 'ARTIFACT_FILTERS' + COMPATIBLE_DFWINREG_KEYS = frozenset([ + 'HKEY_LOCAL_MACHINE', + 'HKEY_LOCAL_MACHINE\\SYSTEM', + 'HKEY_LOCAL_MACHINE\\SOFTWARE', + 'HKEY_LOCAL_MACHINE\\SAM', + 'HKEY_LOCAL_MACHINE\\SECURITY']) + STANDARD_OS_FILTERS = frozenset([ + 'windows', + 'linux', + 'darwin']) + RECURSIVE_GLOB_LIMIT = 10 + + def __init__(self, artifacts_registry, artifacts, knowledge_base): + """Initializes an artifact definitions filter helper. + + Args: + artifacts_registry (artifacts.ArtifactDefinitionsRegistry]): artifact + definitions registry. + artifacts (list[str]): artifact names to process. + path (str): path to a file that contains one or more forensic artifacts. + knowledge_base (KnowledgeBase): Knowledge base for Log2Timeline. + """ + super(ArtifactDefinitionsFilterHelper, self).__init__() + self._artifacts_registry = artifacts_registry + self._artifacts = artifacts + self._knowledge_base = knowledge_base + + def BuildFindSpecs(self, environment_variables=None): + """Build find specification from a forensic artifact definitions. + + Args: + environment_variables (Optional[list[EnvironmentVariableArtifact]]): + environment variables. + """ + find_specs = {} + + artifact_defintions = [] + # Check if single operating system type has been provided, in that case + # process all artifact types for that OS. + if (len(self._artifacts) == 1 and + self._artifacts[0] in self.STANDARD_OS_FILTERS): + for definition in self._artifacts_registry.GetDefinitions(): + if self._filters[0].lower() in ( + os.lower() for os in definition.supported_os): + artifact_defintions.append(definition) + else: + for artifact_filter in self._artifacts: + if self._artifacts_registry.GetDefinitionByName(artifact_filter): + artifact_defintions.append( + self._artifacts_registry.GetDefinitionByName(artifact_filter)) + + for definition in artifact_defintions: + for source in definition.sources: + if source.type_indicator == artifact_types.TYPE_INDICATOR_FILE: + for path_entry in source.paths: + self.BuildFindSpecsFromFileArtifact( + path_entry, source.separator, environment_variables, find_specs) + elif (source.type_indicator == + artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY): + keys = set(source.keys) + for key_entry in keys: + if self._CheckKeyCompatibility(key_entry): + self.BuildFindSpecsFromRegistryArtifact( + key_entry, find_specs) + elif (source.type_indicator == + artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_VALUE): + # TODO: Handle Registry Values Once Supported in dfwinreg. + logging.warning(('Unable to handle Registry Value, extracting ' + 'key only: {0:s} ').format(source.key_value_pairs)) + + for key_pair in source.key_value_pairs: + keys = set() + keys.add(key_pair.get('key')) + for key_entry in keys: + if self._CheckKeyCompatibility(key_entry): + self.BuildFindSpecsFromRegistryArtifact( + key_entry, find_specs) + else: + logging.warning(('Unable to handle artifact, plaso does not ' + 'support: {0:s} ').format(source.type_indicator)) + + self._knowledge_base.SetValue(self.ARTIFACT_FILTERS, find_specs) + + def BuildFindSpecsFromFileArtifact( + self, path_entry, separator, environment_variables, find_specs): + """Build find specification from a FILE artifact type. + + Args: + path_entry (str): Current file system path to add. + environment variables. + separator (str): File system path separator. + environment_variables list(str): Environment variable attributes used to + dynamically populate environment variables in key. + find_specs dict[artifacts.artifact_types]: Dictionary containing + find_specs. + """ + for glob_path in self._ExpandRecursiveGlobs(path_entry): + for path in path_helper.PathHelper.ExpandUserHomeDirPath( + glob_path, self._knowledge_base.user_accounts): + if '%' in path: + path = path_helper.PathHelper.ExpandWindowsPath( + path, environment_variables) + + if not path.startswith('/') and not path.startswith('\\'): + logging.warning(( + 'The path filter must be defined as an absolute path: ' + '{0:s}').format(path)) + continue + + # Convert the path filters into a list of path segments and + # strip the root path segment. + path_segments = path.split(separator) + + # If the source didn't specify a separator, '/' is returned by + # default from ForensicArtifacts, this is sometimes wrong. Thus, + # need to check if '\' characters are still present and split on those. + if len(path_segments) == 1 and '\\' in path_segments[0]: + logging.warning('Potentially bad separator = {0:s} , trying \'\\\'' + .format(separator)) + path_segments = path.split('\\') + path_segments.pop(0) + + if not path_segments[-1]: + logging.warning( + 'Empty last path segment in path filter: {0:s}'.format(path)) + path_segments.pop(-1) + + try: + find_spec = file_system_searcher.FindSpec( + location_glob=path_segments, case_sensitive=False) + except ValueError as exception: + logging.error(( + 'Unable to build find spec for path: {0:s} with error: {1!s}' + ).format(path, exception)) + continue + if artifact_types.TYPE_INDICATOR_FILE not in find_specs: + find_specs[artifact_types.TYPE_INDICATOR_FILE] = [] + find_specs[artifact_types.TYPE_INDICATOR_FILE].append( + find_spec) + + def BuildFindSpecsFromRegistryArtifact( + self, key_entry, find_specs): + """Build find specification from a Windows registry artifact type. + + Args: + key_entry (str): Current file system key to add. + find_specs dict[artifacts.artifact_types]: Dictionary containing + find_specs. + """ + for key in self._ExpandRecursiveGlobs(key_entry): + if '%%' in key: + logging.error(( + 'Unable to expand path filter: {0:s}').format(key)) + continue + find_spec = registry_searcher.FindSpec(key_path_glob=key) + if artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY not in find_specs: + find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY] = [] + + find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY].append( + find_spec) + + def _CheckKeyCompatibility(self, key): + """Check if a registry key is compatible with dfwinreg. + + Args: + key (str): String key to to check for dfwinreg compatibility. + + Returns: + (bool): Boolean whether key is compatible or not. + """ + key_path_prefix = key.split('\\')[0] + if key_path_prefix.upper() in self.COMPATIBLE_DFWINREG_KEYS: + return True + logging.warning('Key {0:s}, has a prefix {1:s} that is not supported ' + 'by dfwinreg presently'.format(key, key_path_prefix)) + return False + + def _ExpandRecursiveGlobs(self, path): + """Expand recursive like globs present in an artifact path. + + If a path ends in '**', with up two optional digits such as '**10', + the '**' will match all files and zero or more directories and + subdirectories from the specified path recursively. The optional digits + provide the depth to which the recursion should continue. By default + recursion depth is 10 directories. If the pattern is followed by a ‘/’ + or '\', only directories and subdirectories match. + + Args: + path (str): String path to be expanded. + + Returns: + list[str]: String path expanded for each glob. + """ + + match = re.search(r'(.*)?(\\|/)\*\*(\d{1,2})?(\\|/)?$', path) + if match: + skip_first = False + if match.group(4): + skip_first = True + if match.group(3): + iterations = match.group(3) + else: + iterations = self.RECURSIVE_GLOB_LIMIT + logging.warning('Path {0:s} contains fully recursive glob, limiting ' + 'to ten levels'.format(path)) + paths = self._BuildRecursivePaths(match.group(1), iterations, skip_first) + return paths + else: + return [path] + + def _BuildRecursivePaths(self, path, count, skip_first): + """Append wildcard entries to end of path. + + Args: + path (str): Path to append wildcards to. + + Returns: + path list[str]: Paths expanded with wildcards. + """ + paths = [] + if '\\' in path: + replacement = '\\*' + else: + replacement = '/*' + + for iteration in range(count): + if skip_first and iteration == 0: + continue + else: + path += replacement + paths.append(path) + iteration += 1 + + return paths diff --git a/test_data/artifacts/artifacts_filters.yaml b/test_data/artifacts/artifacts_filters.yaml new file mode 100644 index 0000000000..19723f8567 --- /dev/null +++ b/test_data/artifacts/artifacts_filters.yaml @@ -0,0 +1,66 @@ +# Artifact definitions. + +name: TestFiles +doc: Test Doc +sources: +- type: FILE + attributes: + paths: ['%%environ_systemdrive%%\AUTHORS'] + separator: '\' +labels: [System] +supported_os: [Windows] +--- +name: TestFiles2 +doc: Test Doc2 +sources: +- type: FILE + attributes: + paths: + - '%%environ_systemdrive%%\test_data\*.evtx' + - '\test_data\testdir\filter_*.txt' + - '\does_not_exist\some_file_*.txt' + - '\globbed\test\path\**\' + - 'failing' + separator: '\' +labels: [System] +supported_os: [Windows] +--- +name: TestRegistry +doc: Test Registry Doc +sources: +- type: REGISTRY_KEY + attributes: + keys: ['HKEY_LOCAL_MACHINE\System\CurrentControlSet\Control\SecurityProviders\*'] +supported_os: [Windows] +--- +name: TestRegistryKey +doc: Test Registry Doc Key +sources: +- type: REGISTRY_KEY + attributes: + keys: + - 'HKEY_LOCAL_MACHINE\System\ControlSet001\services\**\' + - 'HKEY_LOCAL_MACHINE\System\ControlSet002\services\**\' + - 'HKEY_LOCAL_MACHINE\System\CurrentControlSet\Enum\USBSTOR' + - 'HKEY_LOCAL_MACHINE\System\CurrentControlSet\Enum\USBSTOR\**' +supported_os: [Windows] +--- +name: TestRegistryValue +doc: Test Registry Doc Value +sources: +- type: REGISTRY_VALUE + attributes: + key_value_pairs: + - {key: 'HKEY_LOCAL_MACHINE\System\ControlSet001\Control\Session Manager', value: 'BootExecute'} + - {key: 'HKEY_LOCAL_MACHINE\System\ControlSet002\Control\Session Manager', value: 'BootExecute'} +supported_os: [Windows] +--- +name: TestFilesImageExport +doc: Test Doc +sources: +- type: FILE + attributes: + paths: ['\a_directory\*_file'] + separator: '\' +labels: [System] +supported_os: [Windows] \ No newline at end of file diff --git a/tests/engine/artifact_filters.py b/tests/engine/artifact_filters.py new file mode 100644 index 0000000000..a0436ac80c --- /dev/null +++ b/tests/engine/artifact_filters.py @@ -0,0 +1,164 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""Tests for the artifacts file filter functions.""" + +from __future__ import unicode_literals + +import unittest + +from artifacts import definitions as artifact_types +from artifacts import reader as artifacts_reader +from artifacts import registry as artifacts_registry + +from dfwinreg import registry as dfwinreg_registry +from dfwinreg import registry_searcher as dfwinreg_registry_searcher + +from dfvfs.helpers import file_system_searcher +from dfvfs.lib import definitions as dfvfs_definitions +from dfvfs.path import factory as path_spec_factory +from dfvfs.resolver import resolver as path_spec_resolver + +from plaso.containers import artifacts +from plaso.engine import artifact_filters +from plaso.engine import knowledge_base as knowledge_base_engine +from plaso.parsers import winreg as windows_registry_parser + +from tests import test_lib as shared_test_lib + + +class BuildFindSpecsFromFileTest(shared_test_lib.BaseTestCase): + """Tests for the BuildFindSpecsFromFile function.""" + + @shared_test_lib.skipUnlessHasTestFile(['artifacts']) + @shared_test_lib.skipUnlessHasTestFile(['System.evtx']) + @shared_test_lib.skipUnlessHasTestFile(['testdir', 'filter_1.txt']) + @shared_test_lib.skipUnlessHasTestFile(['testdir', 'filter_3.txt']) + def testBuildFileFindSpecs(self): + """Tests the BuildFindSpecs function for file type artifacts.""" + knowledge_base = knowledge_base_engine.KnowledgeBase() + + artifact_definitions = ['TestFiles', 'TestFiles2'] + registry = artifacts_registry.ArtifactDefinitionsRegistry() + reader = artifacts_reader.YamlArtifactsReader() + + registry.ReadFromDirectory(reader, self._GetTestFilePath(['artifacts'])) + + test_filter_file = artifact_filters.ArtifactDefinitionsFilterHelper( + registry, artifact_definitions, knowledge_base) + + environment_variable = artifacts.EnvironmentVariableArtifact( + case_sensitive=False, name='SystemDrive', value='C:') + + test_filter_file.BuildFindSpecs( + environment_variables=[environment_variable]) + find_specs = knowledge_base.GetValue( + artifact_filters.ArtifactDefinitionsFilterHelper.ARTIFACT_FILTERS) + + self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 13) + + path_spec = path_spec_factory.Factory.NewPathSpec( + dfvfs_definitions.TYPE_INDICATOR_OS, location='.') + file_system = path_spec_resolver.Resolver.OpenFileSystem(path_spec) + searcher = file_system_searcher.FileSystemSearcher( + file_system, path_spec) + + path_spec_generator = searcher.Find( + find_specs=find_specs[artifact_types.TYPE_INDICATOR_FILE]) + self.assertIsNotNone(path_spec_generator) + + path_specs = list(path_spec_generator) + # Two evtx, one symbolic link to evtx, one AUTHORS, two filter_*.txt files, + # total 6 path specifications. + self.assertEqual(len(path_specs), 6) + + file_system.Close() + + @shared_test_lib.skipUnlessHasTestFile(['artifacts']) + @shared_test_lib.skipUnlessHasTestFile(['SYSTEM']) + def testBuildRegistryFindSpecs(self): + """Tests the BuildFindSpecs function for registry artifacts.""" + knowledge_base = knowledge_base_engine.KnowledgeBase() + + artifact_definitions = ['TestRegistry'] + registry = artifacts_registry.ArtifactDefinitionsRegistry() + reader = artifacts_reader.YamlArtifactsReader() + + registry.ReadFromDirectory(reader, self._GetTestFilePath(['artifacts'])) + + test_filter_file = artifact_filters.ArtifactDefinitionsFilterHelper( + registry, artifact_definitions, knowledge_base) + + test_filter_file.BuildFindSpecs(environment_variables=None) + find_specs = knowledge_base.GetValue( + artifact_filters.ArtifactDefinitionsFilterHelper.ARTIFACT_FILTERS) + + self.assertEqual( + len(find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY]), 1) + + win_registry_reader = ( + windows_registry_parser.FileObjectWinRegistryFileReader()) + + file_entry = self._GetTestFileEntry(['SYSTEM']) + file_object = file_entry.GetFileObject() + + registry_file = win_registry_reader.Open(file_object) + + win_registry = dfwinreg_registry.WinRegistry() + key_path_prefix = win_registry.GetRegistryFileMapping(registry_file) + registry_file.SetKeyPathPrefix(key_path_prefix) + win_registry.MapFile(key_path_prefix, registry_file) + + searcher = dfwinreg_registry_searcher.WinRegistrySearcher(win_registry) + key_paths = list(searcher.Find(find_specs=find_specs[ + artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY])) + + self.assertIsNotNone(key_paths) + + # Three key paths found + self.assertEqual(len(key_paths), 3) + + + def testBuildRegistryFindSpecs(self): + """Tests the BuildFindSpecs function for registry artifacts.""" + knowledge_base = knowledge_base_engine.KnowledgeBase() + + artifact_definitions = ['TestRegistry'] + registry = artifacts_registry.ArtifactDefinitionsRegistry() + reader = artifacts_reader.YamlArtifactsReader() + + registry.ReadFromDirectory(reader, self._GetTestFilePath(['artifacts'])) + + test_filter_file = artifact_filters.ArtifactDefinitionsFilterHelper( + registry, artifact_definitions, knowledge_base) + + test_filter_file.BuildFindSpecs(environment_variables=None) + find_specs = knowledge_base.GetValue( + artifact_filters.ArtifactDefinitionsFilterHelper.ARTIFACT_FILTERS) + + self.assertEqual( + len(find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY]), 1) + + win_registry_reader = ( + windows_registry_parser.FileObjectWinRegistryFileReader()) + + file_entry = self._GetTestFileEntry(['SYSTEM']) + file_object = file_entry.GetFileObject() + + registry_file = win_registry_reader.Open(file_object) + + win_registry = dfwinreg_registry.WinRegistry() + key_path_prefix = win_registry.GetRegistryFileMapping(registry_file) + registry_file.SetKeyPathPrefix(key_path_prefix) + win_registry.MapFile(key_path_prefix, registry_file) + + searcher = dfwinreg_registry_searcher.WinRegistrySearcher(win_registry) + key_paths = list(searcher.Find(find_specs=find_specs[ + artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY])) + + self.assertIsNotNone(key_paths) + + # Three key paths found + self.assertEqual(len(key_paths), 3) + +if __name__ == '__main__': + unittest.main() From a220538560f1f28672e0da7dcc41bbdc1bfc53c6 Mon Sep 17 00:00:00 2001 From: James Nettesheim Date: Thu, 10 May 2018 16:07:55 -0700 Subject: [PATCH 02/17] Minor code changes and adding path_helper changes --- plaso/engine/artifact_filters.py | 2 +- plaso/engine/path_helper.py | 39 ++++++++++++++++++++++++++++- tests/engine/artifact_filters.py | 43 -------------------------------- 3 files changed, 39 insertions(+), 45 deletions(-) diff --git a/plaso/engine/artifact_filters.py b/plaso/engine/artifact_filters.py index b7422ad082..2b7f953b16 100644 --- a/plaso/engine/artifact_filters.py +++ b/plaso/engine/artifact_filters.py @@ -65,7 +65,7 @@ def BuildFindSpecs(self, environment_variables=None): if (len(self._artifacts) == 1 and self._artifacts[0] in self.STANDARD_OS_FILTERS): for definition in self._artifacts_registry.GetDefinitions(): - if self._filters[0].lower() in ( + if self._artifacts[0].lower() in ( os.lower() for os in definition.supported_os): artifact_defintions.append(definition) else: diff --git a/plaso/engine/path_helper.py b/plaso/engine/path_helper.py index b3867cd5b8..7aa85aca63 100644 --- a/plaso/engine/path_helper.py +++ b/plaso/engine/path_helper.py @@ -3,6 +3,8 @@ from __future__ import unicode_literals +import re + from dfvfs.lib import definitions as dfvfs_definitions from plaso.lib import py2to3 @@ -23,6 +25,8 @@ def ExpandWindowsPath(cls, path, environment_variables): Returns: str: expanded Windows path. """ + #TODO: Add support for items such as %%users.localappdata%% + if environment_variables is None: environment_variables = [] @@ -42,9 +46,19 @@ def ExpandWindowsPath(cls, path, environment_variables): not path_segment.endswith('%')): continue - lookup_key = path_segment.upper()[1:-1] + check_for_drive_letter = False + if path_segment.upper().startswith('%%ENVIRON_'): + lookup_key = path_segment.upper()[10:-2] + check_for_drive_letter = True + else: + lookup_key = path_segment.upper()[1:-1] path_segments[index] = lookup_table.get(lookup_key, path_segment) + if check_for_drive_letter: + # Remove the drive letter. + if len(path_segments[index]) >= 2 and path_segments[index][1] == ':': + _, _, path_segments[index] = path_segments[index].rpartition(':') + return '\\'.join(path_segments) @classmethod @@ -133,3 +147,26 @@ def GetRelativePathForPathSpec(cls, path_spec, mount_path=None): location = location[len(mount_path):] return location + + @classmethod + def ExpandUserHomeDirPath(cls, path, user_accounts): + """Expands a path to contain all users home directories. + + Args: + path (str): Windows path with environment variables. + user_accounts (list[UserAccountArtifact]): user accounts. + + Returns: + list [str]: paths returned for user accounts. + """ + + user_paths = [] + if path.upper().startswith('%%USERS.HOMEDIR%%'): + regex = re.compile(re.escape('%%users.homedir%%')) + for user_account in user_accounts: + new_path = regex.sub(user_account.user_directory, path) + user_paths.append(new_path) + else: + user_paths = [path] + + return user_paths diff --git a/tests/engine/artifact_filters.py b/tests/engine/artifact_filters.py index a0436ac80c..42b9611b4a 100644 --- a/tests/engine/artifact_filters.py +++ b/tests/engine/artifact_filters.py @@ -117,48 +117,5 @@ def testBuildRegistryFindSpecs(self): # Three key paths found self.assertEqual(len(key_paths), 3) - - def testBuildRegistryFindSpecs(self): - """Tests the BuildFindSpecs function for registry artifacts.""" - knowledge_base = knowledge_base_engine.KnowledgeBase() - - artifact_definitions = ['TestRegistry'] - registry = artifacts_registry.ArtifactDefinitionsRegistry() - reader = artifacts_reader.YamlArtifactsReader() - - registry.ReadFromDirectory(reader, self._GetTestFilePath(['artifacts'])) - - test_filter_file = artifact_filters.ArtifactDefinitionsFilterHelper( - registry, artifact_definitions, knowledge_base) - - test_filter_file.BuildFindSpecs(environment_variables=None) - find_specs = knowledge_base.GetValue( - artifact_filters.ArtifactDefinitionsFilterHelper.ARTIFACT_FILTERS) - - self.assertEqual( - len(find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY]), 1) - - win_registry_reader = ( - windows_registry_parser.FileObjectWinRegistryFileReader()) - - file_entry = self._GetTestFileEntry(['SYSTEM']) - file_object = file_entry.GetFileObject() - - registry_file = win_registry_reader.Open(file_object) - - win_registry = dfwinreg_registry.WinRegistry() - key_path_prefix = win_registry.GetRegistryFileMapping(registry_file) - registry_file.SetKeyPathPrefix(key_path_prefix) - win_registry.MapFile(key_path_prefix, registry_file) - - searcher = dfwinreg_registry_searcher.WinRegistrySearcher(win_registry) - key_paths = list(searcher.Find(find_specs=find_specs[ - artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY])) - - self.assertIsNotNone(key_paths) - - # Three key paths found - self.assertEqual(len(key_paths), 3) - if __name__ == '__main__': unittest.main() From f40837abbee9d6a23c9ef9fd1762e2015e33a9dd Mon Sep 17 00:00:00 2001 From: James Nettesheim Date: Mon, 14 May 2018 16:33:59 -0700 Subject: [PATCH 03/17] Docstring updates --- plaso/engine/artifact_filters.py | 18 +++++++++--------- plaso/engine/path_helper.py | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/plaso/engine/artifact_filters.py b/plaso/engine/artifact_filters.py index 2b7f953b16..c011aec145 100644 --- a/plaso/engine/artifact_filters.py +++ b/plaso/engine/artifact_filters.py @@ -108,7 +108,7 @@ def BuildFindSpecs(self, environment_variables=None): def BuildFindSpecsFromFileArtifact( self, path_entry, separator, environment_variables, find_specs): - """Build find specification from a FILE artifact type. + """Build find specifications from a FILE artifact type. Args: path_entry (str): Current file system path to add. @@ -165,7 +165,7 @@ def BuildFindSpecsFromFileArtifact( def BuildFindSpecsFromRegistryArtifact( self, key_entry, find_specs): - """Build find specification from a Windows registry artifact type. + """Build find specifications from a Windows registry artifact type. Args: key_entry (str): Current file system key to add. @@ -203,12 +203,12 @@ def _CheckKeyCompatibility(self, key): def _ExpandRecursiveGlobs(self, path): """Expand recursive like globs present in an artifact path. - If a path ends in '**', with up two optional digits such as '**10', - the '**' will match all files and zero or more directories and - subdirectories from the specified path recursively. The optional digits - provide the depth to which the recursion should continue. By default - recursion depth is 10 directories. If the pattern is followed by a ‘/’ - or '\', only directories and subdirectories match. + If a path ends in '**', with up to two optional digits such as '**10', + the '**' will recursively match all files and zero or more directories + from the specified path. The optional digits indicate the recursion depth. + By default recursion depth is 10 directories. + If the glob is followed by a ‘/’ or '\', only directories and + subdirectories will be matched. Args: path (str): String path to be expanded. @@ -240,7 +240,7 @@ def _BuildRecursivePaths(self, path, count, skip_first): path (str): Path to append wildcards to. Returns: - path list[str]: Paths expanded with wildcards. + paths list[str]: Paths expanded with wildcards. """ paths = [] if '\\' in path: diff --git a/plaso/engine/path_helper.py b/plaso/engine/path_helper.py index 7aa85aca63..e67b4a3ed0 100644 --- a/plaso/engine/path_helper.py +++ b/plaso/engine/path_helper.py @@ -150,7 +150,7 @@ def GetRelativePathForPathSpec(cls, path_spec, mount_path=None): @classmethod def ExpandUserHomeDirPath(cls, path, user_accounts): - """Expands a path to contain all users home directories. + """Expands a Windows path to contain all user home directories. Args: path (str): Windows path with environment variables. From 0b47f08fd3d540a1042d4852e22e1ce308f104ef Mon Sep 17 00:00:00 2001 From: James Nettesheim Date: Fri, 4 May 2018 00:24:21 -0700 Subject: [PATCH 04/17] Initial commit artifact artifact definitions filter helper --- plaso/engine/artifact_filters.py | 259 +++++++++++++++++++++ test_data/artifacts/artifacts_filters.yaml | 66 ++++++ tests/engine/artifact_filters.py | 164 +++++++++++++ 3 files changed, 489 insertions(+) create mode 100644 plaso/engine/artifact_filters.py create mode 100644 test_data/artifacts/artifacts_filters.yaml create mode 100644 tests/engine/artifact_filters.py diff --git a/plaso/engine/artifact_filters.py b/plaso/engine/artifact_filters.py new file mode 100644 index 0000000000..b7422ad082 --- /dev/null +++ b/plaso/engine/artifact_filters.py @@ -0,0 +1,259 @@ +# -*- coding: utf-8 -*- +"""Helper to create filters based on forensic artifact definitions.""" + +from __future__ import unicode_literals + +import logging +import re + +from artifacts import definitions as artifact_types + +from dfvfs.helpers import file_system_searcher +from dfwinreg import registry_searcher + +from plaso.engine import path_helper + +class ArtifactDefinitionsFilterHelper(object): + """Helper to create filters based on artifact defintions. + + Builds extraction and parsing filters from forensic artifact definitions. + + For more information about Forensic Artifacts see: + https://github.com/ForensicArtifacts/artifacts/blob/master/docs/Artifacts%20definition%20format%20and%20style%20guide.asciidoc + """ + + ARTIFACT_FILTERS = 'ARTIFACT_FILTERS' + COMPATIBLE_DFWINREG_KEYS = frozenset([ + 'HKEY_LOCAL_MACHINE', + 'HKEY_LOCAL_MACHINE\\SYSTEM', + 'HKEY_LOCAL_MACHINE\\SOFTWARE', + 'HKEY_LOCAL_MACHINE\\SAM', + 'HKEY_LOCAL_MACHINE\\SECURITY']) + STANDARD_OS_FILTERS = frozenset([ + 'windows', + 'linux', + 'darwin']) + RECURSIVE_GLOB_LIMIT = 10 + + def __init__(self, artifacts_registry, artifacts, knowledge_base): + """Initializes an artifact definitions filter helper. + + Args: + artifacts_registry (artifacts.ArtifactDefinitionsRegistry]): artifact + definitions registry. + artifacts (list[str]): artifact names to process. + path (str): path to a file that contains one or more forensic artifacts. + knowledge_base (KnowledgeBase): Knowledge base for Log2Timeline. + """ + super(ArtifactDefinitionsFilterHelper, self).__init__() + self._artifacts_registry = artifacts_registry + self._artifacts = artifacts + self._knowledge_base = knowledge_base + + def BuildFindSpecs(self, environment_variables=None): + """Build find specification from a forensic artifact definitions. + + Args: + environment_variables (Optional[list[EnvironmentVariableArtifact]]): + environment variables. + """ + find_specs = {} + + artifact_defintions = [] + # Check if single operating system type has been provided, in that case + # process all artifact types for that OS. + if (len(self._artifacts) == 1 and + self._artifacts[0] in self.STANDARD_OS_FILTERS): + for definition in self._artifacts_registry.GetDefinitions(): + if self._filters[0].lower() in ( + os.lower() for os in definition.supported_os): + artifact_defintions.append(definition) + else: + for artifact_filter in self._artifacts: + if self._artifacts_registry.GetDefinitionByName(artifact_filter): + artifact_defintions.append( + self._artifacts_registry.GetDefinitionByName(artifact_filter)) + + for definition in artifact_defintions: + for source in definition.sources: + if source.type_indicator == artifact_types.TYPE_INDICATOR_FILE: + for path_entry in source.paths: + self.BuildFindSpecsFromFileArtifact( + path_entry, source.separator, environment_variables, find_specs) + elif (source.type_indicator == + artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY): + keys = set(source.keys) + for key_entry in keys: + if self._CheckKeyCompatibility(key_entry): + self.BuildFindSpecsFromRegistryArtifact( + key_entry, find_specs) + elif (source.type_indicator == + artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_VALUE): + # TODO: Handle Registry Values Once Supported in dfwinreg. + logging.warning(('Unable to handle Registry Value, extracting ' + 'key only: {0:s} ').format(source.key_value_pairs)) + + for key_pair in source.key_value_pairs: + keys = set() + keys.add(key_pair.get('key')) + for key_entry in keys: + if self._CheckKeyCompatibility(key_entry): + self.BuildFindSpecsFromRegistryArtifact( + key_entry, find_specs) + else: + logging.warning(('Unable to handle artifact, plaso does not ' + 'support: {0:s} ').format(source.type_indicator)) + + self._knowledge_base.SetValue(self.ARTIFACT_FILTERS, find_specs) + + def BuildFindSpecsFromFileArtifact( + self, path_entry, separator, environment_variables, find_specs): + """Build find specification from a FILE artifact type. + + Args: + path_entry (str): Current file system path to add. + environment variables. + separator (str): File system path separator. + environment_variables list(str): Environment variable attributes used to + dynamically populate environment variables in key. + find_specs dict[artifacts.artifact_types]: Dictionary containing + find_specs. + """ + for glob_path in self._ExpandRecursiveGlobs(path_entry): + for path in path_helper.PathHelper.ExpandUserHomeDirPath( + glob_path, self._knowledge_base.user_accounts): + if '%' in path: + path = path_helper.PathHelper.ExpandWindowsPath( + path, environment_variables) + + if not path.startswith('/') and not path.startswith('\\'): + logging.warning(( + 'The path filter must be defined as an absolute path: ' + '{0:s}').format(path)) + continue + + # Convert the path filters into a list of path segments and + # strip the root path segment. + path_segments = path.split(separator) + + # If the source didn't specify a separator, '/' is returned by + # default from ForensicArtifacts, this is sometimes wrong. Thus, + # need to check if '\' characters are still present and split on those. + if len(path_segments) == 1 and '\\' in path_segments[0]: + logging.warning('Potentially bad separator = {0:s} , trying \'\\\'' + .format(separator)) + path_segments = path.split('\\') + path_segments.pop(0) + + if not path_segments[-1]: + logging.warning( + 'Empty last path segment in path filter: {0:s}'.format(path)) + path_segments.pop(-1) + + try: + find_spec = file_system_searcher.FindSpec( + location_glob=path_segments, case_sensitive=False) + except ValueError as exception: + logging.error(( + 'Unable to build find spec for path: {0:s} with error: {1!s}' + ).format(path, exception)) + continue + if artifact_types.TYPE_INDICATOR_FILE not in find_specs: + find_specs[artifact_types.TYPE_INDICATOR_FILE] = [] + find_specs[artifact_types.TYPE_INDICATOR_FILE].append( + find_spec) + + def BuildFindSpecsFromRegistryArtifact( + self, key_entry, find_specs): + """Build find specification from a Windows registry artifact type. + + Args: + key_entry (str): Current file system key to add. + find_specs dict[artifacts.artifact_types]: Dictionary containing + find_specs. + """ + for key in self._ExpandRecursiveGlobs(key_entry): + if '%%' in key: + logging.error(( + 'Unable to expand path filter: {0:s}').format(key)) + continue + find_spec = registry_searcher.FindSpec(key_path_glob=key) + if artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY not in find_specs: + find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY] = [] + + find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY].append( + find_spec) + + def _CheckKeyCompatibility(self, key): + """Check if a registry key is compatible with dfwinreg. + + Args: + key (str): String key to to check for dfwinreg compatibility. + + Returns: + (bool): Boolean whether key is compatible or not. + """ + key_path_prefix = key.split('\\')[0] + if key_path_prefix.upper() in self.COMPATIBLE_DFWINREG_KEYS: + return True + logging.warning('Key {0:s}, has a prefix {1:s} that is not supported ' + 'by dfwinreg presently'.format(key, key_path_prefix)) + return False + + def _ExpandRecursiveGlobs(self, path): + """Expand recursive like globs present in an artifact path. + + If a path ends in '**', with up two optional digits such as '**10', + the '**' will match all files and zero or more directories and + subdirectories from the specified path recursively. The optional digits + provide the depth to which the recursion should continue. By default + recursion depth is 10 directories. If the pattern is followed by a ‘/’ + or '\', only directories and subdirectories match. + + Args: + path (str): String path to be expanded. + + Returns: + list[str]: String path expanded for each glob. + """ + + match = re.search(r'(.*)?(\\|/)\*\*(\d{1,2})?(\\|/)?$', path) + if match: + skip_first = False + if match.group(4): + skip_first = True + if match.group(3): + iterations = match.group(3) + else: + iterations = self.RECURSIVE_GLOB_LIMIT + logging.warning('Path {0:s} contains fully recursive glob, limiting ' + 'to ten levels'.format(path)) + paths = self._BuildRecursivePaths(match.group(1), iterations, skip_first) + return paths + else: + return [path] + + def _BuildRecursivePaths(self, path, count, skip_first): + """Append wildcard entries to end of path. + + Args: + path (str): Path to append wildcards to. + + Returns: + path list[str]: Paths expanded with wildcards. + """ + paths = [] + if '\\' in path: + replacement = '\\*' + else: + replacement = '/*' + + for iteration in range(count): + if skip_first and iteration == 0: + continue + else: + path += replacement + paths.append(path) + iteration += 1 + + return paths diff --git a/test_data/artifacts/artifacts_filters.yaml b/test_data/artifacts/artifacts_filters.yaml new file mode 100644 index 0000000000..19723f8567 --- /dev/null +++ b/test_data/artifacts/artifacts_filters.yaml @@ -0,0 +1,66 @@ +# Artifact definitions. + +name: TestFiles +doc: Test Doc +sources: +- type: FILE + attributes: + paths: ['%%environ_systemdrive%%\AUTHORS'] + separator: '\' +labels: [System] +supported_os: [Windows] +--- +name: TestFiles2 +doc: Test Doc2 +sources: +- type: FILE + attributes: + paths: + - '%%environ_systemdrive%%\test_data\*.evtx' + - '\test_data\testdir\filter_*.txt' + - '\does_not_exist\some_file_*.txt' + - '\globbed\test\path\**\' + - 'failing' + separator: '\' +labels: [System] +supported_os: [Windows] +--- +name: TestRegistry +doc: Test Registry Doc +sources: +- type: REGISTRY_KEY + attributes: + keys: ['HKEY_LOCAL_MACHINE\System\CurrentControlSet\Control\SecurityProviders\*'] +supported_os: [Windows] +--- +name: TestRegistryKey +doc: Test Registry Doc Key +sources: +- type: REGISTRY_KEY + attributes: + keys: + - 'HKEY_LOCAL_MACHINE\System\ControlSet001\services\**\' + - 'HKEY_LOCAL_MACHINE\System\ControlSet002\services\**\' + - 'HKEY_LOCAL_MACHINE\System\CurrentControlSet\Enum\USBSTOR' + - 'HKEY_LOCAL_MACHINE\System\CurrentControlSet\Enum\USBSTOR\**' +supported_os: [Windows] +--- +name: TestRegistryValue +doc: Test Registry Doc Value +sources: +- type: REGISTRY_VALUE + attributes: + key_value_pairs: + - {key: 'HKEY_LOCAL_MACHINE\System\ControlSet001\Control\Session Manager', value: 'BootExecute'} + - {key: 'HKEY_LOCAL_MACHINE\System\ControlSet002\Control\Session Manager', value: 'BootExecute'} +supported_os: [Windows] +--- +name: TestFilesImageExport +doc: Test Doc +sources: +- type: FILE + attributes: + paths: ['\a_directory\*_file'] + separator: '\' +labels: [System] +supported_os: [Windows] \ No newline at end of file diff --git a/tests/engine/artifact_filters.py b/tests/engine/artifact_filters.py new file mode 100644 index 0000000000..a0436ac80c --- /dev/null +++ b/tests/engine/artifact_filters.py @@ -0,0 +1,164 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""Tests for the artifacts file filter functions.""" + +from __future__ import unicode_literals + +import unittest + +from artifacts import definitions as artifact_types +from artifacts import reader as artifacts_reader +from artifacts import registry as artifacts_registry + +from dfwinreg import registry as dfwinreg_registry +from dfwinreg import registry_searcher as dfwinreg_registry_searcher + +from dfvfs.helpers import file_system_searcher +from dfvfs.lib import definitions as dfvfs_definitions +from dfvfs.path import factory as path_spec_factory +from dfvfs.resolver import resolver as path_spec_resolver + +from plaso.containers import artifacts +from plaso.engine import artifact_filters +from plaso.engine import knowledge_base as knowledge_base_engine +from plaso.parsers import winreg as windows_registry_parser + +from tests import test_lib as shared_test_lib + + +class BuildFindSpecsFromFileTest(shared_test_lib.BaseTestCase): + """Tests for the BuildFindSpecsFromFile function.""" + + @shared_test_lib.skipUnlessHasTestFile(['artifacts']) + @shared_test_lib.skipUnlessHasTestFile(['System.evtx']) + @shared_test_lib.skipUnlessHasTestFile(['testdir', 'filter_1.txt']) + @shared_test_lib.skipUnlessHasTestFile(['testdir', 'filter_3.txt']) + def testBuildFileFindSpecs(self): + """Tests the BuildFindSpecs function for file type artifacts.""" + knowledge_base = knowledge_base_engine.KnowledgeBase() + + artifact_definitions = ['TestFiles', 'TestFiles2'] + registry = artifacts_registry.ArtifactDefinitionsRegistry() + reader = artifacts_reader.YamlArtifactsReader() + + registry.ReadFromDirectory(reader, self._GetTestFilePath(['artifacts'])) + + test_filter_file = artifact_filters.ArtifactDefinitionsFilterHelper( + registry, artifact_definitions, knowledge_base) + + environment_variable = artifacts.EnvironmentVariableArtifact( + case_sensitive=False, name='SystemDrive', value='C:') + + test_filter_file.BuildFindSpecs( + environment_variables=[environment_variable]) + find_specs = knowledge_base.GetValue( + artifact_filters.ArtifactDefinitionsFilterHelper.ARTIFACT_FILTERS) + + self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 13) + + path_spec = path_spec_factory.Factory.NewPathSpec( + dfvfs_definitions.TYPE_INDICATOR_OS, location='.') + file_system = path_spec_resolver.Resolver.OpenFileSystem(path_spec) + searcher = file_system_searcher.FileSystemSearcher( + file_system, path_spec) + + path_spec_generator = searcher.Find( + find_specs=find_specs[artifact_types.TYPE_INDICATOR_FILE]) + self.assertIsNotNone(path_spec_generator) + + path_specs = list(path_spec_generator) + # Two evtx, one symbolic link to evtx, one AUTHORS, two filter_*.txt files, + # total 6 path specifications. + self.assertEqual(len(path_specs), 6) + + file_system.Close() + + @shared_test_lib.skipUnlessHasTestFile(['artifacts']) + @shared_test_lib.skipUnlessHasTestFile(['SYSTEM']) + def testBuildRegistryFindSpecs(self): + """Tests the BuildFindSpecs function for registry artifacts.""" + knowledge_base = knowledge_base_engine.KnowledgeBase() + + artifact_definitions = ['TestRegistry'] + registry = artifacts_registry.ArtifactDefinitionsRegistry() + reader = artifacts_reader.YamlArtifactsReader() + + registry.ReadFromDirectory(reader, self._GetTestFilePath(['artifacts'])) + + test_filter_file = artifact_filters.ArtifactDefinitionsFilterHelper( + registry, artifact_definitions, knowledge_base) + + test_filter_file.BuildFindSpecs(environment_variables=None) + find_specs = knowledge_base.GetValue( + artifact_filters.ArtifactDefinitionsFilterHelper.ARTIFACT_FILTERS) + + self.assertEqual( + len(find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY]), 1) + + win_registry_reader = ( + windows_registry_parser.FileObjectWinRegistryFileReader()) + + file_entry = self._GetTestFileEntry(['SYSTEM']) + file_object = file_entry.GetFileObject() + + registry_file = win_registry_reader.Open(file_object) + + win_registry = dfwinreg_registry.WinRegistry() + key_path_prefix = win_registry.GetRegistryFileMapping(registry_file) + registry_file.SetKeyPathPrefix(key_path_prefix) + win_registry.MapFile(key_path_prefix, registry_file) + + searcher = dfwinreg_registry_searcher.WinRegistrySearcher(win_registry) + key_paths = list(searcher.Find(find_specs=find_specs[ + artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY])) + + self.assertIsNotNone(key_paths) + + # Three key paths found + self.assertEqual(len(key_paths), 3) + + + def testBuildRegistryFindSpecs(self): + """Tests the BuildFindSpecs function for registry artifacts.""" + knowledge_base = knowledge_base_engine.KnowledgeBase() + + artifact_definitions = ['TestRegistry'] + registry = artifacts_registry.ArtifactDefinitionsRegistry() + reader = artifacts_reader.YamlArtifactsReader() + + registry.ReadFromDirectory(reader, self._GetTestFilePath(['artifacts'])) + + test_filter_file = artifact_filters.ArtifactDefinitionsFilterHelper( + registry, artifact_definitions, knowledge_base) + + test_filter_file.BuildFindSpecs(environment_variables=None) + find_specs = knowledge_base.GetValue( + artifact_filters.ArtifactDefinitionsFilterHelper.ARTIFACT_FILTERS) + + self.assertEqual( + len(find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY]), 1) + + win_registry_reader = ( + windows_registry_parser.FileObjectWinRegistryFileReader()) + + file_entry = self._GetTestFileEntry(['SYSTEM']) + file_object = file_entry.GetFileObject() + + registry_file = win_registry_reader.Open(file_object) + + win_registry = dfwinreg_registry.WinRegistry() + key_path_prefix = win_registry.GetRegistryFileMapping(registry_file) + registry_file.SetKeyPathPrefix(key_path_prefix) + win_registry.MapFile(key_path_prefix, registry_file) + + searcher = dfwinreg_registry_searcher.WinRegistrySearcher(win_registry) + key_paths = list(searcher.Find(find_specs=find_specs[ + artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY])) + + self.assertIsNotNone(key_paths) + + # Three key paths found + self.assertEqual(len(key_paths), 3) + +if __name__ == '__main__': + unittest.main() From 9f75e53f72cc35e3f85e794b665db1ab7c2552c2 Mon Sep 17 00:00:00 2001 From: James Nettesheim Date: Thu, 10 May 2018 16:07:55 -0700 Subject: [PATCH 05/17] Minor code changes and adding path_helper changes --- plaso/engine/artifact_filters.py | 2 +- plaso/engine/path_helper.py | 39 ++++++++++++++++++++++++++++- tests/engine/artifact_filters.py | 43 -------------------------------- 3 files changed, 39 insertions(+), 45 deletions(-) diff --git a/plaso/engine/artifact_filters.py b/plaso/engine/artifact_filters.py index b7422ad082..2b7f953b16 100644 --- a/plaso/engine/artifact_filters.py +++ b/plaso/engine/artifact_filters.py @@ -65,7 +65,7 @@ def BuildFindSpecs(self, environment_variables=None): if (len(self._artifacts) == 1 and self._artifacts[0] in self.STANDARD_OS_FILTERS): for definition in self._artifacts_registry.GetDefinitions(): - if self._filters[0].lower() in ( + if self._artifacts[0].lower() in ( os.lower() for os in definition.supported_os): artifact_defintions.append(definition) else: diff --git a/plaso/engine/path_helper.py b/plaso/engine/path_helper.py index b3867cd5b8..7aa85aca63 100644 --- a/plaso/engine/path_helper.py +++ b/plaso/engine/path_helper.py @@ -3,6 +3,8 @@ from __future__ import unicode_literals +import re + from dfvfs.lib import definitions as dfvfs_definitions from plaso.lib import py2to3 @@ -23,6 +25,8 @@ def ExpandWindowsPath(cls, path, environment_variables): Returns: str: expanded Windows path. """ + #TODO: Add support for items such as %%users.localappdata%% + if environment_variables is None: environment_variables = [] @@ -42,9 +46,19 @@ def ExpandWindowsPath(cls, path, environment_variables): not path_segment.endswith('%')): continue - lookup_key = path_segment.upper()[1:-1] + check_for_drive_letter = False + if path_segment.upper().startswith('%%ENVIRON_'): + lookup_key = path_segment.upper()[10:-2] + check_for_drive_letter = True + else: + lookup_key = path_segment.upper()[1:-1] path_segments[index] = lookup_table.get(lookup_key, path_segment) + if check_for_drive_letter: + # Remove the drive letter. + if len(path_segments[index]) >= 2 and path_segments[index][1] == ':': + _, _, path_segments[index] = path_segments[index].rpartition(':') + return '\\'.join(path_segments) @classmethod @@ -133,3 +147,26 @@ def GetRelativePathForPathSpec(cls, path_spec, mount_path=None): location = location[len(mount_path):] return location + + @classmethod + def ExpandUserHomeDirPath(cls, path, user_accounts): + """Expands a path to contain all users home directories. + + Args: + path (str): Windows path with environment variables. + user_accounts (list[UserAccountArtifact]): user accounts. + + Returns: + list [str]: paths returned for user accounts. + """ + + user_paths = [] + if path.upper().startswith('%%USERS.HOMEDIR%%'): + regex = re.compile(re.escape('%%users.homedir%%')) + for user_account in user_accounts: + new_path = regex.sub(user_account.user_directory, path) + user_paths.append(new_path) + else: + user_paths = [path] + + return user_paths diff --git a/tests/engine/artifact_filters.py b/tests/engine/artifact_filters.py index a0436ac80c..42b9611b4a 100644 --- a/tests/engine/artifact_filters.py +++ b/tests/engine/artifact_filters.py @@ -117,48 +117,5 @@ def testBuildRegistryFindSpecs(self): # Three key paths found self.assertEqual(len(key_paths), 3) - - def testBuildRegistryFindSpecs(self): - """Tests the BuildFindSpecs function for registry artifacts.""" - knowledge_base = knowledge_base_engine.KnowledgeBase() - - artifact_definitions = ['TestRegistry'] - registry = artifacts_registry.ArtifactDefinitionsRegistry() - reader = artifacts_reader.YamlArtifactsReader() - - registry.ReadFromDirectory(reader, self._GetTestFilePath(['artifacts'])) - - test_filter_file = artifact_filters.ArtifactDefinitionsFilterHelper( - registry, artifact_definitions, knowledge_base) - - test_filter_file.BuildFindSpecs(environment_variables=None) - find_specs = knowledge_base.GetValue( - artifact_filters.ArtifactDefinitionsFilterHelper.ARTIFACT_FILTERS) - - self.assertEqual( - len(find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY]), 1) - - win_registry_reader = ( - windows_registry_parser.FileObjectWinRegistryFileReader()) - - file_entry = self._GetTestFileEntry(['SYSTEM']) - file_object = file_entry.GetFileObject() - - registry_file = win_registry_reader.Open(file_object) - - win_registry = dfwinreg_registry.WinRegistry() - key_path_prefix = win_registry.GetRegistryFileMapping(registry_file) - registry_file.SetKeyPathPrefix(key_path_prefix) - win_registry.MapFile(key_path_prefix, registry_file) - - searcher = dfwinreg_registry_searcher.WinRegistrySearcher(win_registry) - key_paths = list(searcher.Find(find_specs=find_specs[ - artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY])) - - self.assertIsNotNone(key_paths) - - # Three key paths found - self.assertEqual(len(key_paths), 3) - if __name__ == '__main__': unittest.main() From 15ebd7832ac5786b0372cf8c96fbd3bc65f7bc27 Mon Sep 17 00:00:00 2001 From: James Nettesheim Date: Mon, 14 May 2018 16:33:59 -0700 Subject: [PATCH 06/17] Docstring updates --- plaso/engine/artifact_filters.py | 18 +++++++++--------- plaso/engine/path_helper.py | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/plaso/engine/artifact_filters.py b/plaso/engine/artifact_filters.py index 2b7f953b16..c011aec145 100644 --- a/plaso/engine/artifact_filters.py +++ b/plaso/engine/artifact_filters.py @@ -108,7 +108,7 @@ def BuildFindSpecs(self, environment_variables=None): def BuildFindSpecsFromFileArtifact( self, path_entry, separator, environment_variables, find_specs): - """Build find specification from a FILE artifact type. + """Build find specifications from a FILE artifact type. Args: path_entry (str): Current file system path to add. @@ -165,7 +165,7 @@ def BuildFindSpecsFromFileArtifact( def BuildFindSpecsFromRegistryArtifact( self, key_entry, find_specs): - """Build find specification from a Windows registry artifact type. + """Build find specifications from a Windows registry artifact type. Args: key_entry (str): Current file system key to add. @@ -203,12 +203,12 @@ def _CheckKeyCompatibility(self, key): def _ExpandRecursiveGlobs(self, path): """Expand recursive like globs present in an artifact path. - If a path ends in '**', with up two optional digits such as '**10', - the '**' will match all files and zero or more directories and - subdirectories from the specified path recursively. The optional digits - provide the depth to which the recursion should continue. By default - recursion depth is 10 directories. If the pattern is followed by a ‘/’ - or '\', only directories and subdirectories match. + If a path ends in '**', with up to two optional digits such as '**10', + the '**' will recursively match all files and zero or more directories + from the specified path. The optional digits indicate the recursion depth. + By default recursion depth is 10 directories. + If the glob is followed by a ‘/’ or '\', only directories and + subdirectories will be matched. Args: path (str): String path to be expanded. @@ -240,7 +240,7 @@ def _BuildRecursivePaths(self, path, count, skip_first): path (str): Path to append wildcards to. Returns: - path list[str]: Paths expanded with wildcards. + paths list[str]: Paths expanded with wildcards. """ paths = [] if '\\' in path: diff --git a/plaso/engine/path_helper.py b/plaso/engine/path_helper.py index 7aa85aca63..e67b4a3ed0 100644 --- a/plaso/engine/path_helper.py +++ b/plaso/engine/path_helper.py @@ -150,7 +150,7 @@ def GetRelativePathForPathSpec(cls, path_spec, mount_path=None): @classmethod def ExpandUserHomeDirPath(cls, path, user_accounts): - """Expands a path to contain all users home directories. + """Expands a Windows path to contain all user home directories. Args: path (str): Windows path with environment variables. From fc73658d30a54953ba436ab1e010956e4cda5642 Mon Sep 17 00:00:00 2001 From: James Nettesheim Date: Sat, 26 May 2018 23:53:22 -0700 Subject: [PATCH 07/17] Code Review Changes --- plaso/engine/artifact_filters.py | 166 ++++++--------------- plaso/engine/path_helper.py | 77 +++++++++- test_data/artifacts/artifacts_filters.yaml | 1 + tests/engine/artifact_filters.py | 137 ++++++++++++++++- tests/engine/path_helper.py | 81 ++++++++++ 5 files changed, 333 insertions(+), 129 deletions(-) diff --git a/plaso/engine/artifact_filters.py b/plaso/engine/artifact_filters.py index c011aec145..09d48da704 100644 --- a/plaso/engine/artifact_filters.py +++ b/plaso/engine/artifact_filters.py @@ -10,30 +10,19 @@ from dfvfs.helpers import file_system_searcher from dfwinreg import registry_searcher - from plaso.engine import path_helper class ArtifactDefinitionsFilterHelper(object): - """Helper to create filters based on artifact defintions. + """Helper to create filters based on artifact definitions. - Builds extraction and parsing filters from forensic artifact definitions. + Builds extraction filters from forensic artifact definitions. For more information about Forensic Artifacts see: https://github.com/ForensicArtifacts/artifacts/blob/master/docs/Artifacts%20definition%20format%20and%20style%20guide.asciidoc """ ARTIFACT_FILTERS = 'ARTIFACT_FILTERS' - COMPATIBLE_DFWINREG_KEYS = frozenset([ - 'HKEY_LOCAL_MACHINE', - 'HKEY_LOCAL_MACHINE\\SYSTEM', - 'HKEY_LOCAL_MACHINE\\SOFTWARE', - 'HKEY_LOCAL_MACHINE\\SAM', - 'HKEY_LOCAL_MACHINE\\SECURITY']) - STANDARD_OS_FILTERS = frozenset([ - 'windows', - 'linux', - 'darwin']) - RECURSIVE_GLOB_LIMIT = 10 + _COMPATIBLE_DFWINREG_KEYS = ('HKEY_LOCAL_MACHINE') def __init__(self, artifacts_registry, artifacts, knowledge_base): """Initializes an artifact definitions filter helper. @@ -51,7 +40,7 @@ def __init__(self, artifacts_registry, artifacts, knowledge_base): self._knowledge_base = knowledge_base def BuildFindSpecs(self, environment_variables=None): - """Build find specification from a forensic artifact definitions. + """Builds find specification from a forensic artifact definitions. Args: environment_variables (Optional[list[EnvironmentVariableArtifact]]): @@ -59,27 +48,19 @@ def BuildFindSpecs(self, environment_variables=None): """ find_specs = {} - artifact_defintions = [] - # Check if single operating system type has been provided, in that case - # process all artifact types for that OS. - if (len(self._artifacts) == 1 and - self._artifacts[0] in self.STANDARD_OS_FILTERS): - for definition in self._artifacts_registry.GetDefinitions(): - if self._artifacts[0].lower() in ( - os.lower() for os in definition.supported_os): - artifact_defintions.append(definition) - else: - for artifact_filter in self._artifacts: - if self._artifacts_registry.GetDefinitionByName(artifact_filter): - artifact_defintions.append( - self._artifacts_registry.GetDefinitionByName(artifact_filter)) + artifact_definitions = [] + for artifact_filter in self._artifacts: + if self._artifacts_registry.GetDefinitionByName(artifact_filter): + artifact_definitions.append( + self._artifacts_registry.GetDefinitionByName(artifact_filter)) - for definition in artifact_defintions: + for definition in artifact_definitions: for source in definition.sources: if source.type_indicator == artifact_types.TYPE_INDICATOR_FILE: for path_entry in source.paths: self.BuildFindSpecsFromFileArtifact( - path_entry, source.separator, environment_variables, find_specs) + path_entry, source.separator, environment_variables, + self._knowledge_base.user_accounts, find_specs) elif (source.type_indicator == artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY): keys = set(source.keys) @@ -90,24 +71,26 @@ def BuildFindSpecs(self, environment_variables=None): elif (source.type_indicator == artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_VALUE): # TODO: Handle Registry Values Once Supported in dfwinreg. + # https://github.com/log2timeline/dfwinreg/issues/98 logging.warning(('Unable to handle Registry Value, extracting ' - 'key only: {0:s} ').format(source.key_value_pairs)) + 'key only: "{0:s}"').format(source.key_value_pairs)) for key_pair in source.key_value_pairs: keys = set() keys.add(key_pair.get('key')) for key_entry in keys: if self._CheckKeyCompatibility(key_entry): - self.BuildFindSpecsFromRegistryArtifact( - key_entry, find_specs) + self.BuildFindSpecsFromRegistryArtifact(key_entry, find_specs) else: logging.warning(('Unable to handle artifact, plaso does not ' - 'support: {0:s} ').format(source.type_indicator)) + 'support: "{0:s}"').format(source.type_indicator)) self._knowledge_base.SetValue(self.ARTIFACT_FILTERS, find_specs) + @classmethod def BuildFindSpecsFromFileArtifact( - self, path_entry, separator, environment_variables, find_specs): + self, path_entry, separator, environment_variables, user_accounts, + find_specs): """Build find specifications from a FILE artifact type. Args: @@ -116,38 +99,35 @@ def BuildFindSpecsFromFileArtifact( separator (str): File system path separator. environment_variables list(str): Environment variable attributes used to dynamically populate environment variables in key. + user_accounts list(str): Identified user accounts stored in the + knowledge base. find_specs dict[artifacts.artifact_types]: Dictionary containing find_specs. """ - for glob_path in self._ExpandRecursiveGlobs(path_entry): - for path in path_helper.PathHelper.ExpandUserHomeDirPath( - glob_path, self._knowledge_base.user_accounts): + for glob_path in path_helper.PathHelper.ExpandRecursiveGlobs( + path_entry, separator): + for path in path_helper.PathHelper.ExpandUserHomeDirectoryPath( + glob_path, user_accounts): if '%' in path: path = path_helper.PathHelper.ExpandWindowsPath( path, environment_variables) - if not path.startswith('/') and not path.startswith('\\'): + if not path.startswith(separator): logging.warning(( 'The path filter must be defined as an absolute path: ' - '{0:s}').format(path)) + '"{0:s}"').format(path)) continue # Convert the path filters into a list of path segments and # strip the root path segment. path_segments = path.split(separator) - # If the source didn't specify a separator, '/' is returned by - # default from ForensicArtifacts, this is sometimes wrong. Thus, - # need to check if '\' characters are still present and split on those. - if len(path_segments) == 1 and '\\' in path_segments[0]: - logging.warning('Potentially bad separator = {0:s} , trying \'\\\'' - .format(separator)) - path_segments = path.split('\\') + # Remove initial root entry path_segments.pop(0) if not path_segments[-1]: logging.warning( - 'Empty last path segment in path filter: {0:s}'.format(path)) + 'Empty last path segment in path filter: "{0:s}"'.format(path)) path_segments.pop(-1) try: @@ -155,27 +135,28 @@ def BuildFindSpecsFromFileArtifact( location_glob=path_segments, case_sensitive=False) except ValueError as exception: logging.error(( - 'Unable to build find spec for path: {0:s} with error: {1!s}' + 'Unable to build find spec for path: "{0:s}" with error: "{1!s}"' ).format(path, exception)) continue if artifact_types.TYPE_INDICATOR_FILE not in find_specs: find_specs[artifact_types.TYPE_INDICATOR_FILE] = [] - find_specs[artifact_types.TYPE_INDICATOR_FILE].append( - find_spec) + find_specs[artifact_types.TYPE_INDICATOR_FILE].append(find_spec) + @classmethod def BuildFindSpecsFromRegistryArtifact( self, key_entry, find_specs): """Build find specifications from a Windows registry artifact type. Args: - key_entry (str): Current file system key to add. + key_entry (str): Current file system key to add. find_specs dict[artifacts.artifact_types]: Dictionary containing find_specs. """ - for key in self._ExpandRecursiveGlobs(key_entry): + separator = '\\' + for key in path_helper.PathHelper.ExpandRecursiveGlobs( + key_entry, separator): if '%%' in key: - logging.error(( - 'Unable to expand path filter: {0:s}').format(key)) + logging.error(('Unable to expand path filter: "{0:s}"').format(key)) continue find_spec = registry_searcher.FindSpec(key_path_glob=key) if artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY not in find_specs: @@ -184,76 +165,19 @@ def BuildFindSpecsFromRegistryArtifact( find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY].append( find_spec) - def _CheckKeyCompatibility(self, key): - """Check if a registry key is compatible with dfwinreg. + @staticmethod + def _CheckKeyCompatibility(key): + """Checks if a Windows Registry key is compatible with dfwinreg. Args: key (str): String key to to check for dfwinreg compatibility. Returns: - (bool): Boolean whether key is compatible or not. + (bool): True if key is compatible or False if not. """ - key_path_prefix = key.split('\\')[0] - if key_path_prefix.upper() in self.COMPATIBLE_DFWINREG_KEYS: + if key.startswith( + ArtifactDefinitionsFilterHelper._COMPATIBLE_DFWINREG_KEYS): return True - logging.warning('Key {0:s}, has a prefix {1:s} that is not supported ' - 'by dfwinreg presently'.format(key, key_path_prefix)) + logging.warning('Key "{0:s}", has a prefix that is not supported ' + 'by dfwinreg presently'.format(key)) return False - - def _ExpandRecursiveGlobs(self, path): - """Expand recursive like globs present in an artifact path. - - If a path ends in '**', with up to two optional digits such as '**10', - the '**' will recursively match all files and zero or more directories - from the specified path. The optional digits indicate the recursion depth. - By default recursion depth is 10 directories. - If the glob is followed by a ‘/’ or '\', only directories and - subdirectories will be matched. - - Args: - path (str): String path to be expanded. - - Returns: - list[str]: String path expanded for each glob. - """ - - match = re.search(r'(.*)?(\\|/)\*\*(\d{1,2})?(\\|/)?$', path) - if match: - skip_first = False - if match.group(4): - skip_first = True - if match.group(3): - iterations = match.group(3) - else: - iterations = self.RECURSIVE_GLOB_LIMIT - logging.warning('Path {0:s} contains fully recursive glob, limiting ' - 'to ten levels'.format(path)) - paths = self._BuildRecursivePaths(match.group(1), iterations, skip_first) - return paths - else: - return [path] - - def _BuildRecursivePaths(self, path, count, skip_first): - """Append wildcard entries to end of path. - - Args: - path (str): Path to append wildcards to. - - Returns: - paths list[str]: Paths expanded with wildcards. - """ - paths = [] - if '\\' in path: - replacement = '\\*' - else: - replacement = '/*' - - for iteration in range(count): - if skip_first and iteration == 0: - continue - else: - path += replacement - paths.append(path) - iteration += 1 - - return paths diff --git a/plaso/engine/path_helper.py b/plaso/engine/path_helper.py index e67b4a3ed0..2eda528221 100644 --- a/plaso/engine/path_helper.py +++ b/plaso/engine/path_helper.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals +import logging import re from dfvfs.lib import definitions as dfvfs_definitions @@ -13,6 +14,8 @@ class PathHelper(object): """Class that implements the path helper.""" + RECURSIVE_GLOB_LIMIT = 10 + @classmethod def ExpandWindowsPath(cls, path, environment_variables): """Expands a Windows path containing environment variables. @@ -149,8 +152,8 @@ def GetRelativePathForPathSpec(cls, path_spec, mount_path=None): return location @classmethod - def ExpandUserHomeDirPath(cls, path, user_accounts): - """Expands a Windows path to contain all user home directories. + def ExpandUserHomeDirectoryPath(cls, path, user_accounts): + """Expands a path to contain all user home directories. Args: path (str): Windows path with environment variables. @@ -165,8 +168,78 @@ def ExpandUserHomeDirPath(cls, path, user_accounts): regex = re.compile(re.escape('%%users.homedir%%')) for user_account in user_accounts: new_path = regex.sub(user_account.user_directory, path) + # Remove the drive letter, if it exists. + if len(new_path) > 2 and new_path[1] == ':': + _, _, new_path = new_path.rpartition(':') user_paths.append(new_path) else: user_paths = [path] return user_paths + + @classmethod + def ExpandRecursiveGlobs(cls, path, separator): + """Expands recursive like globs present in an artifact path. + + If a path ends in '**', with up to two optional digits such as '**10', + the '**' will recursively match all files and zero or more directories + from the specified path. The optional digits indicate the recursion depth. + By default recursion depth is 10 directories. + If the glob is followed by the specified separator, only directories and + subdirectories will be matched. + + Args: + path (str): Path to be expanded. + separator (str): Delimiter for this path from its artifact definition. + + Returns: + list[str]: String path expanded for each glob. + """ + glob_regex = r'(.*)?{0}\*\*(\d{{1,2}})?({0})?$'.format(re.escape(separator)) + match = re.search(glob_regex, path) + if match: + skip_first = False + if match.group(3): + skip_first = True + if match.group(2): + iterations = int(match.group(2)) + else: + iterations = cls.RECURSIVE_GLOB_LIMIT + logging.warning('Path "{0:s}" contains fully recursive glob, limiting ' + 'to 10 levels'.format(path)) + paths = cls.AppendPathEntries( + match.group(1), separator, iterations, skip_first) + return paths + else: + return [path] + + @classmethod + def AppendPathEntries(cls, path, separator, count, skip_first): + """Appends wildcard entries to end of path. + + Will append wildcard * to given path building a list of strings for "count" + iterations, skipping the first directory if skip_first is true. + + Args: + path (str): Path to append wildcards to. + separator (str): Delimiter for this path from its artifact definition. + count (int): Number of entries to be appended. + skip_first (bool): Whether or not to skip first entry to append. + + + Returns: + list[str]: Paths that were expanded from the path with wildcards. + """ + paths = [] + replacement = '{0}*'.format(separator) + + iteration = 0 + while iteration < count: + if skip_first and iteration == 0: + path += replacement + else: + path += replacement + paths.append(path) + iteration += 1 + + return paths \ No newline at end of file diff --git a/test_data/artifacts/artifacts_filters.yaml b/test_data/artifacts/artifacts_filters.yaml index 19723f8567..7de1896d7f 100644 --- a/test_data/artifacts/artifacts_filters.yaml +++ b/test_data/artifacts/artifacts_filters.yaml @@ -17,6 +17,7 @@ sources: attributes: paths: - '%%environ_systemdrive%%\test_data\*.evtx' + - '%%users.homedir%%\Documents\WindowsPowerShell\profile.ps1' - '\test_data\testdir\filter_*.txt' - '\does_not_exist\some_file_*.txt' - '\globbed\test\path\**\' diff --git a/tests/engine/artifact_filters.py b/tests/engine/artifact_filters.py index 42b9611b4a..1b3e5f9cf2 100644 --- a/tests/engine/artifact_filters.py +++ b/tests/engine/artifact_filters.py @@ -33,9 +33,19 @@ class BuildFindSpecsFromFileTest(shared_test_lib.BaseTestCase): @shared_test_lib.skipUnlessHasTestFile(['System.evtx']) @shared_test_lib.skipUnlessHasTestFile(['testdir', 'filter_1.txt']) @shared_test_lib.skipUnlessHasTestFile(['testdir', 'filter_3.txt']) - def testBuildFileFindSpecs(self): + def testBuildFindSpecsWithFileSystem(self): """Tests the BuildFindSpecs function for file type artifacts.""" knowledge_base = knowledge_base_engine.KnowledgeBase() + testuser1 = artifacts.UserAccountArtifact( + identifier='1000', + user_directory='C:\\\\Users\\\\testuser1', + username='testuser1') + testuser2 = artifacts.UserAccountArtifact( + identifier='1001', + user_directory='C:\\\\Users\\\\testuser2', + username='testuser2') + knowledge_base.AddUserAccount(testuser1) + knowledge_base.AddUserAccount(testuser2) artifact_definitions = ['TestFiles', 'TestFiles2'] registry = artifacts_registry.ArtifactDefinitionsRegistry() @@ -52,9 +62,17 @@ def testBuildFileFindSpecs(self): test_filter_file.BuildFindSpecs( environment_variables=[environment_variable]) find_specs = knowledge_base.GetValue( - artifact_filters.ArtifactDefinitionsFilterHelper.ARTIFACT_FILTERS) + artifact_filters.ArtifactDefinitionsFilterHelper.ARTIFACT_FILTERS) + + # Should build 15 FindSpec entries. + self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 15) - self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 13) + # Last entry in find_specs list should be testuser2. + path_segments = ['Users', 'testuser2', 'Documents', 'WindowsPowerShell', + 'profile\\.ps1'] + self.assertEqual( + find_specs[artifact_types.TYPE_INDICATOR_FILE][2]._location_segments, + path_segments) path_spec = path_spec_factory.Factory.NewPathSpec( dfvfs_definitions.TYPE_INDICATOR_OS, location='.') @@ -67,6 +85,7 @@ def testBuildFileFindSpecs(self): self.assertIsNotNone(path_spec_generator) path_specs = list(path_spec_generator) + # Two evtx, one symbolic link to evtx, one AUTHORS, two filter_*.txt files, # total 6 path specifications. self.assertEqual(len(path_specs), 6) @@ -75,8 +94,8 @@ def testBuildFileFindSpecs(self): @shared_test_lib.skipUnlessHasTestFile(['artifacts']) @shared_test_lib.skipUnlessHasTestFile(['SYSTEM']) - def testBuildRegistryFindSpecs(self): - """Tests the BuildFindSpecs function for registry artifacts.""" + def testBuildFindSpecsWithRegistry(self): + """Tests the BuildFindSpecs function on Windows Registry artifacts.""" knowledge_base = knowledge_base_engine.KnowledgeBase() artifact_definitions = ['TestRegistry'] @@ -114,8 +133,114 @@ def testBuildRegistryFindSpecs(self): self.assertIsNotNone(key_paths) - # Three key paths found + # Three key paths found. self.assertEqual(len(key_paths), 3) + def test_CheckKeyCompatibility(self): + """Tests the _CheckKeyCompatibility function""" + + # Compatible Key. + key = 'HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control' + compatible_key = (artifact_filters.ArtifactDefinitionsFilterHelper. + _CheckKeyCompatibility(key)) + self.assertTrue(compatible_key) + + # NOT a Compatible Key. + key = 'HKEY_USERS\S-1-5-18' + compatible_key = (artifact_filters.ArtifactDefinitionsFilterHelper. + _CheckKeyCompatibility(key)) + self.assertFalse(compatible_key) + + + def testBuildFindSpecsFromFileArtifact(self): + """Tests the BuildFindSpecsFromFileArtifact function for file artifacts.""" + find_specs = {} + separator = '\\' + user_accounts = [] + + # Test expansion of environment variables. + path_entry = '%%environ_systemroot%%\\test_data\\*.evtx' + environment_variable = [artifacts.EnvironmentVariableArtifact( + case_sensitive=False, name='SystemRoot', value='C:\\Windows')] + + (artifact_filters.ArtifactDefinitionsFilterHelper. + BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts, find_specs)) + + # Should build 1 find_spec. + self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 1) + + # Location segments should be equivalent to \Windows\test_data\*.evtx. + path_segments = ['Windows', 'test\\_data', '.*\\.evtx'] + self.assertEqual( + find_specs[artifact_types.TYPE_INDICATOR_FILE][0]._location_segments, + path_segments) + + + # Test expansion of globs. + find_specs = {} + path_entry = '\\test_data\\**' + (artifact_filters.ArtifactDefinitionsFilterHelper. + BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts, find_specs)) + + # Glob expansion should by default recurse ten levels. + self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 10) + + # Last entry in find_specs list should be 10 levels of depth. + path_segments = ['test\\_data', '.*', '.*', '.*', '.*', + '.*', '.*', '.*', '.*', '.*', '.*'] + self.assertEqual( + find_specs[artifact_types.TYPE_INDICATOR_FILE][9]._location_segments, + path_segments) + + # Test expansion of user home directories + find_specs = {} + separator = '/' + testuser1 = artifacts.UserAccountArtifact( + user_directory='/homes/testuser1', username='testuser1') + testuser2 = artifacts.UserAccountArtifact( + user_directory='/home/testuser2', username='testuser2') + user_accounts = [testuser1, testuser2] + path_entry = '%%users.homedir%%/.thumbnails/**3' + + (artifact_filters.ArtifactDefinitionsFilterHelper. + BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts, find_specs)) + + # Six total find specs should be created for testuser1 and testuser2. + self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 6) + + # Last entry in find_specs list should be testuser2 with a depth of 3 + path_segments = ['home', 'testuser2', '\.thumbnails', '.*', '.*', '.*'] + self.assertEqual( + find_specs[artifact_types.TYPE_INDICATOR_FILE][5]._location_segments, + path_segments) + + + # Test Windows path with profile directories and globs with a depth of 4. + find_specs = {} + separator = '\\' + testuser1 = artifacts.UserAccountArtifact( + user_directory='\\Users\\\\testuser1', username='testuser1') + testuser2 = artifacts.UserAccountArtifact( + user_directory='\\Users\\\\testuser2', username='testuser2') + user_accounts = [testuser1, testuser2] + path_entry = '%%users.homedir%%\\AppData\\**4' + + (artifact_filters.ArtifactDefinitionsFilterHelper. + BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts, find_specs)) + + # Eight find specs should be created for testuser1 and testuser2. + self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 8) + + # Last entry in find_specs list should be testuser2, with a depth of 4. + path_segments = ['Users', 'testuser2', 'AppData', '.*', '.*', '.*', '.*'] + self.assertEqual( + find_specs[artifact_types.TYPE_INDICATOR_FILE][7]._location_segments, + path_segments) + + if __name__ == '__main__': unittest.main() diff --git a/tests/engine/path_helper.py b/tests/engine/path_helper.py index 13717d9a73..6272d97b7f 100644 --- a/tests/engine/path_helper.py +++ b/tests/engine/path_helper.py @@ -4,6 +4,7 @@ from __future__ import unicode_literals +import logging import os import unittest @@ -40,6 +41,10 @@ def testExpandWindowsPath(self): '%Bogus%\\System32', [environment_variable]) self.assertEqual(expanded_path, '%Bogus%\\System32') + expanded_path = path_helper.PathHelper.ExpandWindowsPath( + '%%environ_systemroot%%\\System32', [environment_variable]) + self.assertEqual(expanded_path, '\\Windows\\System32') + # Test non-string environment variable. environment_variable = artifacts.EnvironmentVariableArtifact( case_sensitive=False, name='SystemRoot', value=('bogus', 0)) @@ -142,6 +147,82 @@ def testGetRelativePathForPathSpec(self): qcow_path_spec) self.assertIsNone(display_name) + def testAppendPathEntries(self): + """Tests the AppendPathEntries function.""" + separator = '\\' + path = '\\Windows\\Test' + + # Test depth of ten skipping first entry. + # The path will have 9 entries as the default depth for ** is 10, but the + # first entry is being skipped. + count = 10 + skip_first = True + paths = path_helper.PathHelper.AppendPathEntries( + path, separator, count, skip_first) + check_paths = [ + '\\Windows\\Test\\*\\*', + '\\Windows\\Test\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*\\*'] + + # Nine paths returned + self.assertEqual(len(paths), 9) + self.assertItemsEqual(paths, check_paths) + + # Now test with skip_first set to False, but only a depth of 4. + # the path will have a total of 4 entries. + count = 4 + skip_first = False + check_paths = [ + '\\Windows\\Test\\*', + '\\Windows\\Test\\*\\*', + '\\Windows\\Test\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*'] + + paths = path_helper.PathHelper.AppendPathEntries( + path, separator, count, skip_first) + self.assertItemsEqual(paths, check_paths) + + def testExpandRecursiveGlobs(self): + """Tests the _ExpandRecursiveGlobs function.""" + separator = '/' + + # Test a path with a trailing /, which means first directory is skipped. + # The path will have 9 entries as the default depth for ** is 10, but the + # first entry is being skipped. + path = '/etc/sysconfig/**/' + paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator) + check_paths = [ + '/etc/sysconfig/*/*', + '/etc/sysconfig/*/*/*', + '/etc/sysconfig/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*/*/*'] + # Nine paths returned + self.assertEqual(len(paths), 9) + self.assertItemsEqual(paths, check_paths) + + # Now test with no trailing separator, but only a depth of 4. + # the path will have a total of 4 entries. + path = '/etc/sysconfig/**4' + check_paths = [ + '/etc/sysconfig/*', + '/etc/sysconfig/*/*', + '/etc/sysconfig/*/*/*', + '/etc/sysconfig/*/*/*/*'] + + paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator) + self.assertItemsEqual(paths, check_paths) + if __name__ == '__main__': unittest.main() From 4652ba9b7540b55c12e3b26535b39411e0281b39 Mon Sep 17 00:00:00 2001 From: James Nettesheim Date: Sun, 27 May 2018 00:19:02 -0700 Subject: [PATCH 08/17] Codefactor and other whitespace changes --- plaso/engine/artifact_filters.py | 14 ++- plaso/engine/path_helper.py | 2 +- tests/engine/artifact_filters.py | 52 ++++++----- tests/engine/path_helper.py | 144 +++++++++++++++---------------- 4 files changed, 108 insertions(+), 104 deletions(-) diff --git a/plaso/engine/artifact_filters.py b/plaso/engine/artifact_filters.py index 09d48da704..c03da70a48 100644 --- a/plaso/engine/artifact_filters.py +++ b/plaso/engine/artifact_filters.py @@ -4,7 +4,6 @@ from __future__ import unicode_literals import logging -import re from artifacts import definitions as artifact_types @@ -52,22 +51,22 @@ def BuildFindSpecs(self, environment_variables=None): for artifact_filter in self._artifacts: if self._artifacts_registry.GetDefinitionByName(artifact_filter): artifact_definitions.append( - self._artifacts_registry.GetDefinitionByName(artifact_filter)) + self._artifacts_registry.GetDefinitionByName(artifact_filter)) for definition in artifact_definitions: for source in definition.sources: if source.type_indicator == artifact_types.TYPE_INDICATOR_FILE: for path_entry in source.paths: self.BuildFindSpecsFromFileArtifact( - path_entry, source.separator, environment_variables, - self._knowledge_base.user_accounts, find_specs) + path_entry, source.separator, environment_variables, + self._knowledge_base.user_accounts, find_specs) elif (source.type_indicator == artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY): keys = set(source.keys) for key_entry in keys: if self._CheckKeyCompatibility(key_entry): self.BuildFindSpecsFromRegistryArtifact( - key_entry, find_specs) + key_entry, find_specs) elif (source.type_indicator == artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_VALUE): # TODO: Handle Registry Values Once Supported in dfwinreg. @@ -89,7 +88,7 @@ def BuildFindSpecs(self, environment_variables=None): @classmethod def BuildFindSpecsFromFileArtifact( - self, path_entry, separator, environment_variables, user_accounts, + cls, path_entry, separator, environment_variables, user_accounts, find_specs): """Build find specifications from a FILE artifact type. @@ -143,8 +142,7 @@ def BuildFindSpecsFromFileArtifact( find_specs[artifact_types.TYPE_INDICATOR_FILE].append(find_spec) @classmethod - def BuildFindSpecsFromRegistryArtifact( - self, key_entry, find_specs): + def BuildFindSpecsFromRegistryArtifact(cls, key_entry, find_specs): """Build find specifications from a Windows registry artifact type. Args: diff --git a/plaso/engine/path_helper.py b/plaso/engine/path_helper.py index 2eda528221..02a49a9e74 100644 --- a/plaso/engine/path_helper.py +++ b/plaso/engine/path_helper.py @@ -242,4 +242,4 @@ def AppendPathEntries(cls, path, separator, count, skip_first): paths.append(path) iteration += 1 - return paths \ No newline at end of file + return paths diff --git a/tests/engine/artifact_filters.py b/tests/engine/artifact_filters.py index 1b3e5f9cf2..d230ee0a59 100644 --- a/tests/engine/artifact_filters.py +++ b/tests/engine/artifact_filters.py @@ -140,15 +140,16 @@ def test_CheckKeyCompatibility(self): """Tests the _CheckKeyCompatibility function""" # Compatible Key. - key = 'HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control' + key = 'HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Control' compatible_key = (artifact_filters.ArtifactDefinitionsFilterHelper. _CheckKeyCompatibility(key)) self.assertTrue(compatible_key) # NOT a Compatible Key. - key = 'HKEY_USERS\S-1-5-18' - compatible_key = (artifact_filters.ArtifactDefinitionsFilterHelper. - _CheckKeyCompatibility(key)) + key = 'HKEY_USERS\\S-1-5-18' + compatible_key = ( + artifact_filters.ArtifactDefinitionsFilterHelper._CheckKeyCompatibility( + key)) self.assertFalse(compatible_key) @@ -161,11 +162,12 @@ def testBuildFindSpecsFromFileArtifact(self): # Test expansion of environment variables. path_entry = '%%environ_systemroot%%\\test_data\\*.evtx' environment_variable = [artifacts.EnvironmentVariableArtifact( - case_sensitive=False, name='SystemRoot', value='C:\\Windows')] + case_sensitive=False, name='SystemRoot', value='C:\\Windows')] (artifact_filters.ArtifactDefinitionsFilterHelper. BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, find_specs)) + path_entry, separator, environment_variable, user_accounts, + find_specs)) # Should build 1 find_spec. self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 1) @@ -173,8 +175,8 @@ def testBuildFindSpecsFromFileArtifact(self): # Location segments should be equivalent to \Windows\test_data\*.evtx. path_segments = ['Windows', 'test\\_data', '.*\\.evtx'] self.assertEqual( - find_specs[artifact_types.TYPE_INDICATOR_FILE][0]._location_segments, - path_segments) + find_specs[artifact_types.TYPE_INDICATOR_FILE][0]._location_segments, + path_segments) # Test expansion of globs. @@ -182,17 +184,19 @@ def testBuildFindSpecsFromFileArtifact(self): path_entry = '\\test_data\\**' (artifact_filters.ArtifactDefinitionsFilterHelper. BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, find_specs)) + path_entry, separator, environment_variable, user_accounts, + find_specs)) # Glob expansion should by default recurse ten levels. self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 10) # Last entry in find_specs list should be 10 levels of depth. - path_segments = ['test\\_data', '.*', '.*', '.*', '.*', - '.*', '.*', '.*', '.*', '.*', '.*'] + path_segments = [ + 'test\\_data', '.*', '.*', '.*', '.*', '.*', '.*', '.*', '.*', '.*', + '.*'] self.assertEqual( - find_specs[artifact_types.TYPE_INDICATOR_FILE][9]._location_segments, - path_segments) + find_specs[artifact_types.TYPE_INDICATOR_FILE][9]._location_segments, + path_segments) # Test expansion of user home directories find_specs = {} @@ -200,22 +204,23 @@ def testBuildFindSpecsFromFileArtifact(self): testuser1 = artifacts.UserAccountArtifact( user_directory='/homes/testuser1', username='testuser1') testuser2 = artifacts.UserAccountArtifact( - user_directory='/home/testuser2', username='testuser2') + user_directory='/home/testuser2', username='testuser2') user_accounts = [testuser1, testuser2] path_entry = '%%users.homedir%%/.thumbnails/**3' (artifact_filters.ArtifactDefinitionsFilterHelper. BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, find_specs)) + path_entry, separator, environment_variable, user_accounts, + find_specs)) # Six total find specs should be created for testuser1 and testuser2. self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 6) # Last entry in find_specs list should be testuser2 with a depth of 3 - path_segments = ['home', 'testuser2', '\.thumbnails', '.*', '.*', '.*'] + path_segments = ['home', 'testuser2', '\\.thumbnails', '.*', '.*', '.*'] self.assertEqual( - find_specs[artifact_types.TYPE_INDICATOR_FILE][5]._location_segments, - path_segments) + find_specs[artifact_types.TYPE_INDICATOR_FILE][5]._location_segments, + path_segments) # Test Windows path with profile directories and globs with a depth of 4. @@ -224,22 +229,23 @@ def testBuildFindSpecsFromFileArtifact(self): testuser1 = artifacts.UserAccountArtifact( user_directory='\\Users\\\\testuser1', username='testuser1') testuser2 = artifacts.UserAccountArtifact( - user_directory='\\Users\\\\testuser2', username='testuser2') + user_directory='\\Users\\\\testuser2', username='testuser2') user_accounts = [testuser1, testuser2] path_entry = '%%users.homedir%%\\AppData\\**4' (artifact_filters.ArtifactDefinitionsFilterHelper. BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, find_specs)) + path_entry, separator, environment_variable, user_accounts, + find_specs)) # Eight find specs should be created for testuser1 and testuser2. self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 8) # Last entry in find_specs list should be testuser2, with a depth of 4. - path_segments = ['Users', 'testuser2', 'AppData', '.*', '.*', '.*', '.*'] + path_segments = ['Users', 'testuser2', 'AppData', '.*', '.*', '.*', '.*'] self.assertEqual( - find_specs[artifact_types.TYPE_INDICATOR_FILE][7]._location_segments, - path_segments) + find_specs[artifact_types.TYPE_INDICATOR_FILE][7]._location_segments, + path_segments) if __name__ == '__main__': diff --git a/tests/engine/path_helper.py b/tests/engine/path_helper.py index 6272d97b7f..5bb39f0ac0 100644 --- a/tests/engine/path_helper.py +++ b/tests/engine/path_helper.py @@ -148,80 +148,80 @@ def testGetRelativePathForPathSpec(self): self.assertIsNone(display_name) def testAppendPathEntries(self): - """Tests the AppendPathEntries function.""" - separator = '\\' - path = '\\Windows\\Test' - - # Test depth of ten skipping first entry. - # The path will have 9 entries as the default depth for ** is 10, but the - # first entry is being skipped. - count = 10 - skip_first = True - paths = path_helper.PathHelper.AppendPathEntries( - path, separator, count, skip_first) - check_paths = [ - '\\Windows\\Test\\*\\*', - '\\Windows\\Test\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*\\*'] - - # Nine paths returned - self.assertEqual(len(paths), 9) - self.assertItemsEqual(paths, check_paths) - - # Now test with skip_first set to False, but only a depth of 4. - # the path will have a total of 4 entries. - count = 4 - skip_first = False - check_paths = [ - '\\Windows\\Test\\*', - '\\Windows\\Test\\*\\*', - '\\Windows\\Test\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*'] - - paths = path_helper.PathHelper.AppendPathEntries( - path, separator, count, skip_first) - self.assertItemsEqual(paths, check_paths) + """Tests the AppendPathEntries function.""" + separator = '\\' + path = '\\Windows\\Test' + + # Test depth of ten skipping first entry. + # The path will have 9 entries as the default depth for ** is 10, but the + # first entry is being skipped. + count = 10 + skip_first = True + paths = path_helper.PathHelper.AppendPathEntries( + path, separator, count, skip_first) + check_paths = [ + '\\Windows\\Test\\*\\*', + '\\Windows\\Test\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*\\*'] + + # Nine paths returned + self.assertEqual(len(paths), 9) + self.assertItemsEqual(paths, check_paths) + + # Now test with skip_first set to False, but only a depth of 4. + # the path will have a total of 4 entries. + count = 4 + skip_first = False + check_paths = [ + '\\Windows\\Test\\*', + '\\Windows\\Test\\*\\*', + '\\Windows\\Test\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*'] + + paths = path_helper.PathHelper.AppendPathEntries( + path, separator, count, skip_first) + self.assertItemsEqual(paths, check_paths) def testExpandRecursiveGlobs(self): - """Tests the _ExpandRecursiveGlobs function.""" - separator = '/' - - # Test a path with a trailing /, which means first directory is skipped. - # The path will have 9 entries as the default depth for ** is 10, but the - # first entry is being skipped. - path = '/etc/sysconfig/**/' - paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator) - check_paths = [ - '/etc/sysconfig/*/*', - '/etc/sysconfig/*/*/*', - '/etc/sysconfig/*/*/*/*', - '/etc/sysconfig/*/*/*/*/*', - '/etc/sysconfig/*/*/*/*/*/*', - '/etc/sysconfig/*/*/*/*/*/*/*', - '/etc/sysconfig/*/*/*/*/*/*/*/*', - '/etc/sysconfig/*/*/*/*/*/*/*/*/*', - '/etc/sysconfig/*/*/*/*/*/*/*/*/*/*'] - # Nine paths returned - self.assertEqual(len(paths), 9) - self.assertItemsEqual(paths, check_paths) - - # Now test with no trailing separator, but only a depth of 4. - # the path will have a total of 4 entries. - path = '/etc/sysconfig/**4' - check_paths = [ - '/etc/sysconfig/*', - '/etc/sysconfig/*/*', - '/etc/sysconfig/*/*/*', - '/etc/sysconfig/*/*/*/*'] - - paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator) - self.assertItemsEqual(paths, check_paths) + """Tests the _ExpandRecursiveGlobs function.""" + separator = '/' + + # Test a path with a trailing /, which means first directory is skipped. + # The path will have 9 entries as the default depth for ** is 10, but the + # first entry is being skipped. + path = '/etc/sysconfig/**/' + paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator) + check_paths = [ + '/etc/sysconfig/*/*', + '/etc/sysconfig/*/*/*', + '/etc/sysconfig/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*/*/*'] + # Nine paths returned + self.assertEqual(len(paths), 9) + self.assertItemsEqual(paths, check_paths) + + # Now test with no trailing separator, but only a depth of 4. + # the path will have a total of 4 entries. + path = '/etc/sysconfig/**4' + check_paths = [ + '/etc/sysconfig/*', + '/etc/sysconfig/*/*', + '/etc/sysconfig/*/*/*', + '/etc/sysconfig/*/*/*/*'] + + paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator) + self.assertItemsEqual(paths, check_paths) if __name__ == '__main__': From ba8a693e44c7ba082810c2631e371ff9ba7da8e0 Mon Sep 17 00:00:00 2001 From: James Nettesheim Date: Sun, 27 May 2018 11:12:07 -0700 Subject: [PATCH 09/17] PyLine and codefactor changes --- tests/engine/artifact_filters.py | 49 +++++++++++++++++--------------- tests/engine/path_helper.py | 1 - 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/tests/engine/artifact_filters.py b/tests/engine/artifact_filters.py index d230ee0a59..9ee4952ab7 100644 --- a/tests/engine/artifact_filters.py +++ b/tests/engine/artifact_filters.py @@ -29,6 +29,8 @@ class BuildFindSpecsFromFileTest(shared_test_lib.BaseTestCase): """Tests for the BuildFindSpecsFromFile function.""" + # pylint: disable=protected-access + @shared_test_lib.skipUnlessHasTestFile(['artifacts']) @shared_test_lib.skipUnlessHasTestFile(['System.evtx']) @shared_test_lib.skipUnlessHasTestFile(['testdir', 'filter_1.txt']) @@ -37,13 +39,13 @@ def testBuildFindSpecsWithFileSystem(self): """Tests the BuildFindSpecs function for file type artifacts.""" knowledge_base = knowledge_base_engine.KnowledgeBase() testuser1 = artifacts.UserAccountArtifact( - identifier='1000', - user_directory='C:\\\\Users\\\\testuser1', - username='testuser1') + identifier='1000', + user_directory='C:\\\\Users\\\\testuser1', + username='testuser1') testuser2 = artifacts.UserAccountArtifact( - identifier='1001', - user_directory='C:\\\\Users\\\\testuser2', - username='testuser2') + identifier='1001', + user_directory='C:\\\\Users\\\\testuser2', + username='testuser2') knowledge_base.AddUserAccount(testuser1) knowledge_base.AddUserAccount(testuser2) @@ -71,8 +73,8 @@ def testBuildFindSpecsWithFileSystem(self): path_segments = ['Users', 'testuser2', 'Documents', 'WindowsPowerShell', 'profile\\.ps1'] self.assertEqual( - find_specs[artifact_types.TYPE_INDICATOR_FILE][2]._location_segments, - path_segments) + find_specs[artifact_types.TYPE_INDICATOR_FILE][2]._location_segments, + path_segments) path_spec = path_spec_factory.Factory.NewPathSpec( dfvfs_definitions.TYPE_INDICATOR_OS, location='.') @@ -109,7 +111,7 @@ def testBuildFindSpecsWithRegistry(self): test_filter_file.BuildFindSpecs(environment_variables=None) find_specs = knowledge_base.GetValue( - artifact_filters.ArtifactDefinitionsFilterHelper.ARTIFACT_FILTERS) + artifact_filters.ArtifactDefinitionsFilterHelper.ARTIFACT_FILTERS) self.assertEqual( len(find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY]), 1) @@ -141,8 +143,9 @@ def test_CheckKeyCompatibility(self): # Compatible Key. key = 'HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Control' - compatible_key = (artifact_filters.ArtifactDefinitionsFilterHelper. - _CheckKeyCompatibility(key)) + compatible_key = ( + artifact_filters.ArtifactDefinitionsFilterHelper._CheckKeyCompatibility( + key)) self.assertTrue(compatible_key) # NOT a Compatible Key. @@ -165,9 +168,9 @@ def testBuildFindSpecsFromFileArtifact(self): case_sensitive=False, name='SystemRoot', value='C:\\Windows')] (artifact_filters.ArtifactDefinitionsFilterHelper. - BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, - find_specs)) + BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts, + find_specs)) # Should build 1 find_spec. self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 1) @@ -183,9 +186,9 @@ def testBuildFindSpecsFromFileArtifact(self): find_specs = {} path_entry = '\\test_data\\**' (artifact_filters.ArtifactDefinitionsFilterHelper. - BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, - find_specs)) + BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts, + find_specs)) # Glob expansion should by default recurse ten levels. self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 10) @@ -209,9 +212,9 @@ def testBuildFindSpecsFromFileArtifact(self): path_entry = '%%users.homedir%%/.thumbnails/**3' (artifact_filters.ArtifactDefinitionsFilterHelper. - BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, - find_specs)) + BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts, + find_specs)) # Six total find specs should be created for testuser1 and testuser2. self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 6) @@ -234,9 +237,9 @@ def testBuildFindSpecsFromFileArtifact(self): path_entry = '%%users.homedir%%\\AppData\\**4' (artifact_filters.ArtifactDefinitionsFilterHelper. - BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, - find_specs)) + BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts, + find_specs)) # Eight find specs should be created for testuser1 and testuser2. self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 8) diff --git a/tests/engine/path_helper.py b/tests/engine/path_helper.py index 5bb39f0ac0..927b0de448 100644 --- a/tests/engine/path_helper.py +++ b/tests/engine/path_helper.py @@ -4,7 +4,6 @@ from __future__ import unicode_literals -import logging import os import unittest From 4e16ceb924cb12df373a84427b29cd885653ab1d Mon Sep 17 00:00:00 2001 From: James Nettesheim Date: Sun, 27 May 2018 22:51:18 -0700 Subject: [PATCH 10/17] Further pylint changes and a few nits --- plaso/engine/path_helper.py | 1 - tests/engine/artifact_filters.py | 29 ++++++++++++++-------------- tests/engine/path_helper.py | 33 ++++++++++++++++++++++---------- 3 files changed, 37 insertions(+), 26 deletions(-) diff --git a/plaso/engine/path_helper.py b/plaso/engine/path_helper.py index 02a49a9e74..e47811b300 100644 --- a/plaso/engine/path_helper.py +++ b/plaso/engine/path_helper.py @@ -226,7 +226,6 @@ def AppendPathEntries(cls, path, separator, count, skip_first): count (int): Number of entries to be appended. skip_first (bool): Whether or not to skip first entry to append. - Returns: list[str]: Paths that were expanded from the path with wildcards. """ diff --git a/tests/engine/artifact_filters.py b/tests/engine/artifact_filters.py index 9ee4952ab7..ab077d3162 100644 --- a/tests/engine/artifact_filters.py +++ b/tests/engine/artifact_filters.py @@ -73,8 +73,8 @@ def testBuildFindSpecsWithFileSystem(self): path_segments = ['Users', 'testuser2', 'Documents', 'WindowsPowerShell', 'profile\\.ps1'] self.assertEqual( - find_specs[artifact_types.TYPE_INDICATOR_FILE][2]._location_segments, - path_segments) + find_specs[artifact_types.TYPE_INDICATOR_FILE][2]._location_segments, + path_segments) path_spec = path_spec_factory.Factory.NewPathSpec( dfvfs_definitions.TYPE_INDICATOR_OS, location='.') @@ -168,9 +168,9 @@ def testBuildFindSpecsFromFileArtifact(self): case_sensitive=False, name='SystemRoot', value='C:\\Windows')] (artifact_filters.ArtifactDefinitionsFilterHelper. - BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, - find_specs)) + BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts, + find_specs)) # Should build 1 find_spec. self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 1) @@ -186,9 +186,9 @@ def testBuildFindSpecsFromFileArtifact(self): find_specs = {} path_entry = '\\test_data\\**' (artifact_filters.ArtifactDefinitionsFilterHelper. - BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, - find_specs)) + BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts, + find_specs)) # Glob expansion should by default recurse ten levels. self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 10) @@ -212,9 +212,9 @@ def testBuildFindSpecsFromFileArtifact(self): path_entry = '%%users.homedir%%/.thumbnails/**3' (artifact_filters.ArtifactDefinitionsFilterHelper. - BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, - find_specs)) + BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts, + find_specs)) # Six total find specs should be created for testuser1 and testuser2. self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 6) @@ -225,7 +225,6 @@ def testBuildFindSpecsFromFileArtifact(self): find_specs[artifact_types.TYPE_INDICATOR_FILE][5]._location_segments, path_segments) - # Test Windows path with profile directories and globs with a depth of 4. find_specs = {} separator = '\\' @@ -237,9 +236,9 @@ def testBuildFindSpecsFromFileArtifact(self): path_entry = '%%users.homedir%%\\AppData\\**4' (artifact_filters.ArtifactDefinitionsFilterHelper. - BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, - find_specs)) + BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts, + find_specs)) # Eight find specs should be created for testuser1 and testuser2. self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 8) diff --git a/tests/engine/path_helper.py b/tests/engine/path_helper.py index 927b0de448..f882fd8c7b 100644 --- a/tests/engine/path_helper.py +++ b/tests/engine/path_helper.py @@ -158,6 +158,11 @@ def testAppendPathEntries(self): skip_first = True paths = path_helper.PathHelper.AppendPathEntries( path, separator, count, skip_first) + + # Nine paths returned + self.assertEqual(len(paths), 9) + + # Nine paths in total, each one level deeper than the previous. check_paths = [ '\\Windows\\Test\\*\\*', '\\Windows\\Test\\*\\*\\*', @@ -168,23 +173,24 @@ def testAppendPathEntries(self): '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*', '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*', '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*\\*'] - - # Nine paths returned - self.assertEqual(len(paths), 9) self.assertItemsEqual(paths, check_paths) # Now test with skip_first set to False, but only a depth of 4. # the path will have a total of 4 entries. count = 4 skip_first = False + paths = path_helper.PathHelper.AppendPathEntries( + path, separator, count, skip_first) + + # Four paths returned + self.assertEqual(len(paths), 4) + + # Four paths in total, each one level deeper than the previous. check_paths = [ '\\Windows\\Test\\*', '\\Windows\\Test\\*\\*', '\\Windows\\Test\\*\\*\\*', '\\Windows\\Test\\*\\*\\*\\*'] - - paths = path_helper.PathHelper.AppendPathEntries( - path, separator, count, skip_first) self.assertItemsEqual(paths, check_paths) def testExpandRecursiveGlobs(self): @@ -196,6 +202,11 @@ def testExpandRecursiveGlobs(self): # first entry is being skipped. path = '/etc/sysconfig/**/' paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator) + + # Nine paths returned + self.assertEqual(len(paths), 9) + + # Nine paths in total, each one level deeper than the previous. check_paths = [ '/etc/sysconfig/*/*', '/etc/sysconfig/*/*/*', @@ -206,20 +217,22 @@ def testExpandRecursiveGlobs(self): '/etc/sysconfig/*/*/*/*/*/*/*/*', '/etc/sysconfig/*/*/*/*/*/*/*/*/*', '/etc/sysconfig/*/*/*/*/*/*/*/*/*/*'] - # Nine paths returned - self.assertEqual(len(paths), 9) self.assertItemsEqual(paths, check_paths) # Now test with no trailing separator, but only a depth of 4. # the path will have a total of 4 entries. path = '/etc/sysconfig/**4' + paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator) + + # Four paths returned + self.assertEqual(len(paths), 4) + + # Four paths in total, each one level deeper than the previous. check_paths = [ '/etc/sysconfig/*', '/etc/sysconfig/*/*', '/etc/sysconfig/*/*/*', '/etc/sysconfig/*/*/*/*'] - - paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator) self.assertItemsEqual(paths, check_paths) From d79edc3ffa913e99f28975ca094b825e766edb64 Mon Sep 17 00:00:00 2001 From: James Nettesheim Date: Sun, 27 May 2018 22:53:13 -0700 Subject: [PATCH 11/17] Whitepace --- tests/engine/artifact_filters.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/engine/artifact_filters.py b/tests/engine/artifact_filters.py index ab077d3162..225918afe9 100644 --- a/tests/engine/artifact_filters.py +++ b/tests/engine/artifact_filters.py @@ -181,7 +181,6 @@ def testBuildFindSpecsFromFileArtifact(self): find_specs[artifact_types.TYPE_INDICATOR_FILE][0]._location_segments, path_segments) - # Test expansion of globs. find_specs = {} path_entry = '\\test_data\\**' From 8c0163864307da4c75224a44435dd2784847e06e Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Mon, 4 Jun 2018 06:19:20 +0200 Subject: [PATCH 12/17] Update artifact_filters.py --- plaso/engine/artifact_filters.py | 1 + 1 file changed, 1 insertion(+) diff --git a/plaso/engine/artifact_filters.py b/plaso/engine/artifact_filters.py index c03da70a48..81c9d02ae9 100644 --- a/plaso/engine/artifact_filters.py +++ b/plaso/engine/artifact_filters.py @@ -11,6 +11,7 @@ from dfwinreg import registry_searcher from plaso.engine import path_helper + class ArtifactDefinitionsFilterHelper(object): """Helper to create filters based on artifact definitions. From 927cb28ae7e4ce9165e93d5aa82f912234626267 Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Mon, 4 Jun 2018 06:52:58 +0200 Subject: [PATCH 13/17] Update artifact_filters.py --- plaso/engine/artifact_filters.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plaso/engine/artifact_filters.py b/plaso/engine/artifact_filters.py index 81c9d02ae9..54a3e377c5 100644 --- a/plaso/engine/artifact_filters.py +++ b/plaso/engine/artifact_filters.py @@ -32,7 +32,8 @@ def __init__(self, artifacts_registry, artifacts, knowledge_base): definitions registry. artifacts (list[str]): artifact names to process. path (str): path to a file that contains one or more forensic artifacts. - knowledge_base (KnowledgeBase): Knowledge base for Log2Timeline. + knowledge_base (KnowledgeBase): contains information from the source + data needed for filtering. """ super(ArtifactDefinitionsFilterHelper, self).__init__() self._artifacts_registry = artifacts_registry From f8004d703df15bc0706b98fa4eae8cf9aac27753 Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Mon, 4 Jun 2018 06:57:18 +0200 Subject: [PATCH 14/17] Update artifact_filters.py --- plaso/engine/artifact_filters.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plaso/engine/artifact_filters.py b/plaso/engine/artifact_filters.py index 54a3e377c5..3ab347bfff 100644 --- a/plaso/engine/artifact_filters.py +++ b/plaso/engine/artifact_filters.py @@ -30,7 +30,7 @@ def __init__(self, artifacts_registry, artifacts, knowledge_base): Args: artifacts_registry (artifacts.ArtifactDefinitionsRegistry]): artifact definitions registry. - artifacts (list[str]): artifact names to process. + artifacts (list[str]): artifact names to filter. path (str): path to a file that contains one or more forensic artifacts. knowledge_base (KnowledgeBase): contains information from the source data needed for filtering. From 6fe99d15203658a574706396d6696c548fb9a158 Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Mon, 4 Jun 2018 06:59:32 +0200 Subject: [PATCH 15/17] Update artifact_filters.py --- plaso/engine/artifact_filters.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/plaso/engine/artifact_filters.py b/plaso/engine/artifact_filters.py index 3ab347bfff..76c2263d66 100644 --- a/plaso/engine/artifact_filters.py +++ b/plaso/engine/artifact_filters.py @@ -92,17 +92,16 @@ def BuildFindSpecs(self, environment_variables=None): def BuildFindSpecsFromFileArtifact( cls, path_entry, separator, environment_variables, user_accounts, find_specs): - """Build find specifications from a FILE artifact type. + """Builds find specifications from a file artifact definition. Args: - path_entry (str): Current file system path to add. - environment variables. - separator (str): File system path separator. - environment_variables list(str): Environment variable attributes used to + path_entry (str): current file system path to add. + separator (str): file system path segment separator. + environment_variables list(str): environment variable attributes used to dynamically populate environment variables in key. - user_accounts list(str): Identified user accounts stored in the + user_accounts (list[str]): identified user accounts stored in the knowledge base. - find_specs dict[artifacts.artifact_types]: Dictionary containing + find_specs (dict[artifacts.artifact_types]): Dictionary containing find_specs. """ for glob_path in path_helper.PathHelper.ExpandRecursiveGlobs( From 116b9fcee36ee067c3d3fb8a989ce7b3d10186f2 Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Sat, 9 Jun 2018 19:41:37 +0200 Subject: [PATCH 16/17] Changes for review --- plaso/engine/artifact_filters.py | 193 +++++++++++++++------------- plaso/engine/path_helper.py | 207 +++++++++++++++--------------- tests/engine/artifact_filters.py | 155 +++++++++++------------ tests/engine/path_helper.py | 208 ++++++++++++++++++------------- 4 files changed, 407 insertions(+), 356 deletions(-) diff --git a/plaso/engine/artifact_filters.py b/plaso/engine/artifact_filters.py index 76c2263d66..7a3d91ff78 100644 --- a/plaso/engine/artifact_filters.py +++ b/plaso/engine/artifact_filters.py @@ -3,12 +3,11 @@ from __future__ import unicode_literals -import logging - from artifacts import definitions as artifact_types from dfvfs.helpers import file_system_searcher from dfwinreg import registry_searcher +from plaso.engine import logger from plaso.engine import path_helper @@ -21,112 +20,148 @@ class ArtifactDefinitionsFilterHelper(object): https://github.com/ForensicArtifacts/artifacts/blob/master/docs/Artifacts%20definition%20format%20and%20style%20guide.asciidoc """ - ARTIFACT_FILTERS = 'ARTIFACT_FILTERS' - _COMPATIBLE_DFWINREG_KEYS = ('HKEY_LOCAL_MACHINE') + _KNOWLEDGE_BASE_VALUE = 'ARTIFACT_FILTERS' + + _COMPATIBLE_REGISTRY_KEY_PATH_PREFIXES = ['HKEY_LOCAL_MACHINE'] - def __init__(self, artifacts_registry, artifacts, knowledge_base): + def __init__(self, artifacts_registry, artifact_definitions, knowledge_base): """Initializes an artifact definitions filter helper. Args: artifacts_registry (artifacts.ArtifactDefinitionsRegistry]): artifact definitions registry. - artifacts (list[str]): artifact names to filter. - path (str): path to a file that contains one or more forensic artifacts. + artifact_definitions (list[str]): artifact definition names to filter. + path (str): path to a file that contains one or more artifact definitions. knowledge_base (KnowledgeBase): contains information from the source data needed for filtering. """ super(ArtifactDefinitionsFilterHelper, self).__init__() + self._artifacts = artifact_definitions self._artifacts_registry = artifacts_registry - self._artifacts = artifacts self._knowledge_base = knowledge_base + def _CheckKeyCompatibility(self, key_path): + """Checks if a Windows Registry key path is supported by dfWinReg. + + Args: + key_path (str): path of the Windows Registry key. + + Returns: + bool: True if key is compatible or False if not. + """ + for key_path_prefix in self._COMPATIBLE_REGISTRY_KEY_PATH_PREFIXES: + if key_path.startswith(key_path_prefix): + return True + + logger.warning( + 'Prefix of key "{0:s}" is currently not supported'.format(key_path)) + return False + def BuildFindSpecs(self, environment_variables=None): - """Builds find specification from a forensic artifact definitions. + """Builds find specifications from artifact definitions. + + The resulting find specifications are set in the knowledge base. Args: environment_variables (Optional[list[EnvironmentVariableArtifact]]): environment variables. """ - find_specs = {} + find_specs_per_source_type = { + artifact_types.TYPE_INDICATOR_FILE: [], + artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY: []} - artifact_definitions = [] - for artifact_filter in self._artifacts: - if self._artifacts_registry.GetDefinitionByName(artifact_filter): - artifact_definitions.append( - self._artifacts_registry.GetDefinitionByName(artifact_filter)) + for name in self._artifacts: + definition = self._artifacts_registry.GetDefinitionByName(name) + if not definition: + continue - for definition in artifact_definitions: for source in definition.sources: if source.type_indicator == artifact_types.TYPE_INDICATOR_FILE: - for path_entry in source.paths: - self.BuildFindSpecsFromFileArtifact( + # TODO: move source.paths iteration into + # BuildFindSpecsFromFileArtifact. + for path_entry in set(source.paths): + find_specs = self.BuildFindSpecsFromFileArtifact( path_entry, source.separator, environment_variables, - self._knowledge_base.user_accounts, find_specs) + self._knowledge_base.user_accounts) + find_specs_per_source_type[ + artifact_types.TYPE_INDICATOR_FILE].extend(find_specs) + elif (source.type_indicator == artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY): - keys = set(source.keys) - for key_entry in keys: - if self._CheckKeyCompatibility(key_entry): - self.BuildFindSpecsFromRegistryArtifact( - key_entry, find_specs) + # TODO: move source.keys iteration into + # BuildFindSpecsFromRegistryArtifact. + for key_path in set(source.keys): + if self._CheckKeyCompatibility(key_path): + find_specs = self.BuildFindSpecsFromRegistryArtifact(key_path) + find_specs_per_source_type[ + artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY].extend( + find_specs) + elif (source.type_indicator == artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_VALUE): # TODO: Handle Registry Values Once Supported in dfwinreg. # https://github.com/log2timeline/dfwinreg/issues/98 - logging.warning(('Unable to handle Registry Value, extracting ' - 'key only: "{0:s}"').format(source.key_value_pairs)) - - for key_pair in source.key_value_pairs: - keys = set() - keys.add(key_pair.get('key')) - for key_entry in keys: - if self._CheckKeyCompatibility(key_entry): - self.BuildFindSpecsFromRegistryArtifact(key_entry, find_specs) + logger.warning(( + 'Windows Registry values are not supported, extracting key: ' + '"{0!s}"').format(source.key_value_pairs)) + + # TODO: move source.key_value_pairs iteration into + # BuildFindSpecsFromRegistryArtifact. + for key_path in set([ + key_path for key_path, _ in source.key_value_pairs]): + if self._CheckKeyCompatibility(key_path): + find_specs = self.BuildFindSpecsFromRegistryArtifact(key_path) + find_specs_per_source_type[ + artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY].extend( + find_specs) + else: - logging.warning(('Unable to handle artifact, plaso does not ' - 'support: "{0:s}"').format(source.type_indicator)) + logger.warning( + 'Unsupported artifact definition source type: "{0:s}"'.format( + source.type_indicator)) - self._knowledge_base.SetValue(self.ARTIFACT_FILTERS, find_specs) + self._knowledge_base.SetValue( + self._KNOWLEDGE_BASE_VALUE, find_specs_per_source_type) - @classmethod def BuildFindSpecsFromFileArtifact( - cls, path_entry, separator, environment_variables, user_accounts, - find_specs): - """Builds find specifications from a file artifact definition. + self, source_path, path_separator, environment_variables, user_accounts): + """Builds find specifications from a file source type. Args: - path_entry (str): current file system path to add. - separator (str): file system path segment separator. + source_path (str): file system path defined by the source. + path_separator (str): file system path segment separator. environment_variables list(str): environment variable attributes used to dynamically populate environment variables in key. user_accounts (list[str]): identified user accounts stored in the knowledge base. - find_specs (dict[artifacts.artifact_types]): Dictionary containing - find_specs. + + Returns: + list[dfvfs.FindSpec]: find specifications for the file source type. """ + find_specs = [] for glob_path in path_helper.PathHelper.ExpandRecursiveGlobs( - path_entry, separator): - for path in path_helper.PathHelper.ExpandUserHomeDirectoryPath( + source_path, path_separator): + for path in path_helper.PathHelper.ExpandUsersHomeDirectoryPath( glob_path, user_accounts): if '%' in path: path = path_helper.PathHelper.ExpandWindowsPath( path, environment_variables) - if not path.startswith(separator): - logging.warning(( + if not path.startswith(path_separator): + logger.warning(( 'The path filter must be defined as an absolute path: ' '"{0:s}"').format(path)) continue # Convert the path filters into a list of path segments and # strip the root path segment. - path_segments = path.split(separator) + path_segments = path.split(path_separator) # Remove initial root entry path_segments.pop(0) if not path_segments[-1]: - logging.warning( + logger.warning( 'Empty last path segment in path filter: "{0:s}"'.format(path)) path_segments.pop(-1) @@ -134,49 +169,33 @@ def BuildFindSpecsFromFileArtifact( find_spec = file_system_searcher.FindSpec( location_glob=path_segments, case_sensitive=False) except ValueError as exception: - logging.error(( - 'Unable to build find spec for path: "{0:s}" with error: "{1!s}"' - ).format(path, exception)) + logger.error(( + 'Unable to build find specification for path: "{0:s}" with ' + 'error: {1!s}').format(path, exception)) continue - if artifact_types.TYPE_INDICATOR_FILE not in find_specs: - find_specs[artifact_types.TYPE_INDICATOR_FILE] = [] - find_specs[artifact_types.TYPE_INDICATOR_FILE].append(find_spec) - - @classmethod - def BuildFindSpecsFromRegistryArtifact(cls, key_entry, find_specs): - """Build find specifications from a Windows registry artifact type. - Args: - key_entry (str): Current file system key to add. - find_specs dict[artifacts.artifact_types]: Dictionary containing - find_specs. - """ - separator = '\\' - for key in path_helper.PathHelper.ExpandRecursiveGlobs( - key_entry, separator): - if '%%' in key: - logging.error(('Unable to expand path filter: "{0:s}"').format(key)) - continue - find_spec = registry_searcher.FindSpec(key_path_glob=key) - if artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY not in find_specs: - find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY] = [] + find_specs.append(find_spec) - find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY].append( - find_spec) + return find_specs - @staticmethod - def _CheckKeyCompatibility(key): - """Checks if a Windows Registry key is compatible with dfwinreg. + def BuildFindSpecsFromRegistryArtifact(self, source_key_path): + """Build find specifications from a Windows Registry source type. Args: - key (str): String key to to check for dfwinreg compatibility. + source_key_path (str): Windows Registry key path defined by the source. Returns: - (bool): True if key is compatible or False if not. + list[dfwinreg.FindSpec]: find specifications for the Windows Registry + source type. """ - if key.startswith( - ArtifactDefinitionsFilterHelper._COMPATIBLE_DFWINREG_KEYS): - return True - logging.warning('Key "{0:s}", has a prefix that is not supported ' - 'by dfwinreg presently'.format(key)) - return False + find_specs = [] + for key_path in path_helper.PathHelper.ExpandRecursiveGlobs( + source_key_path, '\\'): + if '%%' in key_path: + logger.error('Unable to expand key path: "{0:s}"'.format(key_path)) + continue + + find_spec = registry_searcher.FindSpec(key_path_glob=key_path) + find_specs.append(find_spec) + + return find_specs diff --git a/plaso/engine/path_helper.py b/plaso/engine/path_helper.py index e47811b300..190c86bcfc 100644 --- a/plaso/engine/path_helper.py +++ b/plaso/engine/path_helper.py @@ -3,18 +3,120 @@ from __future__ import unicode_literals -import logging import re from dfvfs.lib import definitions as dfvfs_definitions +from plaso.engine import logger from plaso.lib import py2to3 class PathHelper(object): """Class that implements the path helper.""" - RECURSIVE_GLOB_LIMIT = 10 + _RECURSIVE_GLOB_LIMIT = 10 + + @classmethod + def AppendPathEntries(cls, path, path_separator, count, skip_first): + """Appends wildcard entries to end of path. + + Will append wildcard * to given path building a list of strings for "count" + iterations, skipping the first directory if skip_first is true. + + Args: + path (str): Path to append wildcards to. + path_separator (str): path segment separator. + count (int): Number of entries to be appended. + skip_first (bool): Whether or not to skip first entry to append. + + Returns: + list[str]: Paths that were expanded from the path with wildcards. + """ + paths = [] + replacement = '{0:s}*'.format(path_separator) + + iteration = 0 + while iteration < count: + if skip_first and iteration == 0: + path += replacement + else: + path += replacement + paths.append(path) + iteration += 1 + + return paths + + @classmethod + def ExpandRecursiveGlobs(cls, path, path_separator): + """Expands recursive like globs present in an artifact path. + + If a path ends in '**', with up to two optional digits such as '**10', + the '**' will recursively match all files and zero or more directories + from the specified path. The optional digits indicate the recursion depth. + By default recursion depth is 10 directories. + + If the glob is followed by the specified path segment separator, only + directories and subdirectories will be matched. + + Args: + path (str): path to be expanded. + path_separator (str): path segment separator. + + Returns: + list[str]: String path expanded for each glob. + """ + glob_regex = r'(.*)?{0}\*\*(\d{{1,2}})?({0})?$'.format( + re.escape(path_separator)) + + match = re.search(glob_regex, path) + if not match: + return [path] + + skip_first = False + if match.group(3): + skip_first = True + if match.group(2): + iterations = int(match.group(2)) + else: + iterations = cls._RECURSIVE_GLOB_LIMIT + logger.warning(( + 'Path "{0:s}" contains fully recursive glob, limiting to 10 ' + 'levels').format(path)) + + return cls.AppendPathEntries( + match.group(1), path_separator, iterations, skip_first) + + @classmethod + def ExpandUsersHomeDirectoryPath(cls, path, user_accounts): + """Expands a path to contain all users home or profile directories. + + Expands the GRR artifacts path variable "%%users.homedir%%". + + Args: + path (str): Windows path with environment variables. + user_accounts (list[UserAccountArtifact]): user accounts. + + Returns: + list [str]: paths returned for user accounts without a drive letter. + """ + path_upper_case = path.upper() + if not path_upper_case.startswith('%%USERS.HOMEDIR%%'): + user_paths = [path] + else: + regex = re.compile(re.escape('%%users.homedir%%')) + + user_paths = [] + for user_account in user_accounts: + user_path = regex.sub(user_account.user_directory, path, re.IGNORECASE) + user_paths.append(user_path) + + # Remove the drive letter, if it exists. + for path_index, user_path in enumerate(user_paths): + if len(user_path) > 2 and user_path[1] == ':': + _, _, user_path = user_path.rpartition(':') + user_paths[path_index] = user_path + + return user_paths @classmethod def ExpandWindowsPath(cls, path, environment_variables): @@ -28,7 +130,7 @@ def ExpandWindowsPath(cls, path, environment_variables): Returns: str: expanded Windows path. """ - #TODO: Add support for items such as %%users.localappdata%% + # TODO: Add support for items such as %%users.localappdata%% if environment_variables is None: environment_variables = [] @@ -50,11 +152,12 @@ def ExpandWindowsPath(cls, path, environment_variables): continue check_for_drive_letter = False - if path_segment.upper().startswith('%%ENVIRON_'): - lookup_key = path_segment.upper()[10:-2] + path_segment_upper_case = path_segment.upper() + if path_segment_upper_case.startswith('%%ENVIRON_'): + lookup_key = path_segment_upper_case[10:-2] check_for_drive_letter = True else: - lookup_key = path_segment.upper()[1:-1] + lookup_key = path_segment_upper_case[1:-1] path_segments[index] = lookup_table.get(lookup_key, path_segment) if check_for_drive_letter: @@ -150,95 +253,3 @@ def GetRelativePathForPathSpec(cls, path_spec, mount_path=None): location = location[len(mount_path):] return location - - @classmethod - def ExpandUserHomeDirectoryPath(cls, path, user_accounts): - """Expands a path to contain all user home directories. - - Args: - path (str): Windows path with environment variables. - user_accounts (list[UserAccountArtifact]): user accounts. - - Returns: - list [str]: paths returned for user accounts. - """ - - user_paths = [] - if path.upper().startswith('%%USERS.HOMEDIR%%'): - regex = re.compile(re.escape('%%users.homedir%%')) - for user_account in user_accounts: - new_path = regex.sub(user_account.user_directory, path) - # Remove the drive letter, if it exists. - if len(new_path) > 2 and new_path[1] == ':': - _, _, new_path = new_path.rpartition(':') - user_paths.append(new_path) - else: - user_paths = [path] - - return user_paths - - @classmethod - def ExpandRecursiveGlobs(cls, path, separator): - """Expands recursive like globs present in an artifact path. - - If a path ends in '**', with up to two optional digits such as '**10', - the '**' will recursively match all files and zero or more directories - from the specified path. The optional digits indicate the recursion depth. - By default recursion depth is 10 directories. - If the glob is followed by the specified separator, only directories and - subdirectories will be matched. - - Args: - path (str): Path to be expanded. - separator (str): Delimiter for this path from its artifact definition. - - Returns: - list[str]: String path expanded for each glob. - """ - glob_regex = r'(.*)?{0}\*\*(\d{{1,2}})?({0})?$'.format(re.escape(separator)) - match = re.search(glob_regex, path) - if match: - skip_first = False - if match.group(3): - skip_first = True - if match.group(2): - iterations = int(match.group(2)) - else: - iterations = cls.RECURSIVE_GLOB_LIMIT - logging.warning('Path "{0:s}" contains fully recursive glob, limiting ' - 'to 10 levels'.format(path)) - paths = cls.AppendPathEntries( - match.group(1), separator, iterations, skip_first) - return paths - else: - return [path] - - @classmethod - def AppendPathEntries(cls, path, separator, count, skip_first): - """Appends wildcard entries to end of path. - - Will append wildcard * to given path building a list of strings for "count" - iterations, skipping the first directory if skip_first is true. - - Args: - path (str): Path to append wildcards to. - separator (str): Delimiter for this path from its artifact definition. - count (int): Number of entries to be appended. - skip_first (bool): Whether or not to skip first entry to append. - - Returns: - list[str]: Paths that were expanded from the path with wildcards. - """ - paths = [] - replacement = '{0}*'.format(separator) - - iteration = 0 - while iteration < count: - if skip_first and iteration == 0: - path += replacement - else: - path += replacement - paths.append(path) - iteration += 1 - - return paths diff --git a/tests/engine/artifact_filters.py b/tests/engine/artifact_filters.py index 225918afe9..53d03742e8 100644 --- a/tests/engine/artifact_filters.py +++ b/tests/engine/artifact_filters.py @@ -26,11 +26,32 @@ from tests import test_lib as shared_test_lib -class BuildFindSpecsFromFileTest(shared_test_lib.BaseTestCase): - """Tests for the BuildFindSpecsFromFile function.""" +class ArtifactDefinitionsFilterHelperTest(shared_test_lib.BaseTestCase): + """Tests for artifact definitions filter helper.""" # pylint: disable=protected-access + def _CreateTestArtifactDefinitionsFilterHelper( + self, artifact_definitions, knowledge_base): + """Creates an artifact definitions filter helper for testing. + + Args: + artifact_definitions (list[str]): artifact definition names to filter. + knowledge_base (KnowledgeBase): contains information from the source + data needed for filtering. + + Returns: + ArtifactDefinitionsFilterHelper: artifact definitions filter helper. + """ + registry = artifacts_registry.ArtifactDefinitionsRegistry() + reader = artifacts_reader.YamlArtifactsReader() + + test_artifacts_path = self._GetTestFilePath(['artifacts']) + registry.ReadFromDirectory(reader, test_artifacts_path) + + return artifact_filters.ArtifactDefinitionsFilterHelper( + registry, artifact_definitions, knowledge_base) + @shared_test_lib.skipUnlessHasTestFile(['artifacts']) @shared_test_lib.skipUnlessHasTestFile(['System.evtx']) @shared_test_lib.skipUnlessHasTestFile(['testdir', 'filter_1.txt']) @@ -38,43 +59,39 @@ class BuildFindSpecsFromFileTest(shared_test_lib.BaseTestCase): def testBuildFindSpecsWithFileSystem(self): """Tests the BuildFindSpecs function for file type artifacts.""" knowledge_base = knowledge_base_engine.KnowledgeBase() + testuser1 = artifacts.UserAccountArtifact( identifier='1000', user_directory='C:\\\\Users\\\\testuser1', username='testuser1') + knowledge_base.AddUserAccount(testuser1) + testuser2 = artifacts.UserAccountArtifact( identifier='1001', user_directory='C:\\\\Users\\\\testuser2', username='testuser2') - knowledge_base.AddUserAccount(testuser1) knowledge_base.AddUserAccount(testuser2) - artifact_definitions = ['TestFiles', 'TestFiles2'] - registry = artifacts_registry.ArtifactDefinitionsRegistry() - reader = artifacts_reader.YamlArtifactsReader() - - registry.ReadFromDirectory(reader, self._GetTestFilePath(['artifacts'])) - - test_filter_file = artifact_filters.ArtifactDefinitionsFilterHelper( - registry, artifact_definitions, knowledge_base) + test_filter_file = self._CreateTestArtifactDefinitionsFilterHelper( + ['TestFiles', 'TestFiles2'], knowledge_base) environment_variable = artifacts.EnvironmentVariableArtifact( case_sensitive=False, name='SystemDrive', value='C:') test_filter_file.BuildFindSpecs( environment_variables=[environment_variable]) - find_specs = knowledge_base.GetValue( - artifact_filters.ArtifactDefinitionsFilterHelper.ARTIFACT_FILTERS) + find_specs_per_source_type = knowledge_base.GetValue( + test_filter_file._KNOWLEDGE_BASE_VALUE) + find_specs = sorted(find_specs_per_source_type.get( + artifact_types.TYPE_INDICATOR_FILE, [])) # Should build 15 FindSpec entries. - self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 15) + self.assertEqual(len(find_specs), 15) # Last entry in find_specs list should be testuser2. - path_segments = ['Users', 'testuser2', 'Documents', 'WindowsPowerShell', - 'profile\\.ps1'] - self.assertEqual( - find_specs[artifact_types.TYPE_INDICATOR_FILE][2]._location_segments, - path_segments) + path_segments = [ + 'Users', 'testuser2', 'Documents', 'WindowsPowerShell', 'profile\\.ps1'] + self.assertEqual(find_specs[11]._location_segments, path_segments) path_spec = path_spec_factory.Factory.NewPathSpec( dfvfs_definitions.TYPE_INDICATOR_OS, location='.') @@ -82,8 +99,7 @@ def testBuildFindSpecsWithFileSystem(self): searcher = file_system_searcher.FileSystemSearcher( file_system, path_spec) - path_spec_generator = searcher.Find( - find_specs=find_specs[artifact_types.TYPE_INDICATOR_FILE]) + path_spec_generator = searcher.Find(find_specs=find_specs) self.assertIsNotNone(path_spec_generator) path_specs = list(path_spec_generator) @@ -99,22 +115,16 @@ def testBuildFindSpecsWithFileSystem(self): def testBuildFindSpecsWithRegistry(self): """Tests the BuildFindSpecs function on Windows Registry artifacts.""" knowledge_base = knowledge_base_engine.KnowledgeBase() - - artifact_definitions = ['TestRegistry'] - registry = artifacts_registry.ArtifactDefinitionsRegistry() - reader = artifacts_reader.YamlArtifactsReader() - - registry.ReadFromDirectory(reader, self._GetTestFilePath(['artifacts'])) - - test_filter_file = artifact_filters.ArtifactDefinitionsFilterHelper( - registry, artifact_definitions, knowledge_base) + test_filter_file = self._CreateTestArtifactDefinitionsFilterHelper( + ['TestRegistry'], knowledge_base) test_filter_file.BuildFindSpecs(environment_variables=None) - find_specs = knowledge_base.GetValue( - artifact_filters.ArtifactDefinitionsFilterHelper.ARTIFACT_FILTERS) + find_specs_per_source_type = knowledge_base.GetValue( + test_filter_file._KNOWLEDGE_BASE_VALUE) + find_specs = sorted(find_specs_per_source_type.get( + artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY, [])) - self.assertEqual( - len(find_specs[artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY]), 1) + self.assertEqual(len(find_specs), 1) win_registry_reader = ( windows_registry_parser.FileObjectWinRegistryFileReader()) @@ -130,35 +140,35 @@ def testBuildFindSpecsWithRegistry(self): win_registry.MapFile(key_path_prefix, registry_file) searcher = dfwinreg_registry_searcher.WinRegistrySearcher(win_registry) - key_paths = list(searcher.Find(find_specs=find_specs[ - artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY])) + key_paths = list(searcher.Find(find_specs=find_specs)) self.assertIsNotNone(key_paths) # Three key paths found. self.assertEqual(len(key_paths), 3) - def test_CheckKeyCompatibility(self): + def testCheckKeyCompatibility(self): """Tests the _CheckKeyCompatibility function""" + knowledge_base = knowledge_base_engine.KnowledgeBase() + test_filter_file = self._CreateTestArtifactDefinitionsFilterHelper( + [], knowledge_base) # Compatible Key. - key = 'HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Control' - compatible_key = ( - artifact_filters.ArtifactDefinitionsFilterHelper._CheckKeyCompatibility( - key)) + key_path = 'HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Control' + compatible_key = test_filter_file._CheckKeyCompatibility(key_path) self.assertTrue(compatible_key) # NOT a Compatible Key. - key = 'HKEY_USERS\\S-1-5-18' - compatible_key = ( - artifact_filters.ArtifactDefinitionsFilterHelper._CheckKeyCompatibility( - key)) + key_path = 'HKEY_USERS\\S-1-5-18' + compatible_key = test_filter_file._CheckKeyCompatibility(key_path) self.assertFalse(compatible_key) - def testBuildFindSpecsFromFileArtifact(self): """Tests the BuildFindSpecsFromFileArtifact function for file artifacts.""" - find_specs = {} + knowledge_base = knowledge_base_engine.KnowledgeBase() + test_filter_file = self._CreateTestArtifactDefinitionsFilterHelper( + [], knowledge_base) + separator = '\\' user_accounts = [] @@ -167,41 +177,31 @@ def testBuildFindSpecsFromFileArtifact(self): environment_variable = [artifacts.EnvironmentVariableArtifact( case_sensitive=False, name='SystemRoot', value='C:\\Windows')] - (artifact_filters.ArtifactDefinitionsFilterHelper. - BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, - find_specs)) + find_specs = test_filter_file.BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts) # Should build 1 find_spec. - self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 1) + self.assertEqual(len(find_specs), 1) # Location segments should be equivalent to \Windows\test_data\*.evtx. path_segments = ['Windows', 'test\\_data', '.*\\.evtx'] - self.assertEqual( - find_specs[artifact_types.TYPE_INDICATOR_FILE][0]._location_segments, - path_segments) + self.assertEqual(find_specs[0]._location_segments, path_segments) # Test expansion of globs. - find_specs = {} path_entry = '\\test_data\\**' - (artifact_filters.ArtifactDefinitionsFilterHelper. - BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, - find_specs)) + find_specs = test_filter_file.BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts) # Glob expansion should by default recurse ten levels. - self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 10) + self.assertEqual(len(find_specs), 10) # Last entry in find_specs list should be 10 levels of depth. path_segments = [ 'test\\_data', '.*', '.*', '.*', '.*', '.*', '.*', '.*', '.*', '.*', '.*'] - self.assertEqual( - find_specs[artifact_types.TYPE_INDICATOR_FILE][9]._location_segments, - path_segments) + self.assertEqual(find_specs[9]._location_segments, path_segments) # Test expansion of user home directories - find_specs = {} separator = '/' testuser1 = artifacts.UserAccountArtifact( user_directory='/homes/testuser1', username='testuser1') @@ -210,22 +210,17 @@ def testBuildFindSpecsFromFileArtifact(self): user_accounts = [testuser1, testuser2] path_entry = '%%users.homedir%%/.thumbnails/**3' - (artifact_filters.ArtifactDefinitionsFilterHelper. - BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, - find_specs)) + find_specs = test_filter_file.BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts) # Six total find specs should be created for testuser1 and testuser2. - self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 6) + self.assertEqual(len(find_specs), 6) # Last entry in find_specs list should be testuser2 with a depth of 3 path_segments = ['home', 'testuser2', '\\.thumbnails', '.*', '.*', '.*'] - self.assertEqual( - find_specs[artifact_types.TYPE_INDICATOR_FILE][5]._location_segments, - path_segments) + self.assertEqual(find_specs[5]._location_segments, path_segments) # Test Windows path with profile directories and globs with a depth of 4. - find_specs = {} separator = '\\' testuser1 = artifacts.UserAccountArtifact( user_directory='\\Users\\\\testuser1', username='testuser1') @@ -234,19 +229,15 @@ def testBuildFindSpecsFromFileArtifact(self): user_accounts = [testuser1, testuser2] path_entry = '%%users.homedir%%\\AppData\\**4' - (artifact_filters.ArtifactDefinitionsFilterHelper. - BuildFindSpecsFromFileArtifact( - path_entry, separator, environment_variable, user_accounts, - find_specs)) + find_specs = test_filter_file.BuildFindSpecsFromFileArtifact( + path_entry, separator, environment_variable, user_accounts) # Eight find specs should be created for testuser1 and testuser2. - self.assertEqual(len(find_specs[artifact_types.TYPE_INDICATOR_FILE]), 8) + self.assertEqual(len(find_specs), 8) # Last entry in find_specs list should be testuser2, with a depth of 4. path_segments = ['Users', 'testuser2', 'AppData', '.*', '.*', '.*', '.*'] - self.assertEqual( - find_specs[artifact_types.TYPE_INDICATOR_FILE][7]._location_segments, - path_segments) + self.assertEqual(find_specs[7]._location_segments, path_segments) if __name__ == '__main__': diff --git a/tests/engine/path_helper.py b/tests/engine/path_helper.py index f882fd8c7b..b89aafa2c7 100644 --- a/tests/engine/path_helper.py +++ b/tests/engine/path_helper.py @@ -19,6 +19,125 @@ class PathHelperTest(shared_test_lib.BaseTestCase): """Tests for the path helper.""" + def testAppendPathEntries(self): + """Tests the AppendPathEntries function.""" + separator = '\\' + path = '\\Windows\\Test' + + # Test depth of ten skipping first entry. + # The path will have 9 entries as the default depth for ** is 10, but the + # first entry is being skipped. + count = 10 + skip_first = True + paths = path_helper.PathHelper.AppendPathEntries( + path, separator, count, skip_first) + + # Nine paths returned + self.assertEqual(len(paths), 9) + + # Nine paths in total, each one level deeper than the previous. + check_paths = sorted([ + '\\Windows\\Test\\*\\*', + '\\Windows\\Test\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*\\*']) + self.assertEqual(sorted(paths), check_paths) + + # Now test with skip_first set to False, but only a depth of 4. + # the path will have a total of 4 entries. + count = 4 + skip_first = False + paths = path_helper.PathHelper.AppendPathEntries( + path, separator, count, skip_first) + + # Four paths returned + self.assertEqual(len(paths), 4) + + # Four paths in total, each one level deeper than the previous. + check_paths = sorted([ + '\\Windows\\Test\\*', + '\\Windows\\Test\\*\\*', + '\\Windows\\Test\\*\\*\\*', + '\\Windows\\Test\\*\\*\\*\\*']) + self.assertEqual(sorted(paths), check_paths) + + def testExpandRecursiveGlobs(self): + """Tests the _ExpandRecursiveGlobs function.""" + separator = '/' + + # Test a path with a trailing /, which means first directory is skipped. + # The path will have 9 entries as the default depth for ** is 10, but the + # first entry is being skipped. + path = '/etc/sysconfig/**/' + paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator) + + # Nine paths returned + self.assertEqual(len(paths), 9) + + # Nine paths in total, each one level deeper than the previous. + check_paths = sorted([ + '/etc/sysconfig/*/*', + '/etc/sysconfig/*/*/*', + '/etc/sysconfig/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*/*/*']) + self.assertEqual(sorted(paths), check_paths) + + # Now test with no trailing separator, but only a depth of 4. + # the path will have a total of 4 entries. + path = '/etc/sysconfig/**4' + paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator) + + # Four paths returned + self.assertEqual(len(paths), 4) + + # Four paths in total, each one level deeper than the previous. + check_paths = sorted([ + '/etc/sysconfig/*', + '/etc/sysconfig/*/*', + '/etc/sysconfig/*/*/*', + '/etc/sysconfig/*/*/*/*']) + self.assertEqual(sorted(paths), check_paths) + + def testExpandUsersHomeDirectoryPath(self): + """Tests the ExpandUsersHomeDirectoryPath function.""" + user_account_artifact1 = artifacts.UserAccountArtifact( + user_directory='C:\\Users\\Test1', username='Test1') + user_account_artifact2 = artifacts.UserAccountArtifact( + user_directory='C:\\Users\\Test2', username='Test2') + + path = '%%users.homedir%%\\Profile' + expanded_paths = path_helper.PathHelper.ExpandUsersHomeDirectoryPath( + path, [user_account_artifact1, user_account_artifact2]) + + expected_expanded_paths = [ + '\\Users\\Test1\\Profile', + '\\Users\\Test2\\Profile'] + self.assertEqual(expanded_paths, expected_expanded_paths) + + path = 'C:\\Temp' + expanded_paths = path_helper.PathHelper.ExpandUsersHomeDirectoryPath( + path, [user_account_artifact1, user_account_artifact2]) + + expected_expanded_paths = ['\\Temp'] + self.assertEqual(expanded_paths, expected_expanded_paths) + + path = 'C:\\Temp\\%%users.homedir%%' + expanded_paths = path_helper.PathHelper.ExpandUsersHomeDirectoryPath( + path, [user_account_artifact1, user_account_artifact2]) + + expected_expanded_paths = ['\\Temp\\%%users.homedir%%'] + self.assertEqual(expanded_paths, expected_expanded_paths) + def testExpandWindowsPath(self): """Tests the ExpandWindowsPath function.""" environment_variable = artifacts.EnvironmentVariableArtifact( @@ -146,95 +265,6 @@ def testGetRelativePathForPathSpec(self): qcow_path_spec) self.assertIsNone(display_name) - def testAppendPathEntries(self): - """Tests the AppendPathEntries function.""" - separator = '\\' - path = '\\Windows\\Test' - - # Test depth of ten skipping first entry. - # The path will have 9 entries as the default depth for ** is 10, but the - # first entry is being skipped. - count = 10 - skip_first = True - paths = path_helper.PathHelper.AppendPathEntries( - path, separator, count, skip_first) - - # Nine paths returned - self.assertEqual(len(paths), 9) - - # Nine paths in total, each one level deeper than the previous. - check_paths = [ - '\\Windows\\Test\\*\\*', - '\\Windows\\Test\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*\\*'] - self.assertItemsEqual(paths, check_paths) - - # Now test with skip_first set to False, but only a depth of 4. - # the path will have a total of 4 entries. - count = 4 - skip_first = False - paths = path_helper.PathHelper.AppendPathEntries( - path, separator, count, skip_first) - - # Four paths returned - self.assertEqual(len(paths), 4) - - # Four paths in total, each one level deeper than the previous. - check_paths = [ - '\\Windows\\Test\\*', - '\\Windows\\Test\\*\\*', - '\\Windows\\Test\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*'] - self.assertItemsEqual(paths, check_paths) - - def testExpandRecursiveGlobs(self): - """Tests the _ExpandRecursiveGlobs function.""" - separator = '/' - - # Test a path with a trailing /, which means first directory is skipped. - # The path will have 9 entries as the default depth for ** is 10, but the - # first entry is being skipped. - path = '/etc/sysconfig/**/' - paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator) - - # Nine paths returned - self.assertEqual(len(paths), 9) - - # Nine paths in total, each one level deeper than the previous. - check_paths = [ - '/etc/sysconfig/*/*', - '/etc/sysconfig/*/*/*', - '/etc/sysconfig/*/*/*/*', - '/etc/sysconfig/*/*/*/*/*', - '/etc/sysconfig/*/*/*/*/*/*', - '/etc/sysconfig/*/*/*/*/*/*/*', - '/etc/sysconfig/*/*/*/*/*/*/*/*', - '/etc/sysconfig/*/*/*/*/*/*/*/*/*', - '/etc/sysconfig/*/*/*/*/*/*/*/*/*/*'] - self.assertItemsEqual(paths, check_paths) - - # Now test with no trailing separator, but only a depth of 4. - # the path will have a total of 4 entries. - path = '/etc/sysconfig/**4' - paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator) - - # Four paths returned - self.assertEqual(len(paths), 4) - - # Four paths in total, each one level deeper than the previous. - check_paths = [ - '/etc/sysconfig/*', - '/etc/sysconfig/*/*', - '/etc/sysconfig/*/*/*', - '/etc/sysconfig/*/*/*/*'] - self.assertItemsEqual(paths, check_paths) - if __name__ == '__main__': unittest.main() From 10c448a8aeea690a8e20dc0a563dfea206e6c85d Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Sat, 9 Jun 2018 20:19:12 +0200 Subject: [PATCH 17/17] Changes for review --- tests/engine/artifact_filters.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/engine/artifact_filters.py b/tests/engine/artifact_filters.py index 53d03742e8..41a43e0e88 100644 --- a/tests/engine/artifact_filters.py +++ b/tests/engine/artifact_filters.py @@ -82,16 +82,18 @@ def testBuildFindSpecsWithFileSystem(self): environment_variables=[environment_variable]) find_specs_per_source_type = knowledge_base.GetValue( test_filter_file._KNOWLEDGE_BASE_VALUE) - find_specs = sorted(find_specs_per_source_type.get( - artifact_types.TYPE_INDICATOR_FILE, [])) + find_specs = find_specs_per_source_type.get( + artifact_types.TYPE_INDICATOR_FILE, []) # Should build 15 FindSpec entries. self.assertEqual(len(find_specs), 15) - # Last entry in find_specs list should be testuser2. + # Last find_spec should contain the testuser2 profile path. + location_segments = sorted([ + find_spec._location_segments for find_spec in find_specs]) path_segments = [ 'Users', 'testuser2', 'Documents', 'WindowsPowerShell', 'profile\\.ps1'] - self.assertEqual(find_specs[11]._location_segments, path_segments) + self.assertEqual(location_segments[2], path_segments) path_spec = path_spec_factory.Factory.NewPathSpec( dfvfs_definitions.TYPE_INDICATOR_OS, location='.') @@ -121,8 +123,8 @@ def testBuildFindSpecsWithRegistry(self): test_filter_file.BuildFindSpecs(environment_variables=None) find_specs_per_source_type = knowledge_base.GetValue( test_filter_file._KNOWLEDGE_BASE_VALUE) - find_specs = sorted(find_specs_per_source_type.get( - artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY, [])) + find_specs = find_specs_per_source_type.get( + artifact_types.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY, []) self.assertEqual(len(find_specs), 1)