Skip to content

Commit

Permalink
Fix handling for other Python versions
Browse files Browse the repository at this point in the history
- fix wrapped functions in pathlib (accessor not available in most Python versions)
- fix open_code handling
- make module path comparisons case-insensitive under Windows and macOS
  • Loading branch information
mrbean-bremen committed Aug 12, 2024
1 parent 39fdac8 commit 4b6aecd
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 85 deletions.
12 changes: 6 additions & 6 deletions pyfakefs/fake_filesystem_unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ def __init__(
set_uid(1)
set_gid(1)

self._skip_names = self.SKIPNAMES.copy()
self.skip_names = self.SKIPNAMES.copy()
# save the original open function for use in pytest plugin
self.original_open = open
self.patch_open_code = patch_open_code
Expand All @@ -582,7 +582,7 @@ def __init__(
cast(ModuleType, m).__name__ if inspect.ismodule(m) else cast(str, m)
for m in additional_skip_names
]
self._skip_names.update(skip_names)
self.skip_names.update(skip_names)

self._fake_module_classes: Dict[str, Any] = {}
self._unfaked_module_classes: Dict[str, Any] = {}
Expand Down Expand Up @@ -628,8 +628,8 @@ def __init__(
if patched_module_names != self.PATCHED_MODULE_NAMES:
self.__class__.PATCHED_MODULE_NAMES = patched_module_names
clear_cache = True
if self._skip_names != self.ADDITIONAL_SKIP_NAMES:
self.__class__.ADDITIONAL_SKIP_NAMES = self._skip_names
if self.skip_names != self.ADDITIONAL_SKIP_NAMES:
self.__class__.ADDITIONAL_SKIP_NAMES = self.skip_names
clear_cache = True
if patch_default_args != self.PATCH_DEFAULT_ARGS:
self.__class__.PATCH_DEFAULT_ARGS = patch_default_args
Expand Down Expand Up @@ -875,7 +875,7 @@ def _find_modules(self) -> None:
pass
continue
skipped = module in self.SKIPMODULES or any(
[sn.startswith(module.__name__) for sn in self._skip_names]
[sn.startswith(module.__name__) for sn in self.skip_names]
)
module_items = module.__dict__.copy().items()

Expand Down Expand Up @@ -922,7 +922,7 @@ def _refresh(self) -> None:
for name in self._fake_module_classes:
self.fake_modules[name] = self._fake_module_classes[name](self.fs)
if hasattr(self.fake_modules[name], "skip_names"):
self.fake_modules[name].skip_names = self._skip_names
self.fake_modules[name].skip_names = self.skip_names
self.fake_modules[PATH_MODULE] = self.fake_modules["os"].path
for name in self._unfaked_module_classes:
self.unfaked_modules[name] = self._unfaked_module_classes[name]()
Expand Down
11 changes: 7 additions & 4 deletions pyfakefs/fake_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,14 @@ def lockf(

def __getattribute__(self, name):
"""Forwards any unfaked calls to the standard fcntl module."""
filesystem = object.__getattribute__(self, "filesystem")
fs: FakeFilesystem = object.__getattribute__(self, "filesystem")
fnctl_module = object.__getattribute__(self, "_fcntl_module")
if filesystem.patcher:
skip_names = filesystem.patcher._skip_names
if is_called_from_skipped_module(skip_names=skip_names):
if fs.patcher:
skip_names = fs.patcher.skip_names
if is_called_from_skipped_module(
skip_names=skip_names,
case_sensitive=fs.is_case_sensitive,
):
# remove the `self` argument for FakeOsModule methods
return getattr(fnctl_module, name)

Expand Down
5 changes: 4 additions & 1 deletion pyfakefs/fake_open.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ def fake_open(
"""Redirect the call to FakeFileOpen.
See FakeFileOpen.call() for description.
"""
if is_called_from_skipped_module(skip_names=skip_names):
if is_called_from_skipped_module(
skip_names=skip_names,
case_sensitive=filesystem.is_case_sensitive,
):
return io_open( # pytype: disable=wrong-arg-count
file,
mode,
Expand Down
8 changes: 6 additions & 2 deletions pyfakefs/fake_os.py
Original file line number Diff line number Diff line change
Expand Up @@ -1421,9 +1421,13 @@ def wrapped(*args, **kwargs):

if not should_use_original and args:
self = args[0]
fs: FakeFilesystem = self.filesystem
if self.filesystem.patcher:
skip_names = self.filesystem.patcher._skip_names
if is_called_from_skipped_module(skip_names=skip_names):
skip_names = fs.patcher.skip_names
if is_called_from_skipped_module(
skip_names=skip_names,
case_sensitive=fs.is_case_sensitive,
):
should_use_original = True

if should_use_original:
Expand Down
7 changes: 5 additions & 2 deletions pyfakefs/fake_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,11 @@ def wrapped(*args, **kwargs):
self = args[0]
should_use_original = self.os.use_original
if not should_use_original and self.filesystem.patcher:
skip_names = self.filesystem.patcher._skip_names
if is_called_from_skipped_module(skip_names=skip_names):
skip_names = self.filesystem.patcher.skip_names
if is_called_from_skipped_module(
skip_names=skip_names,
case_sensitive=self.filesystem.is_case_sensitive,
):
should_use_original = True

if should_use_original:
Expand Down
111 changes: 50 additions & 61 deletions pyfakefs/fake_pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,62 +93,47 @@ def init_module(filesystem):
setattr(FakePathlibModule.PureWindowsPath, parser_name, fake_pure_nt_os.path)


def _wrap_strfunc(strfunc):
@functools.wraps(strfunc)
def _wrap_strfunc(fake_fct, original_fct):
@functools.wraps(fake_fct)
def _wrapped(pathobj, *args, **kwargs):
should_use_original = False
if pathobj.filesystem.patcher:
skip_names = pathobj.filesystem.patcher._skip_names
if is_called_from_skipped_module(skip_names=skip_names):
should_use_original = True

if should_use_original:
return getattr(pathobj._original_accessor, strfunc.__name__)(
str(pathobj),
*args,
**kwargs,
)
return strfunc(pathobj.filesystem, str(pathobj), *args, **kwargs)
fs: FakeFilesystem = pathobj.filesystem
if fs.patcher:
if is_called_from_skipped_module(
skip_names=fs.patcher.skip_names,
case_sensitive=fs.is_case_sensitive,
):
return original_fct(str(pathobj), *args, **kwargs)
return fake_fct(fs, str(pathobj), *args, **kwargs)

return staticmethod(_wrapped)


def _wrap_binary_strfunc(strfunc):
@functools.wraps(strfunc)
def _wrap_binary_strfunc(fake_fct, original_fct):
@functools.wraps(fake_fct)
def _wrapped(pathobj1, pathobj2, *args):
should_use_original = False
if pathobj1.filesystem.patcher:
skip_names = pathobj1.filesystem.patcher._skip_names
if is_called_from_skipped_module(skip_names=skip_names):
should_use_original = True

if should_use_original:
return getattr(pathobj1._original_accessor, strfunc.__name__)(
str(pathobj1),
str(pathobj2),
*args,
)
return strfunc(pathobj1.filesystem, str(pathobj1), str(pathobj2), *args)
fs: FakeFilesystem = pathobj1.filesystem
if fs.patcher:
if is_called_from_skipped_module(
skip_names=fs.patcher.skip_names,
case_sensitive=fs.is_case_sensitive,
):
return original_fct(str(pathobj1), str(pathobj2), *args)
return fake_fct(fs, str(pathobj1), str(pathobj2), *args)

return staticmethod(_wrapped)


def _wrap_binary_strfunc_reverse(strfunc):
@functools.wraps(strfunc)
def _wrap_binary_strfunc_reverse(fake_fct, original_fct):
@functools.wraps(fake_fct)
def _wrapped(pathobj1, pathobj2, *args):
should_use_original = False
if pathobj2.filesystem.patcher:
skip_names = pathobj2.filesystem.patcher._skip_names
if is_called_from_skipped_module(skip_names=skip_names):
should_use_original = True

if should_use_original:
return getattr(pathobj2._original_accessor, strfunc.__name__)(
str(pathobj2),
str(pathobj1),
*args,
)
return strfunc(pathobj2.filesystem, str(pathobj2), str(pathobj1), *args)
fs: FakeFilesystem = pathobj2.filesystem
if fs.patcher:
if is_called_from_skipped_module(
skip_names=fs.patcher.skip_names,
case_sensitive=fs.is_case_sensitive,
):
return original_fct(str(pathobj2), str(pathobj1), *args)
return fake_fct(fs, str(pathobj2), str(pathobj1), *args)

return staticmethod(_wrapped)

Expand All @@ -164,20 +149,21 @@ class _FakeAccessor(accessor): # type: ignore[valid-type, misc]
methods.
"""

stat = _wrap_strfunc(FakeFilesystem.stat)
stat = _wrap_strfunc(FakeFilesystem.stat, os.stat)

lstat = _wrap_strfunc(
lambda fs, path: FakeFilesystem.stat(fs, path, follow_symlinks=False)
lambda fs, path: FakeFilesystem.stat(fs, path, follow_symlinks=False), os.lstat
)

listdir = _wrap_strfunc(FakeFilesystem.listdir)
scandir = _wrap_strfunc(fake_scandir.scandir)
listdir = _wrap_strfunc(FakeFilesystem.listdir, os.listdir)
scandir = _wrap_strfunc(fake_scandir.scandir, os.scandir)

if hasattr(os, "lchmod"):
lchmod = _wrap_strfunc(
lambda fs, path, mode: FakeFilesystem.chmod(
fs, path, mode, follow_symlinks=False
)
),
os.lchmod,
)
else:

Expand All @@ -200,47 +186,51 @@ def chmod(self, pathobj, *args, **kwargs):
)
return pathobj.filesystem.chmod(str(pathobj), *args, **kwargs)

mkdir = _wrap_strfunc(FakeFilesystem.makedir)
mkdir = _wrap_strfunc(FakeFilesystem.makedir, os.mkdir)

unlink = _wrap_strfunc(FakeFilesystem.remove)
unlink = _wrap_strfunc(FakeFilesystem.remove, os.unlink)

rmdir = _wrap_strfunc(FakeFilesystem.rmdir)
rmdir = _wrap_strfunc(FakeFilesystem.rmdir, os.rmdir)

rename = _wrap_binary_strfunc(FakeFilesystem.rename)
rename = _wrap_binary_strfunc(FakeFilesystem.rename, os.rename)

replace = _wrap_binary_strfunc(
lambda fs, old_path, new_path: FakeFilesystem.rename(
fs, old_path, new_path, force_replace=True
)
),
os.replace,
)

symlink = _wrap_binary_strfunc_reverse(
lambda fs, fpath, target, target_is_dir: FakeFilesystem.create_symlink(
fs, fpath, target, create_missing_dirs=False
)
),
os.symlink,
)

if (3, 8) <= sys.version_info:
link_to = _wrap_binary_strfunc(
lambda fs, file_path, link_target: FakeFilesystem.link(
fs, file_path, link_target
)
),
os.link,
)

if sys.version_info >= (3, 10):
link = _wrap_binary_strfunc(
lambda fs, file_path, link_target: FakeFilesystem.link(
fs, file_path, link_target
)
),
os.link,
)

# this will use the fake filesystem because os is patched
def getcwd(self):
return os.getcwd()

readlink = _wrap_strfunc(FakeFilesystem.readlink)
readlink = _wrap_strfunc(FakeFilesystem.readlink, os.readlink)

utime = _wrap_strfunc(FakeFilesystem.utime)
utime = _wrap_strfunc(FakeFilesystem.utime, os.utime)


_fake_accessor = _FakeAccessor()
Expand Down Expand Up @@ -613,7 +603,6 @@ def _from_parsed_parts(cls, drv, root, parts):

def _init(self, template=None):
"""Initializer called from base class."""
self._original_accessor = self._accessor
# only needed until Python 3.10
self._accessor = _fake_accessor
# only needed until Python 3.8
Expand Down
32 changes: 23 additions & 9 deletions pyfakefs/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,24 +440,38 @@ def putvalue(self, value: bytes) -> None:
self._bytestream.write(value)


def is_called_from_skipped_module(skip_names: list) -> bool:
def is_called_from_skipped_module(skip_names: list, case_sensitive: bool) -> bool:
def starts_with(path, string):
if case_sensitive:
return path.startswith(string)
return path.lower().startswith(string.lower())

stack = traceback.extract_stack()
from_open_code = (

# handle the case that we try to call the original `open_code`
# (since Python 3.12)
# The stack in this case is:
# -1: helpers.is_called_from_skipped_module: 'stack = traceback.extract_stack()'
# -2: fake_open.fake_open: 'if is_called_from_skipped_module('
# -3: fake_io.open: 'return fake_open('
# -4: fake_io.open_code : 'return self._io_module.open_code(path)'
if (
sys.version_info >= (3, 12)
and stack[0].name == "open_code"
and stack[0].line == "return self._io_module.open_code(path)"
)
and stack[-4].name == "open_code"
and stack[-4].line == "return self._io_module.open_code(path)"
):
return True

caller_filename = next(
(
frame.filename
for frame in stack[::-1]
if not frame.filename.startswith("<frozen ")
and not frame.filename.startswith(STDLIB_PATH)
and not starts_with(frame.filename, STDLIB_PATH)
and (
not frame.filename.startswith(PYFAKEFS_PATH)
not starts_with(frame.filename, PYFAKEFS_PATH)
or any(
frame.filename.startswith(test_path)
starts_with(frame.filename, test_path)
for test_path in PYFAKEFS_TEST_PATHS
)
)
Expand All @@ -469,7 +483,7 @@ def is_called_from_skipped_module(skip_names: list) -> bool:
caller_module_name = os.path.splitext(caller_filename)[0]
caller_module_name = caller_module_name.replace(os.sep, ".")

if from_open_code or any(
if any(
[
caller_module_name == sn or caller_module_name.endswith("." + sn)
for sn in skip_names
Expand Down

0 comments on commit 4b6aecd

Please sign in to comment.