Skip to content

Commit

Permalink
Test code for using urllib or requests modules instead of curl
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Nilsson committed Feb 29, 2024
1 parent f6fb187 commit e7d7c6f
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 47 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.7.2.1
3.7.2.2
2 changes: 1 addition & 1 deletion pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '7' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '1' # build number should be reset to '1' for every new development cycle
BUILD = '2' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
76 changes: 43 additions & 33 deletions pilot/util/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def execute(executable: Any, **kwargs: dict) -> Any:
stderr = ''

# Acquire the lock before creating the subprocess
process = None
with execute_lock:
process = subprocess.Popen(exe,
bufsize=-1,
Expand Down Expand Up @@ -122,11 +123,13 @@ def execute(executable: Any, **kwargs: dict) -> Any:
# (not strictly necessary when process.communicate() is used)
try:
# wait for the process to complete with a timeout of 60 seconds
process.wait(timeout=60)
if process:
process.wait(timeout=60)
except subprocess.TimeoutExpired:
# Handle the case where the process did not complete within the timeout
print("process did not complete within the timeout of 60s - terminating")
process.terminate()
if process:
logger.warning("process did not complete within the timeout of 60s - terminating")
process.terminate()

# remove any added \n
if stdout and stdout.endswith('\n'):
Expand All @@ -152,8 +155,9 @@ def _timeout_handler():
nonlocal exit_code # Use nonlocal to modify the outer variable
logger.warning("subprocess execution timed out")
exit_code = -2
process.terminate() # Terminate the subprocess if it's still running
logger.info(f'process terminated after {timeout_seconds}s')
if process:
process.terminate() # Terminate the subprocess if it's still running
logger.info(f'process terminated after {timeout_seconds}s')

obscure = kwargs.get('obscure', '') # if this string is set, hide it in the log message
if not kwargs.get('mute', False):
Expand All @@ -162,43 +166,49 @@ def _timeout_handler():
exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable]

# Create the subprocess with stdout and stderr redirection to files
process = subprocess.Popen(exe,
stdout=stdout_file,
stderr=stderr_file,
cwd=kwargs.get('cwd', os.getcwd()),
preexec_fn=os.setsid,
encoding='utf-8',
errors='replace')

# Set up a timer for the timeout
timeout_timer = threading.Timer(timeout_seconds, _timeout_handler)
# Acquire the lock before creating the subprocess
process = None
with execute_lock:
process = subprocess.Popen(exe,
stdout=stdout_file,
stderr=stderr_file,
cwd=kwargs.get('cwd', os.getcwd()),
preexec_fn=os.setsid,
encoding='utf-8',
errors='replace')

try:
# Start the timer
timeout_timer.start()
# Set up a timer for the timeout
timeout_timer = threading.Timer(timeout_seconds, _timeout_handler)

# wait for the process to finish
try:
# wait for the process to complete with a timeout (this will likely never happen since a timer is used)
process.wait(timeout=timeout_seconds + 10)
except subprocess.TimeoutExpired:
# Handle the case where the process did not complete within the timeout
timeout_seconds = timeout_seconds + 10
logger.warning(f"process wait did not complete within the timeout of {timeout_seconds}s - terminating")
exit_code = -2
process.terminate()
except Exception as exc:
logger.warning(f'execution caught: {exc}')
finally:
# Cancel the timer to avoid it firing after the subprocess has completed
timeout_timer.cancel()
# Start the timer
timeout_timer.start()

# wait for the process to finish
try:
# wait for the process to complete with a timeout (this will likely never happen since a timer is used)
process.wait(timeout=timeout_seconds + 10)
except subprocess.TimeoutExpired:
# Handle the case where the process did not complete within the timeout
timeout_seconds = timeout_seconds + 10
logger.warning(f"process wait did not complete within the timeout of {timeout_seconds}s - terminating")
exit_code = -2
process.terminate()
except Exception as exc:
logger.warning(f'execution caught: {exc}')
finally:
# Cancel the timer to avoid it firing after the subprocess has completed
timeout_timer.cancel()

if exit_code == -2:
# the process was terminated due to a time-out
exit_code = errors.COMMANDTIMEDOUT
else:
# get the exit code after a normal finish
exit_code = process.returncode
if process:
exit_code = process.returncode
else:
exit_code = -1

return exit_code

Expand Down
163 changes: 158 additions & 5 deletions pilot/util/https.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,24 @@
# Authors:
# - Daniel Drizhuk, [email protected], 2017
# - Mario Lassnig, [email protected], 2017
# - Paul Nilsson, [email protected], 2017-23
# - Paul Nilsson, [email protected], 2017-24

"""Functions for https interactions."""

try:
import certifi
except ImportError:
certifi = None
import json
import logging
import os
import pipes
import platform
import random
try:
import requests
except ImportError:
requests = None
import socket
import ssl
import sys
Expand Down Expand Up @@ -420,6 +428,7 @@ def get_urlopen_output(req: Any, context: Any) -> (int, str):
"""
exitcode = -1
output = ""
logger.debug('ok about to open url')
try:
output = urllib.request.urlopen(req, context=context)
except urllib.error.HTTPError as exc:
Expand All @@ -428,7 +437,7 @@ def get_urlopen_output(req: Any, context: Any) -> (int, str):
logger.warning(f'connection error: {exc.reason}')
else:
exitcode = 0

logger.debug(f'ok url opened: exitcode={exitcode}')
return exitcode, output


Expand Down Expand Up @@ -545,12 +554,20 @@ def get_panda_server(url: str, port: str, update_server: bool = True) -> str:
if not update_server:
return pandaserver

# set a timeout of 10 seconds to prevent potential hanging due to problems with DNS resolution, or if the DNS
# server is slow to respond
socket.setdefaulttimeout(10)

# add randomization for PanDA server
default = 'pandaserver.cern.ch'
if default in pandaserver:
rnd = random.choice([socket.getfqdn(vv) for vv in set([v[-1][0] for v in socket.getaddrinfo(default, 25443, socket.AF_INET)])])
pandaserver = pandaserver.replace(default, rnd)
logger.debug(f'updated {default} to {pandaserver}')
try:
rnd = random.choice([socket.getfqdn(vv) for vv in set([v[-1][0] for v in socket.getaddrinfo(default, 25443, socket.AF_INET)])])
except socket.herror as exc:
logger.warning(f'failed to get address from socket: {exc} - will use default server ({pandaserver})')
else:
pandaserver = pandaserver.replace(default, rnd)
logger.debug(f'updated {default} to {pandaserver}')

return pandaserver

Expand Down Expand Up @@ -609,3 +626,139 @@ def get_server_command(url: str, port: str, cmd: str = 'getJob') -> str:
# randomize server name
url = get_panda_server(url, port)
return f'{url}/server/panda/{cmd}'


def request2_bad(url: str, data: dict = None) -> str:
    """
    Send a POST request with a JSON payload using HTTPS (urllib with a hand-built SSL context).

    The SSL context is populated from the module-level _ctx object: _ctx.capath is used for
    verifying the server and _ctx.cacert is loaded as the certificate chain.

    :param url: the URL of the resource (str)
    :param data: data to send, serialized to JSON; an empty dictionary is sent if omitted (dict)
    :return: decoded server response, or an empty string if the request failed (str).
    """
    # avoid a mutable default argument; an omitted payload is sent as an empty JSON object
    if data is None:
        data = {}

    # convert the dictionary to a JSON string
    data_json = json.dumps(data).encode('utf-8')

    # Create an SSLContext object
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    logger.debug(f'capath={_ctx.capath}')
    logger.debug(f'cacert={_ctx.cacert}')
    # the first positional parameter of load_verify_locations() is cafile, so the path must be
    # passed by keyword (request2() in this module also treats _ctx.capath as a directory)
    ssl_context.load_verify_locations(capath=_ctx.capath)
    ssl_context.load_cert_chain(_ctx.cacert)

    # define additional headers
    headers = {
        "Content-Type": "application/json",
        "User-Agent": _ctx.user_agent,
    }

    # create a request object for the JSON payload
    request = urllib.request.Request(url, data=data_json, headers=headers, method='POST')

    # perform the HTTP request with the SSL context; the with-statement closes the connection
    try:
        with urllib.request.urlopen(request, context=ssl_context) as response:
            ret = response.read().decode('utf-8')
    except urllib.error.URLError as exc:  # HTTPError is a subclass of URLError
        logger.warning(f'failed to send request: {exc}')
        ret = ""

    return ret


def request2(url: str, data: dict = None) -> str:
    """
    Send a request using HTTPS (using urllib module).

    The data dictionary is JSON-encoded and sent as the request body. The SSL context is created
    from _ctx.capath and _ctx.cacert; https_setup() is called first if _ctx.cacert is not set.

    :param url: the URL of the resource (str)
    :param data: data to send, serialized to JSON; an empty dictionary is sent if omitted (dict)
    :return: decoded server response, or an empty string if the request failed (str).
    """
    # avoid a mutable default argument; an omitted payload is sent as an empty JSON object
    if data is None:
        data = {}

    # https might not have been set up if running in a [middleware] container
    if not _ctx.cacert:
        logger.debug('setting up unset https')
        https_setup(None, get_pilot_version())

    # define additional headers
    headers = {
        "Content-Type": "application/json",
        "User-Agent": _ctx.user_agent,
    }
    logger.debug(f'headers={headers}')

    # Encode data as JSON
    data_json = json.dumps(data).encode('utf-8')
    logger.debug(f'data_json={data_json}')

    # Set up the request
    req = urllib.request.Request(url, data_json, headers=headers)

    # Create a context with certificate verification
    logger.debug(f'cacert={_ctx.cacert}')  # e.g. /alrb/x509up_u25606_prod
    logger.debug(f'capath={_ctx.capath}')  # e.g. /cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase/etc/grid-security-emi/certificates
    ssl_context = ssl.create_default_context(capath=_ctx.capath, cafile=_ctx.cacert)

    # Send the request securely
    try:
        with urllib.request.urlopen(req, context=ssl_context) as response:
            # format the status line explicitly; passing the values as separate positional
            # arguments would make logging treat the reason as a %-format argument
            logger.debug(f'{response.status} {response.reason}')
            # the body can only be read once - a second read() returns an empty string,
            # so read it into ret first and log the stored value
            ret = response.read().decode('utf-8')
            logger.debug(ret)
    except (urllib.error.URLError, TimeoutError) as exc:  # HTTPError is a subclass of URLError
        logger.warning(f'failed to send request: {exc}')
        ret = ""

    return ret


def request3(url: str, data: dict = None) -> str:
    """
    Send a request using HTTPS (using requests module).

    The function gives up early (returning an empty string) if either of the optional requests or
    certifi modules could not be imported. https_setup() is called first if _ctx.cacert is not set.

    :param url: the URL of the resource (str)
    :param data: data to send, serialized to JSON; an empty dictionary is sent if omitted (dict)
    :return: server response text, or an empty string if the request failed (str).
    """
    if not requests:
        logger.warning('cannot use requests module (not available)')
        return ""
    if not certifi:
        logger.warning('cannot use certifi module (not available)')
        return ""

    # avoid a mutable default argument; an omitted payload is sent as an empty JSON object
    if data is None:
        data = {}

    # https might not have been set up if running in a [middleware] container
    if not _ctx.cacert:
        logger.debug('setting up unset https')
        https_setup(None, get_pilot_version())

    # define additional headers
    headers = {
        "Content-Type": "application/json",
        "User-Agent": _ctx.user_agent,
    }

    # Convert the dictionary to a JSON string
    data_json = json.dumps(data)

    # Use the requests module to make the HTTP request
    try:
        # certifi.where() = e.g. /cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase/x86_64/python/3.11.7-x86_64-el9/
        #                   lib/python3.11/site-packages/certifi/cacert.pem
        # _ctx.cacert = e.g. /alrb/x509up_u25606_prod
        # NOTE(review): in the requests API 'verify' is the CA bundle and 'cert' is the client
        # certificate; the example values above suggest the two arguments may be swapped - confirm
        response = requests.post(url, data=data_json, headers=headers, verify=_ctx.cacert, cert=certifi.where(), timeout=120)
        response.raise_for_status()  # Raise an error for bad responses (4xx and 5xx)

        # Handle the response as needed
        ret = response.text
    except requests.exceptions.RequestException as exc:  # also covers requests.exceptions.Timeout
        logger.warning(f'failed to send request: {exc}')
        ret = ""

    return ret
Loading

0 comments on commit e7d7c6f

Please sign in to comment.