Skip to content

Commit

Permalink
Merge branch 'main' into small_int_immortal
Browse files Browse the repository at this point in the history
  • Loading branch information
eendebakpt authored Nov 22, 2024
2 parents ea17aee + 7725c03 commit 312c790
Show file tree
Hide file tree
Showing 6 changed files with 492 additions and 249 deletions.
6 changes: 0 additions & 6 deletions Doc/library/shutil.rst
Original file line number Diff line number Diff line change
Expand Up @@ -491,12 +491,6 @@ Directory and files operations
or ends with an extension that is in ``PATHEXT``; and filenames that
have no extension can now be found.

.. versionchanged:: 3.12.1
On Windows, if *mode* includes ``os.X_OK``, executables with an
extension in ``PATHEXT`` will be preferred over executables without a
matching extension.
This brings behavior closer to that of Python 3.11.

.. exception:: Error

This exception collects exceptions that are raised during a multi-file
Expand Down
22 changes: 11 additions & 11 deletions Lib/shutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -1550,21 +1550,21 @@ def which(cmd, mode=os.F_OK | os.X_OK, path=None):
if sys.platform == "win32":
# PATHEXT is necessary to check on Windows.
pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT
pathext = [ext for ext in pathext_source.split(os.pathsep) if ext]
pathext = pathext_source.split(os.pathsep)
pathext = [ext.rstrip('.') for ext in pathext if ext]

if use_bytes:
pathext = [os.fsencode(ext) for ext in pathext]

files = ([cmd] + [cmd + ext for ext in pathext])
files = [cmd + ext for ext in pathext]

# gh-109590. If we are looking for an executable, we need to look
# for a PATHEXT match. The first cmd is the direct match
# (e.g. python.exe instead of python)
# Check that direct match first if and only if the extension is in PATHEXT
# Otherwise check it last
suffix = os.path.splitext(files[0])[1].upper()
if mode & os.X_OK and not any(suffix == ext.upper() for ext in pathext):
files.append(files.pop(0))
# If X_OK in mode, simulate the cmd.exe behavior: look at direct
# match if and only if the extension is in PATHEXT.
# If X_OK not in mode, simulate the first result of where.exe:
# always look at direct match before a PATHEXT match.
normcmd = cmd.upper()
if not (mode & os.X_OK) or any(normcmd.endswith(ext.upper()) for ext in pathext):
files.insert(0, cmd)
else:
# On other platforms you don't have things like PATHEXT to tell you
# what file suffixes are executable, so just pass on cmd as-is.
Expand All @@ -1573,7 +1573,7 @@ def which(cmd, mode=os.F_OK | os.X_OK, path=None):
seen = set()
for dir in path:
normdir = os.path.normcase(dir)
if not normdir in seen:
if normdir not in seen:
seen.add(normdir)
for thefile in files:
name = os.path.join(dir, thefile)
Expand Down
2 changes: 1 addition & 1 deletion Lib/test/libregrtest/run_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ def get_running(workers: list[WorkerThread]) -> str | None:
running: list[str] = []
for worker in workers:
test_name = worker.test_name
if not test_name:
if test_name == _NOT_RUNNING:
continue
dt = time.monotonic() - worker.start_time
if dt >= PROGRESS_MIN_TIME:
Expand Down
188 changes: 188 additions & 0 deletions Lib/test/test_ctypes/test_win32_com_foreign_func.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import ctypes
import gc
import sys
import unittest
from ctypes import POINTER, byref, c_void_p
from ctypes.wintypes import BYTE, DWORD, WORD

if sys.platform != "win32":
raise unittest.SkipTest("Windows-specific test")


from _ctypes import COMError
from ctypes import HRESULT


COINIT_APARTMENTTHREADED = 0x2
CLSCTX_SERVER = 5
S_OK = 0
OUT = 2
TRUE = 1
E_NOINTERFACE = -2147467262


class GUID(ctypes.Structure):
# https://learn.microsoft.com/en-us/windows/win32/api/guiddef/ns-guiddef-guid
_fields_ = [
("Data1", DWORD),
("Data2", WORD),
("Data3", WORD),
("Data4", BYTE * 8),
]


def create_proto_com_method(name, index, restype, *argtypes):
proto = ctypes.WINFUNCTYPE(restype, *argtypes)

def make_method(*args):
foreign_func = proto(index, name, *args)

def call(self, *args, **kwargs):
return foreign_func(self, *args, **kwargs)

return call

return make_method


def create_guid(name):
guid = GUID()
# https://learn.microsoft.com/en-us/windows/win32/api/combaseapi/nf-combaseapi-clsidfromstring
ole32.CLSIDFromString(name, byref(guid))
return guid


def is_equal_guid(guid1, guid2):
# https://learn.microsoft.com/en-us/windows/win32/api/objbase/nf-objbase-isequalguid
return ole32.IsEqualGUID(byref(guid1), byref(guid2))


ole32 = ctypes.oledll.ole32

IID_IUnknown = create_guid("{00000000-0000-0000-C000-000000000046}")
IID_IStream = create_guid("{0000000C-0000-0000-C000-000000000046}")
IID_IPersist = create_guid("{0000010C-0000-0000-C000-000000000046}")
CLSID_ShellLink = create_guid("{00021401-0000-0000-C000-000000000046}")

# https://learn.microsoft.com/en-us/windows/win32/api/unknwn/nf-unknwn-iunknown-queryinterface(refiid_void)
proto_query_interface = create_proto_com_method(
"QueryInterface", 0, HRESULT, POINTER(GUID), POINTER(c_void_p)
)
# https://learn.microsoft.com/en-us/windows/win32/api/unknwn/nf-unknwn-iunknown-addref
proto_add_ref = create_proto_com_method("AddRef", 1, ctypes.c_long)
# https://learn.microsoft.com/en-us/windows/win32/api/unknwn/nf-unknwn-iunknown-release
proto_release = create_proto_com_method("Release", 2, ctypes.c_long)
# https://learn.microsoft.com/en-us/windows/win32/api/objidl/nf-objidl-ipersist-getclassid
proto_get_class_id = create_proto_com_method(
"GetClassID", 3, HRESULT, POINTER(GUID)
)


class ForeignFunctionsThatWillCallComMethodsTests(unittest.TestCase):
def setUp(self):
# https://learn.microsoft.com/en-us/windows/win32/api/combaseapi/nf-combaseapi-coinitializeex
ole32.CoInitializeEx(None, COINIT_APARTMENTTHREADED)

def tearDown(self):
# https://learn.microsoft.com/en-us/windows/win32/api/combaseapi/nf-combaseapi-couninitialize
ole32.CoUninitialize()
gc.collect()

@staticmethod
def create_shelllink_persist(typ):
ppst = typ()
# https://learn.microsoft.com/en-us/windows/win32/api/combaseapi/nf-combaseapi-cocreateinstance
ole32.CoCreateInstance(
byref(CLSID_ShellLink),
None,
CLSCTX_SERVER,
byref(IID_IPersist),
byref(ppst),
)
return ppst

def test_without_paramflags_and_iid(self):
class IUnknown(c_void_p):
QueryInterface = proto_query_interface()
AddRef = proto_add_ref()
Release = proto_release()

class IPersist(IUnknown):
GetClassID = proto_get_class_id()

ppst = self.create_shelllink_persist(IPersist)

clsid = GUID()
hr_getclsid = ppst.GetClassID(byref(clsid))
self.assertEqual(S_OK, hr_getclsid)
self.assertEqual(TRUE, is_equal_guid(CLSID_ShellLink, clsid))

self.assertEqual(2, ppst.AddRef())
self.assertEqual(3, ppst.AddRef())

punk = IUnknown()
hr_qi = ppst.QueryInterface(IID_IUnknown, punk)
self.assertEqual(S_OK, hr_qi)
self.assertEqual(3, punk.Release())

with self.assertRaises(OSError) as e:
punk.QueryInterface(IID_IStream, IUnknown())
self.assertEqual(E_NOINTERFACE, e.exception.winerror)

self.assertEqual(2, ppst.Release())
self.assertEqual(1, ppst.Release())
self.assertEqual(0, ppst.Release())

def test_with_paramflags_and_without_iid(self):
class IUnknown(c_void_p):
QueryInterface = proto_query_interface(None)
AddRef = proto_add_ref()
Release = proto_release()

class IPersist(IUnknown):
GetClassID = proto_get_class_id(((OUT, "pClassID"),))

ppst = self.create_shelllink_persist(IPersist)

clsid = ppst.GetClassID()
self.assertEqual(TRUE, is_equal_guid(CLSID_ShellLink, clsid))

punk = IUnknown()
hr_qi = ppst.QueryInterface(IID_IUnknown, punk)
self.assertEqual(S_OK, hr_qi)
self.assertEqual(1, punk.Release())

with self.assertRaises(OSError) as e:
ppst.QueryInterface(IID_IStream, IUnknown())
self.assertEqual(E_NOINTERFACE, e.exception.winerror)

self.assertEqual(0, ppst.Release())

def test_with_paramflags_and_iid(self):
class IUnknown(c_void_p):
QueryInterface = proto_query_interface(None, IID_IUnknown)
AddRef = proto_add_ref()
Release = proto_release()

class IPersist(IUnknown):
GetClassID = proto_get_class_id(((OUT, "pClassID"),), IID_IPersist)

ppst = self.create_shelllink_persist(IPersist)

clsid = ppst.GetClassID()
self.assertEqual(TRUE, is_equal_guid(CLSID_ShellLink, clsid))

punk = IUnknown()
hr_qi = ppst.QueryInterface(IID_IUnknown, punk)
self.assertEqual(S_OK, hr_qi)
self.assertEqual(1, punk.Release())

with self.assertRaises(COMError) as e:
ppst.QueryInterface(IID_IStream, IUnknown())
self.assertEqual(E_NOINTERFACE, e.exception.hresult)

self.assertEqual(0, ppst.Release())


if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit 312c790

Please sign in to comment.