From e7d7c6f54980ad742dd634a8786352b8ed1631d4 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Thu, 29 Feb 2024 09:44:20 +0100 Subject: [PATCH] Test code for using urllib or requests modules instead of curl --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/container.py | 76 ++++++++++-------- pilot/util/https.py | 163 ++++++++++++++++++++++++++++++++++++-- pilot/util/tracereport.py | 33 ++++++-- 5 files changed, 229 insertions(+), 47 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index f682aa60..d7797926 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.7.2.1 \ No newline at end of file +3.7.2.2 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index b9168e08..774d36e3 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '7' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '1' # build number should be reset to '1' for every new development cycle +BUILD = '2' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/container.py b/pilot/util/container.py index ede58cc5..7725f115 100644 --- a/pilot/util/container.py +++ b/pilot/util/container.py @@ -89,6 +89,7 @@ def execute(executable: Any, **kwargs: dict) -> Any: stderr = '' # Acquire the lock before creating the subprocess + process = None with execute_lock: process = subprocess.Popen(exe, bufsize=-1, @@ -122,11 +123,13 @@ def execute(executable: Any, **kwargs: dict) -> Any: # (not strictly necessary when process.communicate() is used) try: # wait for the process to complete with a timeout of 60 seconds - process.wait(timeout=60) + if process: + process.wait(timeout=60) except subprocess.TimeoutExpired: # Handle the case where the process did not complete within the timeout - print("process did not complete within the timeout of 60s - terminating") - process.terminate() + if process: + logger.warning("process did not complete within the timeout of 60s - terminating") + process.terminate() # remove any added \n if stdout and stdout.endswith('\n'): @@ -152,8 +155,9 @@ def _timeout_handler(): nonlocal exit_code # Use nonlocal to modify the outer variable logger.warning("subprocess execution timed out") exit_code = -2 - process.terminate() # Terminate the subprocess if it's still running - logger.info(f'process terminated after {timeout_seconds}s') + if process: + process.terminate() # Terminate the subprocess if it's still running + logger.info(f'process terminated after {timeout_seconds}s') obscure = kwargs.get('obscure', '') # if this string is set, hide it in the log message if not kwargs.get('mute', False): @@ -162,43 +166,49 @@ def _timeout_handler(): exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable] # Create the subprocess with stdout and stderr redirection to files - process = subprocess.Popen(exe, - stdout=stdout_file, - stderr=stderr_file, - cwd=kwargs.get('cwd', os.getcwd()), - preexec_fn=os.setsid, - encoding='utf-8', - errors='replace') - - # Set up a timer for the timeout - timeout_timer = threading.Timer(timeout_seconds, _timeout_handler) + # Acquire the lock before creating the subprocess + process = None + with execute_lock: + process = subprocess.Popen(exe, + stdout=stdout_file, + stderr=stderr_file, + cwd=kwargs.get('cwd', os.getcwd()), + preexec_fn=os.setsid, + encoding='utf-8', + errors='replace') - try: - # Start the timer - timeout_timer.start() + # Set up a timer for the timeout + timeout_timer = threading.Timer(timeout_seconds, _timeout_handler) - # wait for the process to finish try: - # wait for the process to complete with a timeout (this will likely never happen since a timer is used) - process.wait(timeout=timeout_seconds + 10) - except subprocess.TimeoutExpired: - # Handle the case where the process did not complete within the timeout - timeout_seconds = timeout_seconds + 10 - logger.warning(f"process wait did not complete within the timeout of {timeout_seconds}s - terminating") - exit_code = -2 - process.terminate() - except Exception as exc: - logger.warning(f'execution caught: {exc}') - finally: - # Cancel the timer to avoid it firing after the subprocess has completed - timeout_timer.cancel() + # Start the timer + timeout_timer.start() + + # wait for the process to finish + try: + # wait for the process to complete with a timeout (this will likely never happen since a timer is used) + process.wait(timeout=timeout_seconds + 10) + except subprocess.TimeoutExpired: + # Handle the case where the process did not complete within the timeout + timeout_seconds = timeout_seconds + 10 + logger.warning(f"process wait did not complete within the timeout of {timeout_seconds}s - terminating") + exit_code = -2 + process.terminate() + except Exception as exc: + logger.warning(f'execution caught: {exc}') + finally: + # Cancel the timer to avoid it firing after the subprocess has completed + timeout_timer.cancel() if exit_code == -2: # the process was terminated due to a time-out exit_code = errors.COMMANDTIMEDOUT else: # get the exit code after a normal finish - exit_code = process.returncode + if process: + exit_code = process.returncode + else: + exit_code = -1 return exit_code diff --git a/pilot/util/https.py b/pilot/util/https.py index b33245e3..65cf9335 100644 --- a/pilot/util/https.py +++ b/pilot/util/https.py @@ -19,16 +19,24 @@ # Authors: # - Daniel Drizhuk, d.drizhuk@gmail.com, 2017 # - Mario Lassnig, mario.lassnig@cern.ch, 2017 -# - Paul Nilsson, paul.nilsson@cern.ch, 2017-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2017-24 """Functions for https interactions.""" +try: + import certifi +except ImportError: + certifi = None import json import logging import os import pipes import platform import random +try: + import requests +except ImportError: + requests = None import socket import ssl import sys @@ -420,6 +428,7 @@ def get_urlopen_output(req: Any, context: Any) -> (int, str): """ exitcode = -1 output = "" + logger.debug('ok about to open url') try: output = urllib.request.urlopen(req, context=context) except urllib.error.HTTPError as exc: @@ -428,7 +437,7 @@ def get_urlopen_output(req: Any, context: Any) -> (int, str): logger.warning(f'connection error: {exc.reason}') else: exitcode = 0 - + logger.debug(f'ok url opened: exitcode={exitcode}') return exitcode, output @@ -545,12 +554,20 @@ def get_panda_server(url: str, port: str, update_server: bool = True) -> str: if not update_server: return pandaserver + # set a timeout of 10 seconds to prevent potential hanging due to problems with DNS resolution, or if the DNS + # server is slow to respond + socket.setdefaulttimeout(10) + # add randomization for PanDA server default = 'pandaserver.cern.ch' if default in pandaserver: - rnd = random.choice([socket.getfqdn(vv) for vv in set([v[-1][0] for v in socket.getaddrinfo(default, 25443, socket.AF_INET)])]) - pandaserver = pandaserver.replace(default, rnd) - logger.debug(f'updated {default} to {pandaserver}') + try: + rnd = random.choice([socket.getfqdn(vv) for vv in set([v[-1][0] for v in socket.getaddrinfo(default, 25443, socket.AF_INET)])]) + except socket.herror as exc: + logger.warning(f'failed to get address from socket: {exc} - will use default server ({pandaserver})') + else: + pandaserver = pandaserver.replace(default, rnd) + logger.debug(f'updated {default} to {pandaserver}') return pandaserver @@ -609,3 +626,139 @@ def get_server_command(url: str, port: str, cmd: str = 'getJob') -> str: # randomize server name url = get_panda_server(url, port) return f'{url}/server/panda/{cmd}' + + +def request2_bad(url: str, data: dict = {}) -> str: + """ + Send a request using HTTPS. + + :param url: the URL of the resource (str) + :param data: data to send (dict) + :return: server response (str). + """ + + # convert the dictionary to a JSON string + data_json = json.dumps(data).encode('utf-8') + + # Create an SSLContext object + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + logger.debug(f'capath={_ctx.capath}') + logger.debug(f'cacert={_ctx.cacert}') + ssl_context.load_verify_locations(_ctx.capath) + ssl_context.load_cert_chain(_ctx.cacert) + # define additional headers + headers = { + "Content-Type": "application/json", + "User-Agent": _ctx.user_agent, + } + + # create a request object with the SSL context + request = urllib.request.Request(url, data=data_json, headers=headers, method='POST') + + # perform the HTTP request with the SSL context + try: + response = urllib.request.urlopen(request, context=ssl_context) + ret = response.read().decode('utf-8') + except (urllib.error.URLError, urllib.error.HTTPError) as exc: + logger.warning(f'failed to send request: {exc}') + ret = "" + + return ret + + +def request2(url: str, data: dict = {}) -> str: + """ + Send a request using HTTPS (using urllib module). + + :param url: the URL of the resource (str) + :param data: data to send (dict) + :return: server response (str). + """ + # https might not have been set up if running in a [middleware] container + if not _ctx.cacert: + logger.debug('setting up unset https') + https_setup(None, get_pilot_version()) + + # define additional headers + headers = { + "Content-Type": "application/json", + "User-Agent": _ctx.user_agent, + } + + logger.debug(f'headers={headers}') + + # Encode data as JSON + data_json = json.dumps(data).encode('utf-8') + #data_json = urllib.parse.quote(json.dumps(data)) + #data_json = data_json.encode('utf-8') + + logger.debug(f'data_json={data_json}') + + # Set up the request + req = urllib.request.Request(url, data_json, headers=headers) + + # Create a context with certificate verification + logger.debug(f'cacert={_ctx.cacert}') # /alrb/x509up_u25606_prod + logger.debug(f'capath={_ctx.capath}') # /cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase/etc/grid-security-emi/certificates + #context = ssl.create_default_context(cafile=_ctx.cacert, capath=_ctx.capath) + #logger.debug(f'context={context}') + + ssl_context = ssl.create_default_context(capath=_ctx.capath, cafile=_ctx.cacert) + # Send the request securely + try: + with urllib.request.urlopen(req, context=ssl_context) as response: + # Handle the response here + logger.debug(response.status, response.reason) + logger.debug(response.read().decode('utf-8')) + ret = response.read().decode('utf-8') + except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError) as exc: + logger.warning(f'failed to send request: {exc}') + ret = "" + + return ret + + +def request3(url: str, data: dict = {}) -> str: + """ + Send a request using HTTPS (using requests module). + + :param url: the URL of the resource (str) + :param data: data to send (dict) + :return: server response (str). + """ + if not requests: + logger.warning('cannot use requests module (not available)') + return "" + if not certifi: + logger.warning('cannot use certifi module (not available)') + return "" + + # https might not have been set up if running in a [middleware] container + if not _ctx.cacert: + logger.debug('setting up unset https') + https_setup(None, get_pilot_version()) + + # define additional headers + headers = { + "Content-Type": "application/json", + "User-Agent": _ctx.user_agent, + } + + # Convert the dictionary to a JSON string + data_json = json.dumps(data) + + # Use the requests module to make the HTTP request + try: + # certifi.where() = /cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase/x86_64/python/3.11.7-x86_64-el9/ + # lib/python3.11/site-packages/certifi/cacert.pem + # _ctx.cacert = /alrb/x509up_u25606_prod + response = requests.post(url, data=data_json, headers=headers, verify=_ctx.cacert, cert=certifi.where(), timeout=120) + response.raise_for_status() # Raise an error for bad responses (4xx and 5xx) + + # Handle the response as needed + ret = response.text + except (requests.exceptions.RequestException, requests.exceptions.Timeout) as exc: + logger.warning(f'failed to send request: {exc}') + ret = "" + + return ret diff --git a/pilot/util/tracereport.py b/pilot/util/tracereport.py index d4e17850..46cb3e11 100644 --- a/pilot/util/tracereport.py +++ b/pilot/util/tracereport.py @@ -28,11 +28,12 @@ from json import dumps from os import environ, getuid +from pilot.common.exception import FileHandlingFailure from pilot.util.config import config from pilot.util.constants import get_pilot_version, get_rucio_client_version from pilot.util.container import execute, execute2 -from pilot.common.exception import FileHandlingFailure from pilot.util.filehandling import append_to_file, write_file +# from pilot.util.https import request3 import logging logger = logging.getLogger(__name__) @@ -102,16 +103,27 @@ def init(self, job): self.update(data) self['timeStart'] = time.time() - hostname = os.environ.get('PANDA_HOSTNAME', socket.gethostname()) + # set a timeout of 10 seconds to prevent potential hanging due to problems with DNS resolution, or if the DNS + # server is slow to respond + socket.setdefaulttimeout(10) + + try: + hostname = os.environ.get('PANDA_HOSTNAME', socket.gethostname()) + except socket.herror as exc: + logger.warning(f'unable to detect hostname for trace report: {exc}') + hostname = os.environ.get('PANDA_HOSTNAME', 'unknown') + try: self['hostname'] = socket.gethostbyaddr(hostname)[0] - except Exception: - logger.debug("unable to detect hostname for trace report") + except socket.herror as exc: + logger.warning(f'unable to detect hostname by address for trace report: {exc}') + self['hostname'] = 'unknown' try: self['ip'] = socket.gethostbyname(hostname) - except Exception: - logger.debug("unable to detect host IP for trace report") + except socket.herror as exc: + logger.debug(f"unable to detect host IP for trace report: {exc}") + self['ip'] = '0.0.0.0' if job.jobdefinitionid: s = 'ppilot_%s' % job.jobdefinitionid @@ -119,7 +131,7 @@ def init(self, job): else: #self['uuid'] = commands.getoutput('uuidgen -t 2> /dev/null').replace('-', '') # all LFNs of one request have the same uuid cmd = 'uuidgen -t 2> /dev/null' - exit_code, stdout, stderr = execute(cmd) + exit_code, stdout, stderr = execute(cmd, timeout=10) self['uuid'] = stdout.replace('-', '') def get_value(self, key): @@ -187,6 +199,13 @@ def send(self): ssl_certificate = self.get_ssl_certificate() + #ret = request3(url, data) + #if ret: + # logger.info("tracing report sent") + # return True + #else: + # logger.warning("failed to send tracing report - using old curl command") + # create the command command = 'curl' if self.ipv == 'IPv4':