diff --git a/atlas/lib/idds/atlas/workflow/atlaspandawork.py b/atlas/lib/idds/atlas/workflow/atlaspandawork.py index d79f1352..bc856374 100644 --- a/atlas/lib/idds/atlas/workflow/atlaspandawork.py +++ b/atlas/lib/idds/atlas/workflow/atlaspandawork.py @@ -78,7 +78,7 @@ def my_condition(self): return False def load_panda_config(self): - panda_config = ConfigParser.SafeConfigParser() + panda_config = ConfigParser.ConfigParser() if os.environ.get('IDDS_PANDA_CONFIG', None): configfile = os.environ['IDDS_PANDA_CONFIG'] if panda_config.read(configfile) == [configfile]: diff --git a/atlas/lib/idds/atlas/workflowv2/atlasdagwork.py b/atlas/lib/idds/atlas/workflowv2/atlasdagwork.py index ce36e153..7e6b7885 100644 --- a/atlas/lib/idds/atlas/workflowv2/atlasdagwork.py +++ b/atlas/lib/idds/atlas/workflowv2/atlasdagwork.py @@ -90,7 +90,7 @@ def jobs_to_idd_ds_status(self, jobstatus): return ContentStatus.Processing def load_panda_config(self): - panda_config = ConfigParser.SafeConfigParser() + panda_config = ConfigParser.ConfigParser() if os.environ.get('IDDS_PANDA_CONFIG', None): configfile = os.environ['IDDS_PANDA_CONFIG'] if panda_config.read(configfile) == [configfile]: diff --git a/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py b/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py index c41e4a47..fc27c68c 100644 --- a/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py +++ b/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py @@ -66,6 +66,11 @@ def __init__(self, task_parameters=None, self.panda_url = None self.panda_url_ssl = None self.panda_monitor = None + self.panda_auth = None + self.panda_auth_vo = None + self.panda_config_root = None + self.pandacache_url = None + self.panda_verify_host = None self.task_type = 'test' self.task_parameters = None @@ -83,7 +88,7 @@ def my_condition(self): return False def load_panda_config(self): - panda_config = ConfigParser.SafeConfigParser() + panda_config = ConfigParser.ConfigParser() if os.environ.get('IDDS_PANDA_CONFIG', None): configfile = os.environ['IDDS_PANDA_CONFIG'] if panda_config.read(configfile) == [configfile]: @@ -110,58 +115,36 @@ def load_panda_urls(self): self.panda_config_root = None if panda_config.has_section('panda'): - if panda_config.has_option('panda', 'panda_monitor_url'): + if 'PANDA_MONITOR_URL' not in os.environ and panda_config.has_option('panda', 'panda_monitor_url'): self.panda_monitor = panda_config.get('panda', 'panda_monitor_url') os.environ['PANDA_MONITOR_URL'] = self.panda_monitor # self.logger.debug("Panda monitor url: %s" % str(self.panda_monitor)) - if panda_config.has_option('panda', 'panda_url'): + if 'PANDA_URL' not in os.environ and panda_config.has_option('panda', 'panda_url'): self.panda_url = panda_config.get('panda', 'panda_url') os.environ['PANDA_URL'] = self.panda_url # self.logger.debug("Panda url: %s" % str(self.panda_url)) - if panda_config.has_option('panda', 'panda_url_ssl'): - self.panda_url_ssl = panda_config.get('panda', 'panda_url_ssl') - os.environ['PANDA_URL_SSL'] = self.panda_url_ssl - # self.logger.debug("Panda url ssl: %s" % str(self.panda_url_ssl)) - if panda_config.has_option('panda', 'pandacache_url'): + if 'PANDACACHE_URL' not in os.environ and panda_config.has_option('panda', 'pandacache_url'): self.pandacache_url = panda_config.get('panda', 'pandacache_url') os.environ['PANDACACHE_URL'] = self.pandacache_url # self.logger.debug("Pandacache url: %s" % str(self.pandacache_url)) - if panda_config.has_option('panda', 'panda_verify_host'): + if 'PANDA_VERIFY_HOST' not in os.environ and panda_config.has_option('panda', 'panda_verify_host'): self.panda_verify_host = panda_config.get('panda', 'panda_verify_host') os.environ['PANDA_VERIFY_HOST'] = self.panda_verify_host # self.logger.debug("Panda verify host: %s" % str(self.panda_verify_host)) - if panda_config.has_option('panda', 'panda_auth'): + if 'PANDA_URL_SSL' not in os.environ and panda_config.has_option('panda', 'panda_url_ssl'): + self.panda_url_ssl = panda_config.get('panda', 'panda_url_ssl') + os.environ['PANDA_URL_SSL'] = self.panda_url_ssl + # self.logger.debug("Panda url ssl: %s" % str(self.panda_url_ssl)) + if 'PANDA_AUTH' not in os.environ and panda_config.has_option('panda', 'panda_auth'): self.panda_auth = panda_config.get('panda', 'panda_auth') os.environ['PANDA_AUTH'] = self.panda_auth - if panda_config.has_option('panda', 'panda_auth_vo'): + if 'PANDA_AUTH_VO' not in os.environ and panda_config.has_option('panda', 'panda_auth_vo'): self.panda_auth_vo = panda_config.get('panda', 'panda_auth_vo') os.environ['PANDA_AUTH_VO'] = self.panda_auth_vo - if panda_config.has_option('panda', 'panda_config_root'): + if 'PANDA_CONFIG_ROOT' not in os.environ and panda_config.has_option('panda', 'panda_config_root'): self.panda_config_root = panda_config.get('panda', 'panda_config_root') os.environ['PANDA_CONFIG_ROOT'] = self.panda_config_root - if not self.panda_monitor and 'PANDA_MONITOR_URL' in os.environ and os.environ['PANDA_MONITOR_URL']: - self.panda_monitor = os.environ['PANDA_MONITOR_URL'] - # self.logger.debug("Panda monitor url: %s" % str(self.panda_monitor)) - if not self.panda_url and 'PANDA_URL' in os.environ and os.environ['PANDA_URL']: - self.panda_url = os.environ['PANDA_URL'] - # self.logger.debug("Panda url: %s" % str(self.panda_url)) - if not self.panda_url_ssl and 'PANDA_URL_SSL' in os.environ and os.environ['PANDA_URL_SSL']: - self.panda_url_ssl = os.environ['PANDA_URL_SSL'] - # self.logger.debug("Panda url ssl: %s" % str(self.panda_url_ssl)) - if not self.pandacache_url and 'PANDACACHE_URL' in os.environ and os.environ['PANDACACHE_URL']: - self.pandacache_url = os.environ['PANDACACHE_URL'] - # self.logger.debug("Pandacache url: %s" % str(self.pandacache_url)) - if not self.panda_verify_host and 'PANDA_VERIFY_HOST' in os.environ and os.environ['PANDA_VERIFY_HOST']: - self.panda_verify_host = os.environ['PANDA_VERIFY_HOST'] - # self.logger.debug("Panda verify host: %s" % str(self.panda_verify_host)) - if not self.panda_auth and 'PANDA_AUTH' in os.environ and os.environ['PANDA_AUTH']: - self.panda_auth = os.environ['PANDA_AUTH'] - if not self.panda_auth_vo and 'PANDA_AUTH_VO' in os.environ and os.environ['PANDA_AUTH_VO']: - self.panda_auth_vo = os.environ['PANDA_AUTH_VO'] - if not self.panda_config_root and 'PANDA_CONFIG_ROOT' in os.environ and os.environ['PANDA_CONFIG_ROOT']: - self.panda_config_root = os.environ['PANDA_CONFIG_ROOT'] - def set_agent_attributes(self, attrs, req_attributes=None): if self.class_name not in attrs or 'life_time' not in attrs[self.class_name] or int(attrs[self.class_name]['life_time']) <= 0: attrs['life_time'] = None diff --git a/client/bin/idds b/client/bin/idds index 4da1082f..78b0d41f 100755 --- a/client/bin/idds +++ b/client/bin/idds @@ -32,7 +32,6 @@ def setup(args): cm.setup_local_configuration(local_config_root=args.local_config_root, config=args.config, host=args.host, auth_type=args.auth_type, - auth_type_host=args.auth_type_host, x509_proxy=args.x509_proxy, vo=args.vo, oidc_token=args.oidc_token) @@ -154,7 +153,7 @@ def get_parser(): # setup setup_parser = subparsers.add_parser('setup', help='Setup local configuration') setup_parser.set_defaults(function=setup) - setup_parser.add_argument('--host', dest="auth_type_host", metavar="ADDRESS", help="The iDDS Rest host for the current auth type. For example: https://hostname:443/idds") + setup_parser.add_argument('--host', dest="host", metavar="ADDRESS", help="The iDDS Rest host. For example: https://hostname:443/idds") setup_parser.add_argument('--auth_type', dest='auth_type', action='store', choices=['x509_proxy', 'oidc'], default=None, help='The auth_type in [x509_proxy, oidc]. Default is x509_proxy.') setup_parser.add_argument('--x509_proxy', dest='x509_proxy', action='store', default=None, help='The x509 proxy path. Default is /tmp/x509up_u%d.' % os.geteuid()) setup_parser.add_argument('--vo', dest='vo', action='store', default=None, help='The virtual organization for authentication.') diff --git a/client/lib/idds/client/clientmanager.py b/client/lib/idds/client/clientmanager.py index 8858c779..c38e41f4 100644 --- a/client/lib/idds/client/clientmanager.py +++ b/client/lib/idds/client/clientmanager.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2020 - 2021 +# - Wen Guan, , 2020 - 2022 """ @@ -37,7 +37,8 @@ from idds.client.version import release_version from idds.client.client import Client from idds.common import exceptions -from idds.common.config import get_local_cfg_file, get_local_config_root, get_local_config_value +from idds.common.config import (get_local_cfg_file, get_local_config_root, + get_local_config_value, get_main_config_file) from idds.common.constants import RequestType, RequestStatus, ProcessingStatus # from idds.common.utils import get_rest_host, exception_handler from idds.common.utils import exception_handler @@ -61,35 +62,26 @@ def __init__(self, host=None, timeout=600, setup_client=False): self.local_config_root = None self.config = None self.auth_type = None - self.auth_type_host = None self.x509_proxy = None self.oidc_token = None self.vo = None - self.configuration = ConfigParser.SafeConfigParser() + self.configuration = ConfigParser.ConfigParser() self.client = None # if setup_client: # self.setup_client() def setup_client(self, auth_setup=False): - self.setup_local_configuration(host=self.host) + self.get_local_configuration() if self.host is None: local_cfg = self.get_local_cfg_file() - if self.auth_type is None: - self.auth_type = 'x509_proxy' - self.host = self.get_config_value(local_cfg, self.auth_type, 'host', current=self.host, default=None) - if self.host is None: - self.host = self.get_config_value(local_cfg, 'rest', 'host', current=self.host, default=None) + self.host = self.get_config_value(local_cfg, 'rest', 'host', current=self.host, default=None) if self.client is None: - if self.auth_type_host is not None: - client_host = self.auth_type_host - else: - client_host = self.host if self.auth_type is None: self.auth_type = 'x509_proxy' - self.client = Client(host=client_host, + self.client = Client(host=self.host, auth={'auth_type': self.auth_type, 'client_proxy': self.x509_proxy, 'oidc_token': self.oidc_token, @@ -106,34 +98,67 @@ def get_local_cfg_file(self): return local_cfg def get_config_value(self, configuration, section, name, current, default): + name_envs = {'host': 'IDDS_HOST', + 'local_config_root': 'IDDS_LOCAL_CONFIG_ROOT', + 'config': 'IDDS_CONFIG', + 'auth_type': 'IDDS_AUTH_TYPE', + 'oidc_token': 'IDDS_OIDC_TOKEN', + 'vo': 'IDDS_VO', + 'auth_no_verify': 'IDDS_AUTH_NO_VERIFY'} + + if not section: + section = self.get_section(name) + + if name in name_envs: + env_value = os.environ.get(name_envs[name], None) + if env_value and len(env_value.strip()) > 0: + return env_value + if configuration and type(configuration) in [str]: - config = ConfigParser.SafeConfigParser() + config = ConfigParser.ConfigParser() config.read(configuration) configuration = config + value = get_local_config_value(configuration, section, name, current, default) return value + def get_section(self, name): + name_sections = {'config': 'common', + 'auth_type': 'common', + 'host': 'rest', + 'x509_proxy': 'x509_proxy', + 'oidc_token': 'oidc', + 'vo': 'oidc'} + if name in name_sections: + return name_sections[name] + return 'common' + def get_local_configuration(self): local_cfg = self.get_local_cfg_file() - config = ConfigParser.SafeConfigParser() - if not local_cfg: - logging.debug("local configuration file does not exist, will only load idds default value.") - if local_cfg and os.path.exists(local_cfg): - config.read(local_cfg) + main_cfg = get_main_config_file() + config = ConfigParser.ConfigParser() + if local_cfg and os.path.exists(local_cfg) and main_cfg: + config.read((main_cfg, local_cfg)) + else: + if main_cfg: + config.read(main_cfg) + elif local_cfg and os.path.exists(local_cfg): + config.read(local_cfg) + else: + logging.debug("No local configuration nor IDDS_CONFIG, will only load idds default value.") if self.get_local_config_root(): - self.config = self.get_config_value(config, section='common', name='config', current=self.config, + self.config = self.get_config_value(config, section=None, name='config', current=self.config, default=os.path.join(self.get_local_config_root(), 'idds.cfg')) else: - self.config = self.get_config_value(config, section='common', name='config', current=self.config, + self.config = self.get_config_value(config, section=None, name='config', current=self.config, default=None) - self.auth_type = self.get_config_value(config, 'common', 'auth_type', current=self.auth_type, default='x509_proxy') + self.auth_type = self.get_config_value(config, None, 'auth_type', current=self.auth_type, default='x509_proxy') - self.host = self.get_config_value(config, 'rest', 'host', current=self.host, default=None) - self.auth_type_host = self.get_config_value(config, self.auth_type, 'host', current=self.auth_type_host, default=None) + self.host = self.get_config_value(config, None, 'host', current=self.host, default=None) - self.x509_proxy = self.get_config_value(config, 'x509_proxy', 'x509_proxy', current=self.x509_proxy, + self.x509_proxy = self.get_config_value(config, None, 'x509_proxy', current=self.x509_proxy, default='/tmp/x509up_u%d' % os.geteuid()) if not self.x509_proxy or not os.path.exists(self.x509_proxy): proxy = get_proxy_path() @@ -141,26 +166,45 @@ def get_local_configuration(self): self.x509_proxy = proxy if self.get_local_config_root(): - self.oidc_token = self.get_config_value(config, 'oidc', 'oidc_token', current=self.oidc_token, - default=os.path.join(self.get_local_config_root(), '.oidc_token')) + self.oidc_token = self.get_config_value(config, None, 'oidc_token', current=self.oidc_token, + default=os.path.join(self.get_local_config_root(), '.token')) else: - self.oidc_token = self.get_config_value(config, 'oidc', 'oidc_token', current=self.oidc_token, + self.oidc_token = self.get_config_value(config, None, 'oidc_token', current=self.oidc_token, default=None) - self.vo = self.get_config_value(config, self.auth_type, 'vo', current=self.vo, default=None) + self.vo = self.get_config_value(config, None, 'vo', current=self.vo, default=None) self.configuration = config + def set_local_configuration(self, name, value): + if value: + section = self.get_section(name) + if self.configuration and not self.configuration.has_section(section): + self.configuration.add_section(section) + if name in ['oidc_refresh_lifetime']: + value = str(value) + elif name in ['oidc_auto', 'oidc_polling']: + value = str(value).lower() + if self.configuration: + self.configuration.set(section, name, value) + def save_local_configuration(self): local_cfg = self.get_local_cfg_file() if not local_cfg: logging.debug("local configuration file does not exist, will not store current setup.") else: + self.set_local_configuration(name='config', value=self.config) + self.set_local_configuration(name='auth_type', value=self.auth_type) + self.set_local_configuration(name='host', value=self.host) + self.set_local_configuration(name='x509_proxy', value=self.x509_proxy) + self.set_local_configuration(name='oidc_token', value=self.oidc_token) + self.set_local_configuration(name='vo', value=self.vo) + with open(local_cfg, 'w') as configfile: self.configuration.write(configfile) def setup_local_configuration(self, local_config_root=None, config=None, host=None, - auth_type=None, auth_type_host=None, x509_proxy=None, + auth_type=None, x509_proxy=None, oidc_token=None, vo=None): if 'IDDS_CONFIG' in os.environ and os.environ['IDDS_CONFIG']: @@ -174,7 +218,6 @@ def setup_local_configuration(self, local_config_root=None, config=None, host=No self.config = config self.host = host self.auth_type = auth_type - self.auth_type_host = auth_type_host self.x509_proxy = x509_proxy self.oidc_token = oidc_token self.vo = vo diff --git a/common/lib/idds/common/authentication.py b/common/lib/idds/common/authentication.py index 1ed0a549..0dcfc78f 100644 --- a/common/lib/idds/common/authentication.py +++ b/common/lib/idds/common/authentication.py @@ -61,7 +61,7 @@ def __init__(self, timeout=None): self.max_expires_in = self.config.getint('common', 'max_expires_in') def load_auth_server_config(self): - config = ConfigParser.SafeConfigParser() + config = ConfigParser.ConfigParser() if os.environ.get('IDDS_AUTH_CONFIG', None): configfile = os.environ['IDDS_AUTH_CONFIG'] if config.read(configfile) == [configfile]: @@ -239,7 +239,7 @@ def get_public_key(self, token, jwks_uri): if j.get('kid') == kid: jwk = j if jwk is None: - raise jwt.exceptions.InvalidTokenError('JWK not found for kid={0}'.format(kid, str(jwks))) + raise jwt.exceptions.InvalidTokenError('JWK not found for kid={0}: {1}'.format(kid, str(jwks))) public_num = RSAPublicNumbers(n=decode_value(jwk['n']), e=decode_value(jwk['e'])) public_key = public_num.public_key(default_backend()) diff --git a/common/lib/idds/common/config.py b/common/lib/idds/common/config.py index 9bf1769d..fad6e1ed 100644 --- a/common/lib/idds/common/config.py +++ b/common/lib/idds/common/config.py @@ -128,7 +128,7 @@ def config_get_bool(section, option): def get_local_config_root(local_config_root=None): if 'IDDS_LOCAL_CONFIG_ROOT' in os.environ and os.environ['IDDS_LOCAL_CONFIG_ROOT']: if local_config_root is None: - print("IDDS_LOCAL_CONFIG_ROOT is set. Will use it.") + # print("IDDS_LOCAL_CONFIG_ROOT is set. Will use it.") local_config_root = os.environ['IDDS_LOCAL_CONFIG_ROOT'] else: print("local_config_root is set to %s. Ignore IDDS_LOCAL_CONFIG_ROOT" % local_config_root) @@ -151,6 +151,10 @@ def get_local_config_root(local_config_root=None): os.makedirs(local_config_root, exist_ok=True) except Exception as ex: print("Failed to create %s: %s", local_config_root, str(ex)) + local_config_root = None + if local_config_root and not os.access(local_config_root, os.W_OK): + print("No write permission on local config root %s, set it to None" % (local_config_root)) + local_config_root = None return local_config_root @@ -175,21 +179,27 @@ def get_local_config_value(configuration, section, name, current, default): value = current elif value is None: value = default - - if configuration and not configuration.has_section(section): - configuration.add_section(section) - if value is not None: - if name in ['oidc_refresh_lifetime']: - value = str(value) - elif name in ['oidc_auto', 'oidc_polling']: - value = str(value).lower() - if configuration: - configuration.set(section, name, value) return value -def get_config(): - __CONFIG = ConfigParser.SafeConfigParser() +def get_main_config_file(): + __CONFIG = ConfigParser.ConfigParser() + if os.environ.get('IDDS_CONFIG', None): + configfile = os.environ['IDDS_CONFIG'] + if __CONFIG.read(configfile) == [configfile]: + return configfile + else: + configfiles = ['%s/etc/idds/idds.cfg' % os.environ.get('IDDS_HOME', ''), + '/etc/idds/idds.cfg', + '%s/etc/idds/idds.cfg' % os.environ.get('VIRTUAL_ENV', '')] + for configfile in configfiles: + if __CONFIG.read(configfile) == [configfile]: + return configfile + return None + + +def get_main_config(): + __CONFIG = ConfigParser.ConfigParser() __HAS_CONFIG = False if os.environ.get('IDDS_CONFIG', None): @@ -208,6 +218,11 @@ def get_config(): __HAS_CONFIG = True # print("Configuration file %s is used" % configfile) break + return __CONFIG, __HAS_CONFIG + + +def get_config(): + __CONFIG, __HAS_CONFIG = get_main_config() if not __HAS_CONFIG: local_cfg = get_local_cfg_file() diff --git a/common/lib/idds/common/exceptions.py b/common/lib/idds/common/exceptions.py index 4c54a629..b1617c17 100644 --- a/common/lib/idds/common/exceptions.py +++ b/common/lib/idds/common/exceptions.py @@ -262,3 +262,13 @@ def __init__(self, *args, **kwargs): super(AuthenticationNotSupported, self).__init__(*args, **kwargs) self._message = "Authentication not supported." self.error_code = 602 + + +class AuthenticationNoPermission(IDDSException): + """ + Authentication No Permission + """ + def __init__(self, *args, **kwargs): + super(AuthenticationNoPermission, self).__init__(*args, **kwargs) + self._message = "Authentication no permission." + self.error_code = 603 diff --git a/doma/lib/idds/doma/workflow/domapandawork.py b/doma/lib/idds/doma/workflow/domapandawork.py index 0cf013f9..8205ca94 100644 --- a/doma/lib/idds/doma/workflow/domapandawork.py +++ b/doma/lib/idds/doma/workflow/domapandawork.py @@ -99,7 +99,7 @@ def my_condition(self): return False def load_panda_config(self): - panda_config = ConfigParser.SafeConfigParser() + panda_config = ConfigParser.ConfigParser() if os.environ.get('IDDS_PANDA_CONFIG', None): configfile = os.environ['IDDS_PANDA_CONFIG'] if panda_config.read(configfile) == [configfile]: @@ -266,8 +266,8 @@ def is_all_dependency_tasks_available(self, inputs_dependency, task_name_to_coll for input_d in inputs_dependency: task_name = input_d['task'] if (task_name not in task_name_to_coll_map # noqa: W503 - or 'outputs' not in task_name_to_coll_map[task_name] # noqa: W503 - or not task_name_to_coll_map[task_name]['outputs']): # noqa: W503 + or 'outputs' not in task_name_to_coll_map[task_name] # noqa: W503 + or not task_name_to_coll_map[task_name]['outputs']): # noqa: W503 return False return True diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index 5f3f53c3..861c6582 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -73,6 +73,8 @@ def __init__(self, executable=None, arguments=None, parameters=None, setup=None, self.panda_auth = None self.panda_auth_vo = None self.panda_config_root = None + self.pandacache_url = None + self.panda_verify_host = None self.dependency_map = dependency_map self.dependency_map_deleted = [] @@ -90,7 +92,7 @@ def __init__(self, executable=None, arguments=None, parameters=None, setup=None, self.task_type = task_type self.maxWalltime = maxwalltime self.maxAttempt = maxattempt if maxattempt else 5 - self.core_count = core_count + self.core_count = core_count if core_count else 1 self.task_log = task_log self.encode_command_line = encode_command_line @@ -117,7 +119,7 @@ def my_condition(self): return False def load_panda_config(self): - panda_config = ConfigParser.SafeConfigParser() + panda_config = ConfigParser.ConfigParser() if os.environ.get('IDDS_PANDA_CONFIG', None): configfile = os.environ['IDDS_PANDA_CONFIG'] if panda_config.read(configfile) == [configfile]: @@ -144,58 +146,36 @@ def load_panda_urls(self): self.panda_verify_host = None if panda_config.has_section('panda'): - if panda_config.has_option('panda', 'panda_monitor_url'): + if 'PANDA_MONITOR_URL' not in os.environ and panda_config.has_option('panda', 'panda_monitor_url'): self.panda_monitor = panda_config.get('panda', 'panda_monitor_url') os.environ['PANDA_MONITOR_URL'] = self.panda_monitor # self.logger.debug("Panda monitor url: %s" % str(self.panda_monitor)) - if panda_config.has_option('panda', 'panda_url'): + if 'PANDA_URL' not in os.environ and panda_config.has_option('panda', 'panda_url'): self.panda_url = panda_config.get('panda', 'panda_url') os.environ['PANDA_URL'] = self.panda_url # self.logger.debug("Panda url: %s" % str(self.panda_url)) - if panda_config.has_option('panda', 'pandacache_url'): + if 'PANDACACHE_URL' not in os.environ and panda_config.has_option('panda', 'pandacache_url'): self.pandacache_url = panda_config.get('panda', 'pandacache_url') os.environ['PANDACACHE_URL'] = self.pandacache_url # self.logger.debug("Pandacache url: %s" % str(self.pandacache_url)) - if panda_config.has_option('panda', 'panda_verify_host'): + if 'PANDA_VERIFY_HOST' not in os.environ and panda_config.has_option('panda', 'panda_verify_host'): self.panda_verify_host = panda_config.get('panda', 'panda_verify_host') os.environ['PANDA_VERIFY_HOST'] = self.panda_verify_host # self.logger.debug("Panda verify host: %s" % str(self.panda_verify_host)) - if panda_config.has_option('panda', 'panda_url_ssl'): + if 'PANDA_URL_SSL' not in os.environ and panda_config.has_option('panda', 'panda_url_ssl'): self.panda_url_ssl = panda_config.get('panda', 'panda_url_ssl') os.environ['PANDA_URL_SSL'] = self.panda_url_ssl # self.logger.debug("Panda url ssl: %s" % str(self.panda_url_ssl)) - if panda_config.has_option('panda', 'panda_auth'): + if 'PANDA_AUTH' not in os.environ and panda_config.has_option('panda', 'panda_auth'): self.panda_auth = panda_config.get('panda', 'panda_auth') os.environ['PANDA_AUTH'] = self.panda_auth - if panda_config.has_option('panda', 'panda_auth_vo'): + if 'PANDA_AUTH_VO' not in os.environ and panda_config.has_option('panda', 'panda_auth_vo'): self.panda_auth_vo = panda_config.get('panda', 'panda_auth_vo') os.environ['PANDA_AUTH_VO'] = self.panda_auth_vo - if panda_config.has_option('panda', 'panda_config_root'): + if 'PANDA_CONFIG_ROOT' not in os.environ and panda_config.has_option('panda', 'panda_config_root'): self.panda_config_root = panda_config.get('panda', 'panda_config_root') os.environ['PANDA_CONFIG_ROOT'] = self.panda_config_root - if not self.panda_monitor and 'PANDA_MONITOR_URL' in os.environ and os.environ['PANDA_MONITOR_URL']: - self.panda_monitor = os.environ['PANDA_MONITOR_URL'] - # self.logger.debug("Panda monitor url: %s" % str(self.panda_monitor)) - if not self.panda_url and 'PANDA_URL' in os.environ and os.environ['PANDA_URL']: - self.panda_url = os.environ['PANDA_URL'] - # self.logger.debug("Panda url: %s" % str(self.panda_url)) - if not self.panda_url_ssl and 'PANDA_URL_SSL' in os.environ and os.environ['PANDA_URL_SSL']: - self.panda_url_ssl = os.environ['PANDA_URL_SSL'] - # self.logger.debug("Panda url ssl: %s" % str(self.panda_url_ssl)) - if not self.pandacache_url and 'PANDACACHE_URL' in os.environ and os.environ['PANDACACHE_URL']: - self.pandacache_url = os.environ['PANDACACHE_URL'] - # self.logger.debug("Pandacache url: %s" % str(self.pandacache_url)) - if not self.panda_verify_host and 'PANDA_VERIFY_HOST' in os.environ and os.environ['PANDA_VERIFY_HOST']: - self.panda_verify_host = os.environ['PANDA_VERIFY_HOST'] - # self.logger.debug("Panda verify host: %s" % str(self.panda_verify_host)) - if not self.panda_auth and 'PANDA_AUTH' in os.environ and os.environ['PANDA_AUTH']: - self.panda_auth = os.environ['PANDA_AUTH'] - if not self.panda_auth_vo and 'PANDA_AUTH_VO' in os.environ and os.environ['PANDA_AUTH_VO']: - self.panda_auth_vo = os.environ['PANDA_AUTH_VO'] - if not self.panda_config_root and 'PANDA_CONFIG_ROOT' in os.environ and os.environ['PANDA_CONFIG_ROOT']: - self.panda_config_root = os.environ['PANDA_CONFIG_ROOT'] - def set_agent_attributes(self, attrs, req_attributes=None): if 'life_time' not in attrs[self.class_name] or int(attrs[self.class_name]['life_time']) <= 0: attrs['life_time'] = None @@ -324,8 +304,8 @@ def is_all_dependency_tasks_available(self, inputs_dependency, task_name_to_coll for input_d in inputs_dependency: task_name = input_d['task'] if (task_name not in task_name_to_coll_map # noqa: W503 - or 'outputs' not in task_name_to_coll_map[task_name] # noqa: W503 - or not task_name_to_coll_map[task_name]['outputs']): # noqa: W503 + or 'outputs' not in task_name_to_coll_map[task_name] # noqa: W503 + or not task_name_to_coll_map[task_name]['outputs']): # noqa: W503 return False return True @@ -1139,7 +1119,7 @@ def poll_processing_updates(self, processing, input_output_maps): elif proc.tosuspend: self.logger.info("Suspending processing (processing id: %s, jediTaskId: %s)" % (processing['processing_id'], proc.workload_id)) # self.kill_processing_force(processing) - self.kill_processing(processing) + self.kill_processing_force(processing) proc.tosuspend = False proc.polling_retries = 0 elif proc.toresume: diff --git a/main/lib/idds/agents/transporter/transporter.py b/main/lib/idds/agents/transporter/transporter.py index 070a6174..06168709 100644 --- a/main/lib/idds/agents/transporter/transporter.py +++ b/main/lib/idds/agents/transporter/transporter.py @@ -325,11 +325,11 @@ def process_output_collection(self, coll): elif content_status_keys == [ContentStatus.FinalFailed] or content_status_keys == [ContentStatus.FinalFailed.value]: coll_status = CollectionStatus.Failed elif (len(content_status_keys) == 2 # noqa: W503 - and (ContentStatus.FinalFailed in content_status_keys or ContentStatus.FinalFailed.value in content_status_keys) # noqa: W503 - and (ContentStatus.Available in content_status_keys or ContentStatus.Available.value in content_status_keys)): # noqa: W503 + and (ContentStatus.FinalFailed in content_status_keys or ContentStatus.FinalFailed.value in content_status_keys) # noqa: W503, E128 + and (ContentStatus.Available in content_status_keys or ContentStatus.Available.value in content_status_keys)): # noqa: W503, E128, E125 coll_status = CollectionStatus.SubClosed elif (ContentStatus.New in content_status_keys or ContentStatus.New.value in content_status_keys # noqa: W503 - or ContentStatus.Failed in content_status_keys or ContentStatus.Failed.value in content_status_keys): # noqa: W503 + or ContentStatus.Failed in content_status_keys or ContentStatus.Failed.value in content_status_keys): # noqa: W503, E128, E125 # coll_status = CollectionStatus.Processing coll_status = CollectionStatus.Failed else: diff --git a/main/lib/idds/core/transforms.py b/main/lib/idds/core/transforms.py index 08ffdc54..aacba801 100644 --- a/main/lib/idds/core/transforms.py +++ b/main/lib/idds/core/transforms.py @@ -443,13 +443,13 @@ def release_inputs_by_collection_old(to_release_inputs): to_release_names_missing = [] for to_release_content in to_release_contents: if (to_release_content['status'] in [ContentStatus.Available] # noqa: W503 - or to_release_content['substatus'] in [ContentStatus.Available]): # noqa: W503 + or to_release_content['substatus'] in [ContentStatus.Available]): # noqa: W503 to_release_names_available.append(to_release_content['name']) elif (to_release_content['status'] in [ContentStatus.FakeAvailable] # noqa: W503 - or to_release_content['substatus'] in [ContentStatus.FakeAvailable]): # noqa: W503 + or to_release_content['substatus'] in [ContentStatus.FakeAvailable]): # noqa: W503, E128 to_release_names_fake_available.append(to_release_content['name']) elif (to_release_content['status'] in [ContentStatus.FinalFailed] # noqa: W503 - or to_release_content['substatus'] in [ContentStatus.FinalFailed]): # noqa: W503 + or to_release_content['substatus'] in [ContentStatus.FinalFailed]): # noqa: W503, E128 to_release_names_final_failed.append(to_release_content['name']) elif (to_release_content['status'] in [ContentStatus.Missing] # noqa: W503 or to_release_content['substatus'] in [ContentStatus.Missing]): # noqa: W503 @@ -461,7 +461,7 @@ def release_inputs_by_collection_old(to_release_inputs): for content in contents: if (content['content_relation_type'] == ContentRelationType.InputDependency): # noqa: W503 if (content['status'] not in [ContentStatus.Available] # noqa: W503 - and content['name'] in to_release_names_available): # noqa: W503 + and content['name'] in to_release_names_available): # noqa: W503 update_content = {'content_id': content['content_id'], 'substatus': ContentStatus.Available, 'status': ContentStatus.Available} @@ -610,7 +610,7 @@ def get_work_name_to_coll_map(request_id): work_name_to_coll_map = {} for tf in tfs: if ('transform_metadata' in tf and tf['transform_metadata'] - and 'work_name' in tf['transform_metadata'] and tf['transform_metadata']['work_name']): # noqa: W503 + and 'work_name' in tf['transform_metadata'] and tf['transform_metadata']['work_name']): # noqa: W503 work_name = tf['transform_metadata']['work_name'] transform_id = tf['transform_id'] if work_name not in work_name_to_coll_map: diff --git a/main/lib/idds/rest/v1/controller.py b/main/lib/idds/rest/v1/controller.py index c418a5ea..6a6931de 100644 --- a/main/lib/idds/rest/v1/controller.py +++ b/main/lib/idds/rest/v1/controller.py @@ -38,6 +38,11 @@ def delete(self): def get_request(sel): return request + def get_username(self): + if 'username' in request.environ and request.environ['username']: + return request.environ['username'] + return None + def generate_message(self, exc_cls=None, exc_msg=None): if exc_cls is None and exc_msg is None: return None diff --git a/main/lib/idds/rest/v1/messages.py b/main/lib/idds/rest/v1/messages.py index 13906cae..6685fc87 100644 --- a/main/lib/idds/rest/v1/messages.py +++ b/main/lib/idds/rest/v1/messages.py @@ -17,6 +17,7 @@ from idds.common.constants import (HTTP_STATUS_CODE, MessageType, MessageStatus, MessageSource, MessageDestination) from idds.common.utils import json_loads +from idds.core.requests import get_requests from idds.core.messages import add_message, retrieve_messages from idds.rest.v1.controller import IDDSController @@ -40,6 +41,27 @@ def get(self, request_id, workload_id): if workload_id == 'null': workload_id = None + if request_id is None: + raise Exception("request_id should not be None") + except Exception as error: + print(error) + print(format_exc()) + return self.generate_http_response(HTTP_STATUS_CODE.BadRequest, exc_cls=exceptions.BadRequest.__name__, exc_msg=str(error)) + + try: + username = self.get_username() + reqs = get_requests(request_id=request_id, workload_id=workload_id, with_request=True) + for req in reqs: + if req['username'] and req['username'] != username: + raise exceptions.AuthenticationNoPermission("User %s has no permission to update request %s" % (username, req['request_id'])) + except exceptions.AuthenticationNoPermission as error: + return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error) + except Exception as error: + print(error) + print(format_exc()) + return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error) + + try: msgs = retrieve_messages(request_id=request_id, workload_id=workload_id) rets = [] for msg in msgs: @@ -75,6 +97,19 @@ def post(self, request_id, workload_id): print(format_exc()) return self.generate_http_response(HTTP_STATUS_CODE.BadRequest, exc_cls=exceptions.BadRequest.__name__, exc_msg=str(error)) + try: + username = self.get_username() + reqs = get_requests(request_id=request_id, workload_id=workload_id, with_request=True) + for req in reqs: + if req['username'] and req['username'] != username: + raise exceptions.AuthenticationNoPermission("User %s has no permission to update request %s" % (username, req['request_id'])) + except exceptions.AuthenticationNoPermission as error: + return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error) + except Exception as error: + print(error) + print(format_exc()) + return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error) + try: msg = self.get_request().data and json_loads(self.get_request().data) # command = msg['command'] diff --git a/main/lib/idds/rest/v1/requests.py b/main/lib/idds/rest/v1/requests.py index a13b25d7..810ca398 100644 --- a/main/lib/idds/rest/v1/requests.py +++ b/main/lib/idds/rest/v1/requests.py @@ -115,6 +115,19 @@ def put(self, request_id): except ValueError: return self.generate_http_response(HTTP_STATUS_CODE.BadRequest, exc_cls=exceptions.BadRequest.__name__, exc_msg='Cannot decode json parameter dictionary') + try: + username = self.get_username() + reqs = get_requests(request_id=request_id, with_request=True) + for req in reqs: + if req['username'] and req['username'] != username: + raise exceptions.AuthenticationNoPermission("User %s has no permission to update request %s" % (username, req['request_id'])) + except exceptions.AuthenticationNoPermission as error: + return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error) + except Exception as error: + print(error) + print(format_exc()) + return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error) + try: # update_request(request_id, parameters) # msg = {'command': 'update_request', 'parameters': {'status': RequestStatus.ToSuspend}}) diff --git a/main/lib/idds/tests/auth_test_script.py b/main/lib/idds/tests/auth_test_script.py index ba3715a7..c33d4c0a 100644 --- a/main/lib/idds/tests/auth_test_script.py +++ b/main/lib/idds/tests/auth_test_script.py @@ -23,7 +23,7 @@ import sys import time -import unittest2 as unittest +import unittest2 as unittest # noqa F401 # from nose.tools import assert_equal from idds.common.utils import setup_logging from idds.common.authentication import OIDCAuthentication diff --git a/main/lib/idds/tests/find_dependencies.py b/main/lib/idds/tests/find_dependencies.py index 18f94e35..01217c70 100644 --- a/main/lib/idds/tests/find_dependencies.py +++ b/main/lib/idds/tests/find_dependencies.py @@ -1,5 +1,3 @@ -import sys -import datetime from idds.common.utils import json_dumps # noqa F401 from idds.common.constants import ContentStatus, ContentType, ContentRelationType, ContentLocking # noqa F401 @@ -10,7 +8,7 @@ from idds.core.processings import get_processings # noqa F401 from idds.core import transforms as core_transforms # noqa F401 from idds.core import catalog as core_catalog # noqa F401 -from idds.orm.contents import get_input_contents +from idds.orm.contents import get_input_contents # noqa F401 from idds.core.transforms import release_inputs_by_collection, release_inputs_by_collection_old # noqa F401 diff --git a/main/lib/idds/tests/jsonload_test.py b/main/lib/idds/tests/jsonload_test.py new file mode 100644 index 00000000..92192ac1 --- /dev/null +++ b/main/lib/idds/tests/jsonload_test.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python + +import json +import re +import os + + +is_unicode_defined = True +try: + _ = unicode('test') +except NameError: + is_unicode_defined = False + + +def is_string(value): + if is_unicode_defined and type(value) in [str, unicode] or type(value) in [str]: # noqa F821 + return True + else: + return False + + +def as_parse_env(dct): + print(dct) + for key in dct: + value = dct[key] + print(value) + print(type(value)) + if is_string(value) and value.startswith('$'): + env_match = re.search('\$\{*([^\}]+)\}*', value) # noqa W605 + env_name = env_match.group(1) + if env_name not in os.environ: + print("Error: %s is defined in configmap but is not defined in environments" % env_name) + else: + dct[key] = os.environ[env_name] + return dct + + +if __name__ == '__main__': + json_string = """ +{ "/opt/idds/config/panda.cfg": + {"panda": + {"panda_url_ssl": "${PANDA_URL_SSL}", + "panda_url": "${PANDA_URL}", + "panda_monitor_url": "${PANDA_MONITOR_URL}", + "# PANDA_AUTH_VO": "panda_dev", + "panda_auth": "${PANDA_AUTH}", + "panda_auth_vo": "${PANDA_AUTH_VO}", + "panda_config_root": "${PANDA_CONFIG_ROOT}", + "pandacache_url": "${PANDACACHE_URL}", + "panda_verify_host": "${PANDA_VERIFY_HOST}", + "test1": {"test2": "${TEST_ENV}"} + } + } +} +""" + print("test1") + test1 = json.loads(json_string) + print(test1) + print("test2") + test1 = json.loads(json_string, object_hook=as_parse_env) + print(test1) diff --git a/main/lib/idds/tests/rest_test.py b/main/lib/idds/tests/rest_test.py index d4e3494a..b08f885f 100644 --- a/main/lib/idds/tests/rest_test.py +++ b/main/lib/idds/tests/rest_test.py @@ -16,7 +16,7 @@ import pprint os.environ['IDDS_CONFIG'] = '/opt/idds/etc/idds/idds.cfg' # noqa: E402 -from idds.rest.v1.app import create_app +from idds.rest.v1.app import create_app # noqa E402 application = create_app() diff --git a/main/lib/idds/tests/test_domapanda_workflow.py b/main/lib/idds/tests/test_domapanda_workflow.py index 119c33ee..9f202bc7 100644 --- a/main/lib/idds/tests/test_domapanda_workflow.py +++ b/main/lib/idds/tests/test_domapanda_workflow.py @@ -26,7 +26,7 @@ # from rucio.common.exception import CannotAuthenticate # from idds.client.client import Client -from idds.client.clientmanager import ClientManager +from idds.client.clientmanager import ClientManager # noqa E402 # from idds.common.constants import RequestType, RequestStatus # from idds.common.utils import get_rest_host # from idds.tests.common import get_example_real_tape_stagein_request @@ -34,9 +34,9 @@ # from idds.workflowv2.work import Work, Parameter, WorkStatus # from idds.workflowv2.workflow import Condition, Workflow -from idds.workflowv2.workflow import Workflow, Condition +from idds.workflowv2.workflow import Workflow, Condition # noqa E402 # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork -from idds.doma.workflowv2.domapandawork import DomaPanDAWork +from idds.doma.workflowv2.domapandawork import DomaPanDAWork # noqa E402 task_queue = 'DOMA_LSST_GOOGLE_TEST' diff --git a/main/tools/env/merge_configmap.py b/main/tools/env/merge_configmap.py index cfdb882c..4835f420 100644 --- a/main/tools/env/merge_configmap.py +++ b/main/tools/env/merge_configmap.py @@ -13,11 +13,39 @@ import logging import os +import re import json import configparser +is_unicode_defined = True +try: + _ = unicode('test') +except NameError: + is_unicode_defined = False + + +def is_string(value): + if is_unicode_defined and type(value) in [str, unicode] or type(value) in [str]: # noqa F821 + return True + else: + return False + + +def as_parse_env(dct): + for key in dct: + value = dct[key] + if is_string(value) and value.startswith('$'): + env_match = re.search('\$\{*([^\}]+)\}*', value) # noqa W605 + env_name = env_match.group(1) + if env_name not in os.environ: + print("Error: %s is defined in configmap but is not defined in environments" % env_name) + else: + dct[key] = os.environ[env_name] + return dct + + def merge_configs(source_file_path, dest_file_path): """ Merge configuration file. @@ -26,7 +54,7 @@ def merge_configs(source_file_path, dest_file_path): if source_file_path and dest_file_path: if os.path.exists(source_file_path) and os.path.exists(dest_file_path): with open(source_file_path, 'r') as f: - data = json.load(f) + data = json.load(f, object_hook=as_parse_env) if dest_file_path in data: data_conf = data[dest_file_path] parser = configparser.ConfigParser() @@ -36,10 +64,24 @@ def merge_configs(source_file_path, dest_file_path): parser.write(dest_file) +def create_oidc_token(): + if 'PANDA_AUTH_ID_TOKEN' in os.environ: + config_root = os.environ.get('PANDA_CONFIG_ROOT', '/tmp') + token_file = os.path.join(config_root, '.token') + token = {"id_token": os.environ.get('PANDA_AUTH_ID_TOKEN', ""), + "token_type": "Bearer"} + + with open(token_file, 'w') as f: + f.write(json.dumps(token)) + + logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser(description="Merge configuration file from configmap") parser.add_argument('-s', '--source', default=None, help='Source config file path (in .json format)') parser.add_argument('-d', '--destination', default=None, help='Destination file path') +parser.add_argument('-c', '--create_oidc_token', default=False, action='store_true', help='Create the oidc token based on environment') args = parser.parse_args() merge_configs(args.source, args.destination) +if args.create_oidc_token: + create_oidc_token() diff --git a/main/tools/env/setup_dev.sh b/main/tools/env/setup_dev.sh index e949e71e..1c2fe9e7 100644 --- a/main/tools/env/setup_dev.sh +++ b/main/tools/env/setup_dev.sh @@ -8,6 +8,8 @@ # Authors: # - Wen Guan, , 2019 +# export LANG=en_US.UTF-8 +# export LC_ALL=en_US.UTF-8 CurrentDir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" ToolsDir="$( dirname "$CurrentDir" )" diff --git a/monitor/data/conf.js b/monitor/data/conf.js index f8739468..b51b5d45 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus7107.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus7107.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus7107.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus7107.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus7107.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus7107.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus787.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus787.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus787.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus787.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus787.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus787.cern.ch:443/idds/monitor/null/null/false/false/true" } diff --git a/start-daemon.sh b/start-daemon.sh index df839ffe..43a0dd86 100755 --- a/start-daemon.sh +++ b/start-daemon.sh @@ -80,6 +80,10 @@ else -d /opt/idds/config/rucio.cfg fi +# generate oidc token from environment +echo "generate oidc token from environment PANDA_AUTH_ID_TOKEN if it exists." +python3 /opt/idds/tools/env/merge_configmap.py --create_oidc_token + if [ -f /opt/idds/config/idds/httpd-idds-443-py39-cc7.conf ]; then echo "httpd conf already mounted." else diff --git a/workflow/lib/idds/workflow/work.py b/workflow/lib/idds/workflow/work.py index a69545a9..b4d8965a 100644 --- a/workflow/lib/idds/workflow/work.py +++ b/workflow/lib/idds/workflow/work.py @@ -1367,11 +1367,11 @@ def should_release_inputs(self, processing=None, poll_operation_time_period=120) processing_model = processing.processing if (processing_model and processing_model['submitted_at'] # noqa: W503 - and processing_model['submitted_at'] + datetime.timedelta(seconds=int(poll_operation_time_period)) # noqa: W503 - < datetime.datetime.utcnow()): # noqa: W503 + and processing_model['submitted_at'] + datetime.timedelta(seconds=int(poll_operation_time_period)) # noqa: W503 + < datetime.datetime.utcnow()): # noqa: W503 if (processing and processing.status - and processing.status not in [ProcessingStatus.New, ProcessingStatus.New.value]): # noqa: W503 + and processing.status not in [ProcessingStatus.New, ProcessingStatus.New.value]): # noqa: W503 # and processing.status not in [ProcessingStatus.New, ProcessingStatus.New.value, # noqa: W503 # ProcessingStatus.Submitting, ProcessingStatus.Submitting.value]): # noqa: W503 return True diff --git a/workflow/lib/idds/workflowv2/work.py b/workflow/lib/idds/workflowv2/work.py index 1a3095ff..8e14595d 100644 --- a/workflow/lib/idds/workflowv2/work.py +++ b/workflow/lib/idds/workflowv2/work.py @@ -1703,11 +1703,11 @@ def should_release_inputs(self, processing=None, poll_operation_time_period=120) processing_model = processing.processing if (processing_model and processing_model['submitted_at'] # noqa: W503 - and processing_model['submitted_at'] + datetime.timedelta(seconds=int(poll_operation_time_period)) # noqa: W503 - < datetime.datetime.utcnow()): # noqa: W503 + and processing_model['submitted_at'] + datetime.timedelta(seconds=int(poll_operation_time_period)) # noqa: W503 + < datetime.datetime.utcnow()): # noqa: W503 if (processing and processing.status - and processing.status not in [ProcessingStatus.New, ProcessingStatus.New.value]): # noqa: W503 + and processing.status not in [ProcessingStatus.New, ProcessingStatus.New.value]): # noqa: W503 # and processing.status not in [ProcessingStatus.New, ProcessingStatus.New.value, # noqa: W503 # ProcessingStatus.Submitting, ProcessingStatus.Submitting.value]): # noqa: W503 return True