From f386a5b7431d6f50e90b0cf09c54c43bbdb1b277 Mon Sep 17 00:00:00 2001 From: AktanKasymaliev Date: Mon, 23 May 2022 22:36:31 +0600 Subject: [PATCH 1/3] path for aws lambda --- patcher.py | 278 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 278 insertions(+) create mode 100644 patcher.py diff --git a/patcher.py b/patcher.py new file mode 100644 index 00000000..24cb3fa8 --- /dev/null +++ b/patcher.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python3 +# this module is part of undetected_chromedriver + +import io +import logging +import os +import random +import re +import string +import sys +import time +import zipfile +from distutils.version import LooseVersion +from urllib.request import urlopen, urlretrieve +import secrets + + +logger = logging.getLogger(__name__) + +IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux")) + + +class Patcher(object): + url_repo = "https://chromedriver.storage.googleapis.com" + zip_name = "chromedriver_%s.zip" + exe_name = "chromedriver%s" + + platform = sys.platform + if platform.endswith("win32"): + zip_name %= "win32" + exe_name %= ".exe" + if platform.endswith("linux"): + zip_name %= "linux64" + exe_name %= "" + if platform.endswith("darwin"): + zip_name %= "mac64" + exe_name %= "" + + if platform.endswith("win32"): + d = "~/appdata/roaming/undetected_chromedriver" + elif 'LAMBDA_TASK_ROOT' in os.environ: + d = "/tmp/undetected_chromedriver" + elif platform.startswith("linux"): + d = "~/.local/share/undetected_chromedriver" + elif platform.endswith("darwin"): + d = "~/Library/Application Support/undetected_chromedriver" + else: + d = "~/.undetected_chromedriver" + data_path = os.path.abspath(os.path.expanduser(d)) + + def __init__(self, executable_path=None, force=False, version_main: int = 0): + """ + + Args: + executable_path: None = automatic + a full file path to the chromedriver executable + force: False + terminate processes which are holding lock + version_main: 0 = auto + specify main chrome version (rounded, ex: 82) + """ + + self.force = force + self.executable_path = None + prefix = secrets.token_hex(8) + + if not os.path.exists(self.data_path): + os.makedirs(self.data_path, exist_ok=True) + + if not executable_path: + self.executable_path = os.path.join( + self.data_path, "_".join([prefix, self.exe_name]) + ) + + if not IS_POSIX: + if executable_path: + if not executable_path[-4:] == ".exe": + executable_path += ".exe" + + self.zip_path = os.path.join(self.data_path, prefix) + + if not executable_path: + self.executable_path = os.path.abspath( + os.path.join(".", self.executable_path) + ) + + self._custom_exe_path = False + + if executable_path: + self._custom_exe_path = True + self.executable_path = executable_path + self.version_main = version_main + self.version_full = None + + def auto(self, executable_path=None, force=False, version_main=None): + """""" + if executable_path: + self.executable_path = executable_path + self._custom_exe_path = True + + if self._custom_exe_path: + ispatched = self.is_binary_patched(self.executable_path) + if not ispatched: + return self.patch_exe() + else: + return + + if version_main: + self.version_main = version_main + if force is True: + self.force = force + + try: + os.unlink(self.executable_path) + except PermissionError: + if self.force: + self.force_kill_instances(self.executable_path) + return self.auto(force=not self.force) + try: + if self.is_binary_patched(): + # assumes already running AND patched + return True + except PermissionError: + pass + # return False + except FileNotFoundError: + pass + + release = self.fetch_release_number() + self.version_main = release.version[0] + self.version_full = release + self.unzip_package(self.fetch_package()) + return self.patch() + + def patch(self): + self.patch_exe() + return self.is_binary_patched() + + def fetch_release_number(self): + """ + Gets the latest major version available, or the latest major version of self.target_version if set explicitly. + :return: version string + :rtype: LooseVersion + """ + path = "/latest_release" + if self.version_main: + path += f"_{self.version_main}" + path = path.upper() + logger.debug("getting release number from %s" % path) + return LooseVersion(urlopen(self.url_repo + path).read().decode()) + + def parse_exe_version(self): + with io.open(self.executable_path, "rb") as f: + for line in iter(lambda: f.readline(), b""): + match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line) + if match: + return LooseVersion(match[1].decode()) + + def fetch_package(self): + """ + Downloads ChromeDriver from source + + :return: path to downloaded file + """ + u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zip_name) + logger.debug("downloading from %s" % u) + # return urlretrieve(u, filename=self.data_path)[0] + return urlretrieve(u)[0] + + def unzip_package(self, fp): + """ + Does what it says + + :return: path to unpacked executable + """ + logger.debug("unzipping %s" % fp) + try: + os.unlink(self.zip_path) + except (FileNotFoundError, OSError): + pass + + os.makedirs(self.zip_path, mode=0o755, exist_ok=True) + with zipfile.ZipFile(fp, mode="r") as zf: + zf.extract(self.exe_name, self.zip_path) + os.rename(os.path.join(self.zip_path, self.exe_name), self.executable_path) + os.remove(fp) + os.rmdir(self.zip_path) + os.chmod(self.executable_path, 0o755) + return self.executable_path + + @staticmethod + def force_kill_instances(exe_name): + """ + kills running instances. + :param: executable name to kill, may be a path as well + + :return: True on success else False + """ + exe_name = os.path.basename(exe_name) + if IS_POSIX: + r = os.system("kill -f -9 $(pidof %s)" % exe_name) + else: + r = os.system("taskkill /f /im %s" % exe_name) + return not r + + @staticmethod + def gen_random_cdc(): + cdc = random.choices(string.ascii_lowercase, k=26) + cdc[-6:-4] = map(str.upper, cdc[-6:-4]) + cdc[2] = cdc[0] + cdc[3] = "_" + return "".join(cdc).encode() + + def is_binary_patched(self, executable_path=None): + """simple check if executable is patched. + + :return: False if not patched, else True + """ + executable_path = executable_path or self.executable_path + with io.open(executable_path, "rb") as fh: + for line in iter(lambda: fh.readline(), b""): + if b"cdc_" in line: + return False + else: + return True + + def patch_exe(self): + """ + Patches the ChromeDriver binary + + :return: False on failure, binary name on success + """ + logger.info("patching driver executable %s" % self.executable_path) + + linect = 0 + replacement = self.gen_random_cdc() + with io.open(self.executable_path, "r+b") as fh: + for line in iter(lambda: fh.readline(), b""): + if b"cdc_" in line: + fh.seek(-len(line), 1) + newline = re.sub(b"cdc_.{22}", replacement, line) + fh.write(newline) + linect += 1 + return linect + + def __repr__(self): + return "{0:s}({1:s})".format( + self.__class__.__name__, + self.executable_path, + ) + + def __del__(self): + + if self._custom_exe_path: + # if the driver binary is specified by user + # we assume it is important enough to not delete it + return + else: + timeout = 3 # stop trying after this many seconds + t = time.monotonic() + while True: + now = time.monotonic() + if now - t > timeout: + # we don't want to wait until the end of time + logger.debug( + "could not unlink %s in time (%d seconds)" + % (self.executable_path, timeout) + ) + break + try: + os.unlink(self.executable_path) + logger.debug("successfully unlinked %s" % self.executable_path) + break + except (OSError, RuntimeError, PermissionError): + time.sleep(0.1) + continue + except FileNotFoundError: + break From 444d9e4abaf6ca6d9c0baedde318083f63a7a7e8 Mon Sep 17 00:00:00 2001 From: AktanKasymaliev Date: Mon, 23 May 2022 23:07:24 +0600 Subject: [PATCH 2/3] paht for aws lambda --- undetected_chromedriver/patcher.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/undetected_chromedriver/patcher.py b/undetected_chromedriver/patcher.py index c7818c7b..24cb3fa8 100644 --- a/undetected_chromedriver/patcher.py +++ b/undetected_chromedriver/patcher.py @@ -38,6 +38,8 @@ class Patcher(object): if platform.endswith("win32"): d = "~/appdata/roaming/undetected_chromedriver" + elif 'LAMBDA_TASK_ROOT' in os.environ: + d = "/tmp/undetected_chromedriver" elif platform.startswith("linux"): d = "~/.local/share/undetected_chromedriver" elif platform.endswith("darwin"): From 5c9843872503e81ac5379a324dd55ffd2b655d0d Mon Sep 17 00:00:00 2001 From: AktanKasymaliev Date: Mon, 23 May 2022 23:18:06 +0600 Subject: [PATCH 3/3] path for aws lambda --- patcher.py | 278 ----------------------------------------------------- setup.py | 2 +- 2 files changed, 1 insertion(+), 279 deletions(-) delete mode 100644 patcher.py diff --git a/patcher.py b/patcher.py deleted file mode 100644 index 24cb3fa8..00000000 --- a/patcher.py +++ /dev/null @@ -1,278 +0,0 @@ -#!/usr/bin/env python3 -# this module is part of undetected_chromedriver - -import io -import logging -import os -import random -import re -import string -import sys -import time -import zipfile -from distutils.version import LooseVersion -from urllib.request import urlopen, urlretrieve -import secrets - - -logger = logging.getLogger(__name__) - -IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux")) - - -class Patcher(object): - url_repo = "https://chromedriver.storage.googleapis.com" - zip_name = "chromedriver_%s.zip" - exe_name = "chromedriver%s" - - platform = sys.platform - if platform.endswith("win32"): - zip_name %= "win32" - exe_name %= ".exe" - if platform.endswith("linux"): - zip_name %= "linux64" - exe_name %= "" - if platform.endswith("darwin"): - zip_name %= "mac64" - exe_name %= "" - - if platform.endswith("win32"): - d = "~/appdata/roaming/undetected_chromedriver" - elif 'LAMBDA_TASK_ROOT' in os.environ: - d = "/tmp/undetected_chromedriver" - elif platform.startswith("linux"): - d = "~/.local/share/undetected_chromedriver" - elif platform.endswith("darwin"): - d = "~/Library/Application Support/undetected_chromedriver" - else: - d = "~/.undetected_chromedriver" - data_path = os.path.abspath(os.path.expanduser(d)) - - def __init__(self, executable_path=None, force=False, version_main: int = 0): - """ - - Args: - executable_path: None = automatic - a full file path to the chromedriver executable - force: False - terminate processes which are holding lock - version_main: 0 = auto - specify main chrome version (rounded, ex: 82) - """ - - self.force = force - self.executable_path = None - prefix = secrets.token_hex(8) - - if not os.path.exists(self.data_path): - os.makedirs(self.data_path, exist_ok=True) - - if not executable_path: - self.executable_path = os.path.join( - self.data_path, "_".join([prefix, self.exe_name]) - ) - - if not IS_POSIX: - if executable_path: - if not executable_path[-4:] == ".exe": - executable_path += ".exe" - - self.zip_path = os.path.join(self.data_path, prefix) - - if not executable_path: - self.executable_path = os.path.abspath( - os.path.join(".", self.executable_path) - ) - - self._custom_exe_path = False - - if executable_path: - self._custom_exe_path = True - self.executable_path = executable_path - self.version_main = version_main - self.version_full = None - - def auto(self, executable_path=None, force=False, version_main=None): - """""" - if executable_path: - self.executable_path = executable_path - self._custom_exe_path = True - - if self._custom_exe_path: - ispatched = self.is_binary_patched(self.executable_path) - if not ispatched: - return self.patch_exe() - else: - return - - if version_main: - self.version_main = version_main - if force is True: - self.force = force - - try: - os.unlink(self.executable_path) - except PermissionError: - if self.force: - self.force_kill_instances(self.executable_path) - return self.auto(force=not self.force) - try: - if self.is_binary_patched(): - # assumes already running AND patched - return True - except PermissionError: - pass - # return False - except FileNotFoundError: - pass - - release = self.fetch_release_number() - self.version_main = release.version[0] - self.version_full = release - self.unzip_package(self.fetch_package()) - return self.patch() - - def patch(self): - self.patch_exe() - return self.is_binary_patched() - - def fetch_release_number(self): - """ - Gets the latest major version available, or the latest major version of self.target_version if set explicitly. - :return: version string - :rtype: LooseVersion - """ - path = "/latest_release" - if self.version_main: - path += f"_{self.version_main}" - path = path.upper() - logger.debug("getting release number from %s" % path) - return LooseVersion(urlopen(self.url_repo + path).read().decode()) - - def parse_exe_version(self): - with io.open(self.executable_path, "rb") as f: - for line in iter(lambda: f.readline(), b""): - match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line) - if match: - return LooseVersion(match[1].decode()) - - def fetch_package(self): - """ - Downloads ChromeDriver from source - - :return: path to downloaded file - """ - u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zip_name) - logger.debug("downloading from %s" % u) - # return urlretrieve(u, filename=self.data_path)[0] - return urlretrieve(u)[0] - - def unzip_package(self, fp): - """ - Does what it says - - :return: path to unpacked executable - """ - logger.debug("unzipping %s" % fp) - try: - os.unlink(self.zip_path) - except (FileNotFoundError, OSError): - pass - - os.makedirs(self.zip_path, mode=0o755, exist_ok=True) - with zipfile.ZipFile(fp, mode="r") as zf: - zf.extract(self.exe_name, self.zip_path) - os.rename(os.path.join(self.zip_path, self.exe_name), self.executable_path) - os.remove(fp) - os.rmdir(self.zip_path) - os.chmod(self.executable_path, 0o755) - return self.executable_path - - @staticmethod - def force_kill_instances(exe_name): - """ - kills running instances. - :param: executable name to kill, may be a path as well - - :return: True on success else False - """ - exe_name = os.path.basename(exe_name) - if IS_POSIX: - r = os.system("kill -f -9 $(pidof %s)" % exe_name) - else: - r = os.system("taskkill /f /im %s" % exe_name) - return not r - - @staticmethod - def gen_random_cdc(): - cdc = random.choices(string.ascii_lowercase, k=26) - cdc[-6:-4] = map(str.upper, cdc[-6:-4]) - cdc[2] = cdc[0] - cdc[3] = "_" - return "".join(cdc).encode() - - def is_binary_patched(self, executable_path=None): - """simple check if executable is patched. - - :return: False if not patched, else True - """ - executable_path = executable_path or self.executable_path - with io.open(executable_path, "rb") as fh: - for line in iter(lambda: fh.readline(), b""): - if b"cdc_" in line: - return False - else: - return True - - def patch_exe(self): - """ - Patches the ChromeDriver binary - - :return: False on failure, binary name on success - """ - logger.info("patching driver executable %s" % self.executable_path) - - linect = 0 - replacement = self.gen_random_cdc() - with io.open(self.executable_path, "r+b") as fh: - for line in iter(lambda: fh.readline(), b""): - if b"cdc_" in line: - fh.seek(-len(line), 1) - newline = re.sub(b"cdc_.{22}", replacement, line) - fh.write(newline) - linect += 1 - return linect - - def __repr__(self): - return "{0:s}({1:s})".format( - self.__class__.__name__, - self.executable_path, - ) - - def __del__(self): - - if self._custom_exe_path: - # if the driver binary is specified by user - # we assume it is important enough to not delete it - return - else: - timeout = 3 # stop trying after this many seconds - t = time.monotonic() - while True: - now = time.monotonic() - if now - t > timeout: - # we don't want to wait until the end of time - logger.debug( - "could not unlink %s in time (%d seconds)" - % (self.executable_path, timeout) - ) - break - try: - os.unlink(self.executable_path) - logger.debug("successfully unlinked %s" % self.executable_path) - break - except (OSError, RuntimeError, PermissionError): - time.sleep(0.1) - continue - except FileNotFoundError: - break diff --git a/setup.py b/setup.py index bf688f12..b5a7ca99 100644 --- a/setup.py +++ b/setup.py @@ -60,4 +60,4 @@ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", ], -) +) \ No newline at end of file