From fca48ea73cc379d1e3054c2472ebbe876334be35 Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Sat, 22 Jun 2019 19:42:50 +0200 Subject: [PATCH] Changes for globstar expansion #2481 (#2634) --- plaso/engine/artifact_filters.py | 4 +- plaso/engine/path_helper.py | 112 +++++++++----------- test_data/artifacts/artifacts_filters.yaml | 4 +- tests/engine/artifact_filters.py | 6 +- tests/engine/path_helper.py | 114 ++++++++------------- 5 files changed, 98 insertions(+), 142 deletions(-) diff --git a/plaso/engine/artifact_filters.py b/plaso/engine/artifact_filters.py index 47963d03ab..03917ab78f 100644 --- a/plaso/engine/artifact_filters.py +++ b/plaso/engine/artifact_filters.py @@ -147,7 +147,7 @@ def _BuildFindSpecsFromRegistrySourceKey(self, key_path): source type. """ find_specs = [] - for key_path_glob in path_helper.PathHelper.ExpandRecursiveGlobs( + for key_path_glob in path_helper.PathHelper.ExpandGlobStars( key_path, '\\'): logger.debug('building find spec from key path glob: {0:s}'.format( key_path_glob)) @@ -177,7 +177,7 @@ def _BuildFindSpecsFromFileSourcePath( list[dfvfs.FindSpec]: find specifications for the file source type. """ find_specs = [] - for path_glob in path_helper.PathHelper.ExpandRecursiveGlobs( + for path_glob in path_helper.PathHelper.ExpandGlobStars( source_path, path_separator): logger.debug('building find spec from path glob: {0:s}'.format( path_glob)) diff --git a/plaso/engine/path_helper.py b/plaso/engine/path_helper.py index d26b2347fe..1a7d8d2b88 100644 --- a/plaso/engine/path_helper.py +++ b/plaso/engine/path_helper.py @@ -3,8 +3,6 @@ from __future__ import unicode_literals -import re - from dfvfs.lib import definitions as dfvfs_definitions from plaso.engine import logger @@ -144,51 +142,15 @@ def _IsWindowsDrivePathSegment(cls, path_segment): return path_segment in ('%%ENVIRON_SYSTEMDRIVE%%', '%SYSTEMDRIVE%') @classmethod - def AppendPathEntries( - cls, path, path_separator, number_of_wildcards, skip_first): - """Appends glob wildcards to a path. - - This function will append glob wildcards "*" to a path, returning paths - with an additional glob wildcard up to the specified number. E.g. given - the path "/tmp" and a number of 2 wildcards, this function will return - "tmp/*", "tmp/*/*". When skip_first is true the path with the first - wildcard is not returned as a result. - - Args: - path (str): path to append glob wildcards to. - path_separator (str): path segment separator. - number_of_wildcards (int): number of glob wildcards to append. - skip_first (bool): True if the the first path with glob wildcard should - be skipped as a result. - - Returns: - list[str]: paths with glob wildcards. - """ - if path[-1] == path_separator: - path = path[:-1] + def ExpandGlobStars(cls, path, path_separator): + """Expands globstars "**" in a path. - if skip_first: - path = ''.join([path, path_separator, '*']) - number_of_wildcards -= 1 + A globstar "**" will recursively match all files and zero or more + directories and subdirectories. - paths = [] - for _ in range(0, number_of_wildcards): - path = ''.join([path, path_separator, '*']) - paths.append(path) - - return paths - - @classmethod - def ExpandRecursiveGlobs(cls, path, path_separator): - """Expands recursive like globs present in an artifact path. - - If a path ends in '**', with up to two optional digits such as '**10', - the '**' will recursively match all files and zero or more directories - from the specified path. The optional digits indicate the recursion depth. - By default recursion depth is 10 directories. - - If the glob is followed by the specified path segment separator, only - directories and subdirectories will be matched. + By default the maximum recursion depth is 10 subdirectories, a numeric + values after the globstar, such as "**5", can be used to define the maximum + recursion depth. Args: path (str): path to be expanded. @@ -197,26 +159,48 @@ def ExpandRecursiveGlobs(cls, path, path_separator): Returns: list[str]: String path expanded for each glob. """ - glob_regex = r'(.*)?{0:s}\*\*(\d{{1,2}})?({0:s})?$'.format( - re.escape(path_separator)) - - match = re.search(glob_regex, path) - if not match: - return [path] - - skip_first = False - if match.group(3): - skip_first = True - if match.group(2): - iterations = int(match.group(2)) - else: - iterations = cls._RECURSIVE_GLOB_LIMIT - logger.warning(( - 'Path "{0:s}" contains fully recursive glob, limiting to 10 ' - 'levels').format(path)) + expanded_paths = [] - return cls.AppendPathEntries( - match.group(1), path_separator, iterations, skip_first) + path_segments = path.split(path_separator) + last_segment_index = len(path_segments) - 1 + for segment_index, path_segment in enumerate(path_segments): + recursion_depth = None + if path_segment.startswith('**'): + if len(path_segment) == 2: + recursion_depth = 10 + else: + try: + recursion_depth = int(path_segment[2:], 10) + except (TypeError, ValueError): + logger.warning(( + 'Globstar with suffix "{0:s}" in path "{1:s}" not ' + 'supported.').format(path_segment, path)) + + elif '**' in path_segment: + logger.warning(( + 'Globstar with prefix "{0:s}" in path "{1:s}" not ' + 'supported.').format(path_segment, path)) + + if recursion_depth is not None: + if recursion_depth <= 1 or recursion_depth > cls._RECURSIVE_GLOB_LIMIT: + logger.warning(( + 'Globstar "{0:s}" in path "{1:s}" exceed recursion maximum ' + 'recursion depth, limiting to: {2:d}.').format( + path_segment, path, cls._RECURSIVE_GLOB_LIMIT)) + recursion_depth = cls._RECURSIVE_GLOB_LIMIT + + next_segment_index = segment_index + 1 + for expanded_path_segment in [ + ['*'] * depth for depth in range(1, recursion_depth + 1)]: + expanded_path_segments = list(path_segments[:segment_index]) + expanded_path_segments.extend(expanded_path_segment) + if next_segment_index <= last_segment_index: + expanded_path_segments.extend(path_segments[next_segment_index:]) + + expanded_path = path_separator.join(expanded_path_segments) + expanded_paths.append(expanded_path) + + return expanded_paths or [path] @classmethod def ExpandUsersVariablePath(cls, path, path_separator, user_accounts): diff --git a/test_data/artifacts/artifacts_filters.yaml b/test_data/artifacts/artifacts_filters.yaml index a14e682dda..4b2149e538 100644 --- a/test_data/artifacts/artifacts_filters.yaml +++ b/test_data/artifacts/artifacts_filters.yaml @@ -42,7 +42,7 @@ sources: - '%%users.homedir%%\Documents\WindowsPowerShell\profile.ps1' - '\test_data\testdir\filter_*.txt' - '\does_not_exist\some_file_*.txt' - - '\globbed\test\path\**\' + - '\globbed\test\path\**' - 'failing' separator: '\' labels: [System] @@ -109,4 +109,4 @@ sources: paths: ['\a_directory\*_file'] separator: '\' labels: [System] -supported_os: [Windows] \ No newline at end of file +supported_os: [Windows] diff --git a/tests/engine/artifact_filters.py b/tests/engine/artifact_filters.py index 93f0b85f4a..eedee43f76 100644 --- a/tests/engine/artifact_filters.py +++ b/tests/engine/artifact_filters.py @@ -103,9 +103,8 @@ def testBuildFindSpecsWithFileSystem(self): test_filters_helper.BuildFindSpecs( artifact_filter_names, environment_variables=[environment_variable]) - # There should be 15 file system find specifications. self.assertEqual( - len(test_filters_helper.included_file_system_find_specs), 15) + len(test_filters_helper.included_file_system_find_specs), 16) self.assertEqual(len(test_filters_helper.registry_find_specs), 0) # Last find_spec should contain the testuser2 profile path. @@ -157,9 +156,8 @@ def testBuildFindSpecsWithFileSystemAndGroup(self): test_filters_helper.BuildFindSpecs( artifact_filter_names, environment_variables=[environment_variable]) - # There should be 15 file system find specifications. self.assertEqual( - len(test_filters_helper.included_file_system_find_specs), 15) + len(test_filters_helper.included_file_system_find_specs), 16) self.assertEqual(len(test_filters_helper.registry_find_specs), 0) path_spec = path_spec_factory.Factory.NewPathSpec( diff --git a/tests/engine/path_helper.py b/tests/engine/path_helper.py index 1ca65188b3..895a26faf2 100644 --- a/tests/engine/path_helper.py +++ b/tests/engine/path_helper.py @@ -121,68 +121,14 @@ def testIsWindowsDrivePathSegment(self): result = path_helper.PathHelper._IsWindowsDrivePathSegment('Windows') self.assertFalse(result) - def testAppendPathEntries(self): - """Tests the AppendPathEntries function.""" - separator = '\\' - path = '\\Windows\\Test' - - # Test depth of ten skipping first entry. - # The path will have 9 entries as the default depth for ** is 10, but the - # first entry is being skipped. - count = 10 - skip_first = True - paths = path_helper.PathHelper.AppendPathEntries( - path, separator, count, skip_first) - - # Nine paths returned - self.assertEqual(len(paths), 9) - - # Nine paths in total, each one level deeper than the previous. - check_paths = sorted([ - '\\Windows\\Test\\*\\*', - '\\Windows\\Test\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*\\*']) - self.assertEqual(sorted(paths), check_paths) - - # Now test with skip_first set to False, but only a depth of 4. - # the path will have a total of 4 entries. - count = 4 - skip_first = False - paths = path_helper.PathHelper.AppendPathEntries( - path, separator, count, skip_first) - - # Four paths returned - self.assertEqual(len(paths), 4) + def testExpandGlobStars(self): + """Tests the ExpandGlobStars function.""" + paths = path_helper.PathHelper.ExpandGlobStars('/etc/sysconfig/**', '/') + + self.assertEqual(len(paths), 10) - # Four paths in total, each one level deeper than the previous. - check_paths = sorted([ - '\\Windows\\Test\\*', - '\\Windows\\Test\\*\\*', - '\\Windows\\Test\\*\\*\\*', - '\\Windows\\Test\\*\\*\\*\\*']) - self.assertEqual(sorted(paths), check_paths) - - def testExpandRecursiveGlobs(self): - """Tests the _ExpandRecursiveGlobs function.""" - separator = '/' - - # Test a path with a trailing /, which means first directory is skipped. - # The path will have 9 entries as the default depth for ** is 10, but the - # first entry is being skipped. - path = '/etc/sysconfig/**/' - paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator) - - # Nine paths returned - self.assertEqual(len(paths), 9) - - # Nine paths in total, each one level deeper than the previous. - check_paths = sorted([ + expected_paths = sorted([ + '/etc/sysconfig/*', '/etc/sysconfig/*/*', '/etc/sysconfig/*/*/*', '/etc/sysconfig/*/*/*/*', @@ -192,23 +138,51 @@ def testExpandRecursiveGlobs(self): '/etc/sysconfig/*/*/*/*/*/*/*/*', '/etc/sysconfig/*/*/*/*/*/*/*/*/*', '/etc/sysconfig/*/*/*/*/*/*/*/*/*/*']) - self.assertEqual(sorted(paths), check_paths) + self.assertEqual(sorted(paths), expected_paths) - # Now test with no trailing separator, but only a depth of 4. - # the path will have a total of 4 entries. - path = '/etc/sysconfig/**4' - paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator) + # Test globstar with recursion depth of 4. + paths = path_helper.PathHelper.ExpandGlobStars('/etc/sysconfig/**4', '/') - # Four paths returned self.assertEqual(len(paths), 4) - # Four paths in total, each one level deeper than the previous. - check_paths = sorted([ + expected_paths = sorted([ '/etc/sysconfig/*', '/etc/sysconfig/*/*', '/etc/sysconfig/*/*/*', '/etc/sysconfig/*/*/*/*']) - self.assertEqual(sorted(paths), check_paths) + self.assertEqual(sorted(paths), expected_paths) + + # Test globstar with unsupported recursion depth of 99. + paths = path_helper.PathHelper.ExpandGlobStars('/etc/sysconfig/**99', '/') + + self.assertEqual(len(paths), 10) + + expected_paths = sorted([ + '/etc/sysconfig/*', + '/etc/sysconfig/*/*', + '/etc/sysconfig/*/*/*', + '/etc/sysconfig/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*/*/*']) + self.assertEqual(sorted(paths), expected_paths) + + # Test globstar with prefix. + paths = path_helper.PathHelper.ExpandGlobStars('/etc/sysconfig/my**', '/') + + self.assertEqual(len(paths), 1) + + self.assertEqual(paths, ['/etc/sysconfig/my**']) + + # Test globstar with suffix. + paths = path_helper.PathHelper.ExpandGlobStars('/etc/sysconfig/**.exe', '/') + + self.assertEqual(len(paths), 1) + + self.assertEqual(paths, ['/etc/sysconfig/**.exe']) def testExpandUsersVariablePath(self): """Tests the ExpandUsersVariablePath function."""