Skip to content

Commit

Permalink
Merge pull request #17 from PanDAWMS/next
Browse files Browse the repository at this point in the history
3.2.0.28
  • Loading branch information
PalNilsson authored Feb 24, 2022
2 parents f2ce9f3 + ace8405 commit 85be13f
Show file tree
Hide file tree
Showing 30 changed files with 468 additions and 298 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.1.1.10
3.2.0.28
107 changes: 38 additions & 69 deletions pilot/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#
# Authors:
# - Mario Lassnig, [email protected], 2017
# - Paul Nilsson, [email protected], 2017-2021
# - Paul Nilsson, [email protected], 2017-2022
# - Tobias Wegner, [email protected], 2017-2018
# - Alexey Anisenkov, [email protected], 2018-2019

Expand All @@ -17,11 +17,7 @@
import logging
import time

try:
from functools import reduce # Python 3
#except ModuleNotFoundError: # Python 3
except Exception:
pass
from functools import reduce

from pilot.info import infosys
from pilot.common.exception import PilotException, ErrorCodes, SizeTooLarge, NoLocalSpace, ReplicasNotFound, FileHandlingFailure
Expand Down Expand Up @@ -76,12 +72,8 @@ def __init__(self, infosys_instance=None, acopytools=None, logger=None, default_
self.logger = logger
self.infosys = infosys_instance or infosys

try:
if isinstance(acopytools, basestring): # Python 2 # noqa: F821
acopytools = {'default': [acopytools]} if acopytools else {}
except Exception:
if isinstance(acopytools, str): # Python 3
acopytools = {'default': [acopytools]} if acopytools else {}
if isinstance(acopytools, str):
acopytools = {'default': [acopytools]} if acopytools else {}

if isinstance(acopytools, (list, tuple)):
acopytools = {'default': acopytools} if acopytools else {}
Expand Down Expand Up @@ -114,8 +106,7 @@ def set_acopytools(self):
if not self.acopytools: # resolve from queuedata.acopytools using infosys
self.acopytools = (self.infosys.queuedata.acopytools or {}).copy()
if not self.acopytools: # resolve from queuedata.copytools using infosys
self.acopytools = dict(default=list((self.infosys.queuedata.copytools or {}).keys())) # Python 2/3
#self.acopytools = dict(default=(self.infosys.queuedata.copytools or {}).keys()) # Python 2
self.acopytools = dict(default=list((self.infosys.queuedata.copytools or {}).keys()))

@staticmethod
def get_default_copytools(default_copytools):
Expand All @@ -125,12 +116,8 @@ def get_default_copytools(default_copytools):
:param default_copytools:
:return: default copytools (string).
"""
try:
if isinstance(default_copytools, basestring): # Python 2 # noqa: F821
default_copytools = [default_copytools] if default_copytools else []
except Exception:
if isinstance(default_copytools, str): # Python 3
default_copytools = [default_copytools] if default_copytools else []
if isinstance(default_copytools, str):
default_copytools = [default_copytools] if default_copytools else []
return default_copytools

@classmethod
Expand Down Expand Up @@ -166,12 +153,8 @@ def prepare_inputddms(self, files, activities=None):
"""

activities = activities or 'read_lan'
try:
if isinstance(activities, basestring): # Python 2 # noqa: F821
activities = [activities]
except Exception:
if isinstance(activities, str): # Python 3
activities = [activities]
if isinstance(activities, str):
activities = [activities]

astorages = self.infosys.queuedata.astorages if self.infosys and self.infosys.queuedata else {}

Expand Down Expand Up @@ -336,11 +319,7 @@ def add_replicas(self, fdat, replica):
fdat.replicas = [] # reset replicas list

# sort replicas by priority value
try:
sorted_replicas = sorted(replica.get('pfns', {}).iteritems(), key=lambda x: x[1]['priority']) # Python 2
except Exception:
sorted_replicas = sorted(iter(list(replica.get('pfns', {}).items())),
key=lambda x: x[1]['priority']) # Python 3
sorted_replicas = sorted(iter(list(replica.get('pfns', {}).items())), key=lambda x: x[1]['priority'])

# prefer replicas from inputddms first
xreplicas = self.sort_replicas(sorted_replicas, fdat.inputddms)
Expand Down Expand Up @@ -431,12 +410,8 @@ def transfer(self, files, activity='default', **kwargs): # noqa: C901

self.trace_report.update(relativeStart=time.time(), transferStart=time.time())

try:
if isinstance(activity, basestring): # Python 2 # noqa: F821
activity = [activity]
except Exception:
if isinstance(activity, str): # Python 3
activity = [activity]
if isinstance(activity, str):
activity = [activity]
if 'default' not in activity:
activity.append('default')

Expand All @@ -461,19 +436,11 @@ def transfer(self, files, activity='default', **kwargs): # noqa: C901
if os.environ.get('PILOT_ES_EXECUTOR_TYPE', 'generic') == 'raythena':
fspec.status = 'no_transfer'

try:
fspec.ddm_activity = filter(None, ['read_lan' if fspec.ddmendpoint in fspec.inputddms else None, 'read_wan']) # Python 2
except Exception:
fspec.ddm_activity = [_f for _f in
['read_lan' if fspec.ddmendpoint in fspec.inputddms else None, 'read_wan'] if
_f] # Python 3
fspec.ddm_activity = [_f for _f in
['read_lan' if fspec.ddmendpoint in fspec.inputddms else None, 'read_wan'] if _f]
else:
try:
fspec.ddm_activity = filter(None, ['write_lan' if fspec.ddmendpoint in fspec.inputddms else None, 'write_wan']) # Python 2
except Exception:
fspec.ddm_activity = [_f for _f in
['write_lan' if fspec.ddmendpoint in fspec.inputddms else None, 'write_wan']
if _f] # Python 3
fspec.ddm_activity = [_f for _f in
['write_lan' if fspec.ddmendpoint in fspec.inputddms else None, 'write_wan'] if _f]
caught_errors = []

for name in copytools:
Expand All @@ -491,7 +458,7 @@ def transfer(self, files, activity='default', **kwargs): # noqa: C901

module = self.copytool_modules[name]['module_name']
self.logger.info('trying to use copytool=%s for activity=%s', name, activity)
copytool = __import__('pilot.copytool.%s' % module, globals(), locals(), [module], 0) # Python 2/3
copytool = __import__('pilot.copytool.%s' % module, globals(), locals(), [module], 0)
#self.trace_report.update(protocol=name)

except PilotException as exc:
Expand Down Expand Up @@ -928,14 +895,23 @@ def check_availablespace(self, files):
self.logger.info("total input file size=%s B within allowed limit=%s B (zero value means unlimited)", totalsize, maxinputsize)

# get available space
available_space = convert_mb_to_b(get_local_disk_space(os.getcwd()))
self.logger.info("locally available space: %d B", available_space)

# are we within the limit?
if totalsize > available_space:
error = "not enough local space for staging input files and run the job (need %d B, but only have %d B)" % \
(totalsize, available_space)
raise NoLocalSpace(error)
try:
disk_space = get_local_disk_space(os.getcwd())
except PilotException as exc:
diagnostics = exc.get_detail()
self.logger.warning(f'exception caught while executing df: {diagnostics} (ignoring)')
else:
if disk_space:
available_space = convert_mb_to_b(disk_space)
self.logger.info("locally available space: %d B", available_space)

# are we within the limit?
if totalsize > available_space:
error = "not enough local space for staging input files and run the job (need %d B, but only have %d B)" % \
(totalsize, available_space)
raise NoLocalSpace(error)
else:
self.logger.warning('get_local_disk_space() returned None')


class StageOutClient(StagingClient):
Expand All @@ -954,12 +930,8 @@ def prepare_destinations(self, files, activities):
if not self.infosys.queuedata: # infosys is not initialized: not able to fix destination if need, nothing to do
return files

try:
if isinstance(activities, (str, unicode)): # Python 2 # noqa: F821
activities = [activities]
except Exception:
if isinstance(activities, str): # Python 3
activities = [activities]
if isinstance(activities, str):
activities = [activities]

if not activities:
raise PilotException("Failed to resolve destination: passed empty activity list. Internal error.",
Expand Down Expand Up @@ -1009,15 +981,12 @@ def get_path(self, scope, lfn, prefix='rucio'):
# <prefix=rucio>/<scope>/md5(<scope>:<lfn>)[0:2]/md5(<scope:lfn>)[2:4]/<lfn>

s = '%s:%s' % (scope, lfn)
hash_hex = hashlib.md5(s.encode('utf-8')).hexdigest() # Python 2/3
hash_hex = hashlib.md5(s.encode('utf-8')).hexdigest()

#paths = [prefix] + scope.split('.') + [hash_hex[0:2], hash_hex[2:4], lfn]
# exclude prefix from the path: this should be properly considered in protocol/AGIS for today
paths = scope.split('.') + [hash_hex[0:2], hash_hex[2:4], lfn]
try:
paths = filter(None, paths) # remove empty parts to avoid double /-chars, Python 2
except Exception:
paths = [_f for _f in paths if _f] # remove empty parts to avoid double /-chars, Python 3
paths = [_f for _f in paths if _f] # remove empty parts to avoid double /-chars

return '/'.join(paths)

Expand Down
2 changes: 2 additions & 0 deletions pilot/common/errorcodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ def resolve_transform_error(self, exit_code, stderr):

if exit_code == 251 and "Not mounting requested bind point" in stderr:
exit_code = self.SINGULARITYBINDPOINTFAILURE
elif exit_code == 251:
exit_code = self.UNKNOWNTRFFAILURE
elif exit_code == 255 and "No more available loop devices" in stderr:
exit_code = self.SINGULARITYNOLOOPDEVICES
elif exit_code == 255 and "Failed to mount image" in stderr:
Expand Down
5 changes: 2 additions & 3 deletions pilot/control/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Paul Nilsson, [email protected], 2021
# - Paul Nilsson, [email protected], 2021-2022

#try:
# # import dask
# import dask_kubernetes
##except ModuleNotFoundError: # Python 3
#except Exception:
#except ModuleNotFoundError:
# pass

# from pilot.util.container import execute
Expand Down
10 changes: 3 additions & 7 deletions pilot/control/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,15 @@
# Authors:
# - Mario Lassnig, [email protected], 2016-2017
# - Daniel Drizhuk, [email protected], 2017
# - Paul Nilsson, [email protected], 2017-2021
# - Paul Nilsson, [email protected], 2017-2022
# - Wen Guan, [email protected], 2018
# - Alexey Anisenkov, [email protected], 2018

import copy as objectcopy
import os
import subprocess
import time

try:
import Queue as queue # noqa: N813
except Exception:
import queue # Python 3
import queue

from pilot.api.data import StageInClient, StageOutClient
from pilot.api.es_data import StageInESClient
Expand Down Expand Up @@ -370,7 +366,7 @@ def stage_out_auto(files):

tmp_executable = objectcopy.deepcopy(executable)

tmp_executable += ['--rse', _file['rse']]
tmp_executable += ['--rses', _file['rse']]

if 'no_register' in list(_file.keys()) and _file['no_register']: # Python 2/3
tmp_executable += ['--no-register']
Expand Down
8 changes: 2 additions & 6 deletions pilot/control/interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Paul Nilsson, [email protected], 2020-2021
# - Paul Nilsson, [email protected], 2020-2022

# Note: leave this module for now - the code might be useful for reuse

import time

try:
import Queue as queue # noqa: N813
except Exception:
import queue # Python 3
import queue

from pilot.common.exception import ExcThread
from pilot.util.processes import threads_aborted
Expand Down
Loading

0 comments on commit 85be13f

Please sign in to comment.