Skip to content

Commit

Permalink
Changes for globstar expansion #2481 (#2634)
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz authored and Onager committed Jun 22, 2019
1 parent 2392ffc commit fca48ea
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 142 deletions.
4 changes: 2 additions & 2 deletions plaso/engine/artifact_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def _BuildFindSpecsFromRegistrySourceKey(self, key_path):
source type.
"""
find_specs = []
for key_path_glob in path_helper.PathHelper.ExpandRecursiveGlobs(
for key_path_glob in path_helper.PathHelper.ExpandGlobStars(
key_path, '\\'):
logger.debug('building find spec from key path glob: {0:s}'.format(
key_path_glob))
Expand Down Expand Up @@ -177,7 +177,7 @@ def _BuildFindSpecsFromFileSourcePath(
list[dfvfs.FindSpec]: find specifications for the file source type.
"""
find_specs = []
for path_glob in path_helper.PathHelper.ExpandRecursiveGlobs(
for path_glob in path_helper.PathHelper.ExpandGlobStars(
source_path, path_separator):
logger.debug('building find spec from path glob: {0:s}'.format(
path_glob))
Expand Down
112 changes: 48 additions & 64 deletions plaso/engine/path_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

from __future__ import unicode_literals

import re

from dfvfs.lib import definitions as dfvfs_definitions

from plaso.engine import logger
Expand Down Expand Up @@ -144,51 +142,15 @@ def _IsWindowsDrivePathSegment(cls, path_segment):
return path_segment in ('%%ENVIRON_SYSTEMDRIVE%%', '%SYSTEMDRIVE%')

@classmethod
def AppendPathEntries(
cls, path, path_separator, number_of_wildcards, skip_first):
"""Appends glob wildcards to a path.
This function will append glob wildcards "*" to a path, returning paths
with an additional glob wildcard up to the specified number. E.g. given
the path "/tmp" and a number of 2 wildcards, this function will return
"/tmp/*", "/tmp/*/*". When skip_first is true the path with the first
wildcard is not returned as a result.
Args:
path (str): path to append glob wildcards to.
path_separator (str): path segment separator.
number_of_wildcards (int): number of glob wildcards to append.
skip_first (bool): True if the first path with glob wildcard should
be skipped as a result.
Returns:
list[str]: paths with glob wildcards.
"""
if path[-1] == path_separator:
path = path[:-1]
def ExpandGlobStars(cls, path, path_separator):
"""Expands globstars "**" in a path.
if skip_first:
path = ''.join([path, path_separator, '*'])
number_of_wildcards -= 1
A globstar "**" will recursively match all files and zero or more
directories and subdirectories.
paths = []
for _ in range(0, number_of_wildcards):
path = ''.join([path, path_separator, '*'])
paths.append(path)

return paths

@classmethod
def ExpandRecursiveGlobs(cls, path, path_separator):
"""Expands recursive like globs present in an artifact path.
If a path ends in '**', with up to two optional digits such as '**10',
the '**' will recursively match all files and zero or more directories
from the specified path. The optional digits indicate the recursion depth.
By default recursion depth is 10 directories.
If the glob is followed by the specified path segment separator, only
directories and subdirectories will be matched.
By default the maximum recursion depth is 10 subdirectories. A numeric
value after the globstar, such as "**5", can be used to define the maximum
recursion depth.
Args:
path (str): path to be expanded.
Expand All @@ -197,26 +159,48 @@ def ExpandRecursiveGlobs(cls, path, path_separator):
Returns:
list[str]: String path expanded for each glob.
"""
glob_regex = r'(.*)?{0:s}\*\*(\d{{1,2}})?({0:s})?$'.format(
re.escape(path_separator))

match = re.search(glob_regex, path)
if not match:
return [path]

skip_first = False
if match.group(3):
skip_first = True
if match.group(2):
iterations = int(match.group(2))
else:
iterations = cls._RECURSIVE_GLOB_LIMIT
logger.warning((
'Path "{0:s}" contains fully recursive glob, limiting to 10 '
'levels').format(path))
expanded_paths = []

return cls.AppendPathEntries(
match.group(1), path_separator, iterations, skip_first)
path_segments = path.split(path_separator)
last_segment_index = len(path_segments) - 1
for segment_index, path_segment in enumerate(path_segments):
recursion_depth = None
if path_segment.startswith('**'):
if len(path_segment) == 2:
recursion_depth = 10
else:
try:
recursion_depth = int(path_segment[2:], 10)
except (TypeError, ValueError):
logger.warning((
'Globstar with suffix "{0:s}" in path "{1:s}" not '
'supported.').format(path_segment, path))

elif '**' in path_segment:
logger.warning((
'Globstar with prefix "{0:s}" in path "{1:s}" not '
'supported.').format(path_segment, path))

if recursion_depth is not None:
if recursion_depth <= 1 or recursion_depth > cls._RECURSIVE_GLOB_LIMIT:
logger.warning((
'Globstar "{0:s}" in path "{1:s}" exceed recursion maximum '
'recursion depth, limiting to: {2:d}.').format(
path_segment, path, cls._RECURSIVE_GLOB_LIMIT))
recursion_depth = cls._RECURSIVE_GLOB_LIMIT

next_segment_index = segment_index + 1
for expanded_path_segment in [
['*'] * depth for depth in range(1, recursion_depth + 1)]:
expanded_path_segments = list(path_segments[:segment_index])
expanded_path_segments.extend(expanded_path_segment)
if next_segment_index <= last_segment_index:
expanded_path_segments.extend(path_segments[next_segment_index:])

expanded_path = path_separator.join(expanded_path_segments)
expanded_paths.append(expanded_path)

return expanded_paths or [path]

@classmethod
def ExpandUsersVariablePath(cls, path, path_separator, user_accounts):
Expand Down
4 changes: 2 additions & 2 deletions test_data/artifacts/artifacts_filters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ sources:
- '%%users.homedir%%\Documents\WindowsPowerShell\profile.ps1'
- '\test_data\testdir\filter_*.txt'
- '\does_not_exist\some_file_*.txt'
- '\globbed\test\path\**\'
- '\globbed\test\path\**'
- 'failing'
separator: '\'
labels: [System]
Expand Down Expand Up @@ -109,4 +109,4 @@ sources:
paths: ['\a_directory\*_file']
separator: '\'
labels: [System]
supported_os: [Windows]
supported_os: [Windows]
6 changes: 2 additions & 4 deletions tests/engine/artifact_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ def testBuildFindSpecsWithFileSystem(self):
test_filters_helper.BuildFindSpecs(
artifact_filter_names, environment_variables=[environment_variable])

# There should be 15 file system find specifications.
self.assertEqual(
len(test_filters_helper.included_file_system_find_specs), 15)
len(test_filters_helper.included_file_system_find_specs), 16)
self.assertEqual(len(test_filters_helper.registry_find_specs), 0)

# Last find_spec should contain the testuser2 profile path.
Expand Down Expand Up @@ -157,9 +156,8 @@ def testBuildFindSpecsWithFileSystemAndGroup(self):
test_filters_helper.BuildFindSpecs(
artifact_filter_names, environment_variables=[environment_variable])

# There should be 15 file system find specifications.
self.assertEqual(
len(test_filters_helper.included_file_system_find_specs), 15)
len(test_filters_helper.included_file_system_find_specs), 16)
self.assertEqual(len(test_filters_helper.registry_find_specs), 0)

path_spec = path_spec_factory.Factory.NewPathSpec(
Expand Down
114 changes: 44 additions & 70 deletions tests/engine/path_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,68 +121,14 @@ def testIsWindowsDrivePathSegment(self):
result = path_helper.PathHelper._IsWindowsDrivePathSegment('Windows')
self.assertFalse(result)

def testAppendPathEntries(self):
"""Tests the AppendPathEntries function."""
separator = '\\'
path = '\\Windows\\Test'

# Test depth of ten skipping first entry.
# The path will have 9 entries as the default depth for ** is 10, but the
# first entry is being skipped.
count = 10
skip_first = True
paths = path_helper.PathHelper.AppendPathEntries(
path, separator, count, skip_first)

# Nine paths returned
self.assertEqual(len(paths), 9)

# Nine paths in total, each one level deeper than the previous.
check_paths = sorted([
'\\Windows\\Test\\*\\*',
'\\Windows\\Test\\*\\*\\*',
'\\Windows\\Test\\*\\*\\*\\*',
'\\Windows\\Test\\*\\*\\*\\*\\*',
'\\Windows\\Test\\*\\*\\*\\*\\*\\*',
'\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*',
'\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*',
'\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*',
'\\Windows\\Test\\*\\*\\*\\*\\*\\*\\*\\*\\*\\*'])
self.assertEqual(sorted(paths), check_paths)

# Now test with skip_first set to False, but only a depth of 4.
# The path will have a total of 4 entries.
count = 4
skip_first = False
paths = path_helper.PathHelper.AppendPathEntries(
path, separator, count, skip_first)

# Four paths returned
self.assertEqual(len(paths), 4)
def testExpandGlobStars(self):
"""Tests the ExpandGlobStars function."""
paths = path_helper.PathHelper.ExpandGlobStars('/etc/sysconfig/**', '/')

self.assertEqual(len(paths), 10)

# Four paths in total, each one level deeper than the previous.
check_paths = sorted([
'\\Windows\\Test\\*',
'\\Windows\\Test\\*\\*',
'\\Windows\\Test\\*\\*\\*',
'\\Windows\\Test\\*\\*\\*\\*'])
self.assertEqual(sorted(paths), check_paths)

def testExpandRecursiveGlobs(self):
"""Tests the ExpandRecursiveGlobs function."""
separator = '/'

# Test a path with a trailing /, which means first directory is skipped.
# The path will have 9 entries as the default depth for ** is 10, but the
# first entry is being skipped.
path = '/etc/sysconfig/**/'
paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator)

# Nine paths returned
self.assertEqual(len(paths), 9)

# Nine paths in total, each one level deeper than the previous.
check_paths = sorted([
expected_paths = sorted([
'/etc/sysconfig/*',
'/etc/sysconfig/*/*',
'/etc/sysconfig/*/*/*',
'/etc/sysconfig/*/*/*/*',
Expand All @@ -192,23 +138,51 @@ def testExpandRecursiveGlobs(self):
'/etc/sysconfig/*/*/*/*/*/*/*/*',
'/etc/sysconfig/*/*/*/*/*/*/*/*/*',
'/etc/sysconfig/*/*/*/*/*/*/*/*/*/*'])
self.assertEqual(sorted(paths), check_paths)
self.assertEqual(sorted(paths), expected_paths)

# Now test with no trailing separator, but only a depth of 4.
# The path will have a total of 4 entries.
path = '/etc/sysconfig/**4'
paths = path_helper.PathHelper.ExpandRecursiveGlobs(path, separator)
# Test globstar with recursion depth of 4.
paths = path_helper.PathHelper.ExpandGlobStars('/etc/sysconfig/**4', '/')

# Four paths returned
self.assertEqual(len(paths), 4)

# Four paths in total, each one level deeper than the previous.
check_paths = sorted([
expected_paths = sorted([
'/etc/sysconfig/*',
'/etc/sysconfig/*/*',
'/etc/sysconfig/*/*/*',
'/etc/sysconfig/*/*/*/*'])
self.assertEqual(sorted(paths), check_paths)
self.assertEqual(sorted(paths), expected_paths)

# Test globstar with unsupported recursion depth of 99.
paths = path_helper.PathHelper.ExpandGlobStars('/etc/sysconfig/**99', '/')

self.assertEqual(len(paths), 10)

expected_paths = sorted([
'/etc/sysconfig/*',
'/etc/sysconfig/*/*',
'/etc/sysconfig/*/*/*',
'/etc/sysconfig/*/*/*/*',
'/etc/sysconfig/*/*/*/*/*',
'/etc/sysconfig/*/*/*/*/*/*',
'/etc/sysconfig/*/*/*/*/*/*/*',
'/etc/sysconfig/*/*/*/*/*/*/*/*',
'/etc/sysconfig/*/*/*/*/*/*/*/*/*',
'/etc/sysconfig/*/*/*/*/*/*/*/*/*/*'])
self.assertEqual(sorted(paths), expected_paths)

# Test globstar with prefix.
paths = path_helper.PathHelper.ExpandGlobStars('/etc/sysconfig/my**', '/')

self.assertEqual(len(paths), 1)

self.assertEqual(paths, ['/etc/sysconfig/my**'])

# Test globstar with suffix.
paths = path_helper.PathHelper.ExpandGlobStars('/etc/sysconfig/**.exe', '/')

self.assertEqual(len(paths), 1)

self.assertEqual(paths, ['/etc/sysconfig/**.exe'])

def testExpandUsersVariablePath(self):
"""Tests the ExpandUsersVariablePath function."""
Expand Down

0 comments on commit fca48ea

Please sign in to comment.