Skip to content

Commit

Permalink
Merge pull request #71 from HSF/dev
Browse files Browse the repository at this point in the history
new version 0.10.5
  • Loading branch information
wguanicedew authored Mar 28, 2022
2 parents bd6dd2c + b7ebef5 commit 83ec1c4
Show file tree
Hide file tree
Showing 36 changed files with 314 additions and 132 deletions.
2 changes: 1 addition & 1 deletion atlas/lib/idds/atlas/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
# - Wen Guan, <[email protected]>, 2019 - 2021


release_version = "0.10.3"
release_version = "0.10.5"
14 changes: 7 additions & 7 deletions atlas/lib/idds/atlas/workflow/atlaspandawork.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ def create_processing(self, input_output_maps=[]):

def submit_panda_task(self, processing):
try:
from pandatools import Client
from pandaclient import Client

proc = processing['processing_metadata']['processing']
task_param = proc.processing_metadata['task_param']
Expand Down Expand Up @@ -436,7 +436,7 @@ def submit_processing(self, processing):

def poll_panda_task_status(self, processing):
if 'processing' in processing['processing_metadata']:
from pandatools import Client
from pandaclient import Client

proc = processing['processing_metadata']['processing']
status, task_status = Client.getTaskStatus(proc.workload_id)
Expand Down Expand Up @@ -468,7 +468,7 @@ def get_processing_status_from_panda_status(self, task_status):
return processing_status

def get_panda_task_id(self, processing):
from pandatools import Client
from pandaclient import Client

start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=10)
start_time = start_time.strftime('%Y-%m-%d %H:%M:%S')
Expand All @@ -494,7 +494,7 @@ def get_panda_task_id(self, processing):
def poll_panda_task(self, processing=None, input_output_maps=None):
task_id = None
try:
from pandatools import Client
from pandaclient import Client

if processing:
proc = processing['processing_metadata']['processing']
Expand Down Expand Up @@ -534,7 +534,7 @@ def poll_panda_task(self, processing=None, input_output_maps=None):
def kill_processing(self, processing):
try:
if processing:
from pandatools import Client
from pandaclient import Client
proc = processing['processing_metadata']['processing']
task_id = proc.workload_id
# task_id = processing['processing_metadata']['task_id']
Expand All @@ -547,7 +547,7 @@ def kill_processing(self, processing):
def kill_processing_force(self, processing):
try:
if processing:
from pandatools import Client
from pandaclient import Client
proc = processing['processing_metadata']['processing']
task_id = proc.workload_id
# task_id = processing['processing_metadata']['task_id']
Expand All @@ -560,7 +560,7 @@ def kill_processing_force(self, processing):
def reactivate_processing(self, processing):
try:
if processing:
from pandatools import Client
from pandaclient import Client
# task_id = processing['processing_metadata']['task_id']
proc = processing['processing_metadata']['processing']
task_id = proc.workload_id
Expand Down
2 changes: 1 addition & 1 deletion atlas/lib/idds/atlas/workflowv2/atlasdagwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ def create_processing(self, input_output_maps):

def submit_panda_task(self, processing):
try:
from pandatools import Client
from pandaclient import Client

task_param = processing['processing_metadata']['task_param']
return_code = Client.insertTaskParams(task_param, verbose=True)
Expand Down
2 changes: 1 addition & 1 deletion atlas/lib/idds/atlas/workflowv2/atlaslocalpandawork.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ def get_rucio_download_client(self):
def poll_panda_task_output(self, processing=None, input_output_maps=None):
task_id = None
try:
from pandatools import Client
from pandaclient import Client

if processing:
output_metadata = {}
Expand Down
28 changes: 20 additions & 8 deletions atlas/lib/idds/atlas/workflowv2/atlaspandawork.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,17 @@ def parse_task_parameters(self, task_parameters):
log_col = {'scope': scope, 'name': name}
self.add_log_collections(log_col)

if not self.get_primary_output_collection():
all_colls = self.get_collections()
if all_colls:
one_coll = all_colls[0]
output_coll_scope = one_coll.scope
else:
output_coll_scope = 'pseudo.scope'
name = 'pseudo_output.' + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S_%f") + str(random.randint(1, 1000))
output_coll = {'scope': output_coll_scope, 'name': name, 'type': CollectionType.PseudoDataset}
self.set_primary_output_collection(output_coll)

if not self.get_primary_input_collection():
output_colls = self.get_output_collections()
output_coll = output_colls[0]
Expand Down Expand Up @@ -454,7 +465,7 @@ def create_processing(self, input_output_maps=[]):

def submit_panda_task(self, processing):
try:
from pandatools import Client
from pandaclient import Client

proc = processing['processing_metadata']['processing']
task_param = proc.processing_metadata['task_param']
Expand Down Expand Up @@ -498,7 +509,7 @@ def submit_processing(self, processing):

def poll_panda_task_status(self, processing):
if 'processing' in processing['processing_metadata']:
from pandatools import Client
from pandaclient import Client

proc = processing['processing_metadata']['processing']
status, task_status = Client.getTaskStatus(proc.workload_id)
Expand Down Expand Up @@ -530,7 +541,7 @@ def get_processing_status_from_panda_status(self, task_status):
return processing_status

def get_panda_task_id(self, processing):
from pandatools import Client
from pandaclient import Client

start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=10)
start_time = start_time.strftime('%Y-%m-%d %H:%M:%S')
Expand All @@ -556,7 +567,7 @@ def get_panda_task_id(self, processing):
def poll_panda_task(self, processing=None, input_output_maps=None):
task_id = None
try:
from pandatools import Client
from pandaclient import Client

if processing:
proc = processing['processing_metadata']['processing']
Expand Down Expand Up @@ -596,7 +607,7 @@ def poll_panda_task(self, processing=None, input_output_maps=None):
def kill_processing(self, processing):
try:
if processing:
from pandatools import Client
from pandaclient import Client
proc = processing['processing_metadata']['processing']
task_id = proc.workload_id
# task_id = processing['processing_metadata']['task_id']
Expand All @@ -609,7 +620,7 @@ def kill_processing(self, processing):
def kill_processing_force(self, processing):
try:
if processing:
from pandatools import Client
from pandaclient import Client
proc = processing['processing_metadata']['processing']
task_id = proc.workload_id
# task_id = processing['processing_metadata']['task_id']
Expand All @@ -622,7 +633,7 @@ def kill_processing_force(self, processing):
def reactivate_processing(self, processing):
try:
if processing:
from pandatools import Client
from pandaclient import Client
# task_id = processing['processing_metadata']['task_id']
proc = processing['processing_metadata']['processing']
task_id = proc.workload_id
Expand Down Expand Up @@ -774,7 +785,8 @@ def syn_work_status(self, registered_input_output_maps, all_updates_flushed=True
self.logger.debug("syn_work_status(%s): has_to_release_inputs: %s" % (str(self.get_processing_ids()), str(self.has_to_release_inputs())))
self.logger.debug("syn_work_status(%s): to_release_input_contents: %s" % (str(self.get_processing_ids()), str(to_release_input_contents)))

if self.is_processings_terminated() and self.is_input_collections_closed() and not self.has_new_inputs and not self.has_to_release_inputs() and not to_release_input_contents:
# if self.is_processings_terminated() and self.is_input_collections_closed() and not self.has_new_inputs and not self.has_to_release_inputs() and not to_release_input_contents:
if self.is_processings_terminated():
# if not self.is_all_outputs_flushed(registered_input_output_maps):
if not all_updates_flushed:
self.logger.warn("The work processings %s is terminated. but not all outputs are flushed. Wait to flush the outputs then finish the transform" % str(self.get_processing_ids()))
Expand Down
5 changes: 3 additions & 2 deletions atlas/tools/env/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ dependencies:
- flake8 # Wrapper around PyFlakes&pep8
- pytest # python testing tool
- nose # nose test tools
- stomp.py
- rucio-clients
- rucio-clients-atlas
- idds-common==0.10.3
- idds-workflow==0.10.3
- idds-common==0.10.5
- idds-workflow==0.10.5
2 changes: 1 addition & 1 deletion client/lib/idds/client/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
# - Wen Guan, <[email protected]>, 2019 - 2021


release_version = "0.10.3"
release_version = "0.10.5"
5 changes: 3 additions & 2 deletions client/tools/env/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ dependencies:
- pytest # python testing tool
- nose # nose test tools
- tabulate
- idds-common==0.10.3
- idds-workflow==0.10.3
- argcomplete
- idds-common==0.10.5
- idds-workflow==0.10.5
69 changes: 39 additions & 30 deletions common/lib/idds/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def config_has_section(section):
.
:returns: True/False.
"""
__CONFIG = get_config()
return __CONFIG.has_section(section)


Expand All @@ -47,6 +48,7 @@ def config_has_option(section, option):
.
:returns: True/False.
"""
__CONFIG = get_config()
return __CONFIG.has_option(section, option)


Expand All @@ -58,6 +60,7 @@ def config_list_options(section):
.
:returns: list of (name, value).
"""
__CONFIG = get_config()
return __CONFIG.items(section)


Expand All @@ -69,6 +72,7 @@ def config_get(section, option):
.
:returns: the configuration value.
"""
__CONFIG = get_config()
return __CONFIG.get(section, option)


Expand All @@ -80,6 +84,7 @@ def config_get_int(section, option):
.
:returns: the integer configuration value.
"""
__CONFIG = get_config()
return __CONFIG.getint(section, option)


Expand All @@ -91,6 +96,7 @@ def config_get_float(section, option):
.
:returns: the float configuration value.
"""
__CONFIG = get_config()
return __CONFIG.getfloat(section, option)


Expand All @@ -102,6 +108,7 @@ def config_get_bool(section, option):
.
:returns: the boolean configuration value.
"""
__CONFIG = get_config()
return __CONFIG.getboolean(section, option)


Expand Down Expand Up @@ -153,36 +160,38 @@ def get_local_config_value(configuration, section, name, current, default):
return value


__CONFIG = ConfigParser.SafeConfigParser()
def get_config():
__CONFIG = ConfigParser.SafeConfigParser()

__HAS_CONFIG = False
if os.environ.get('IDDS_CONFIG', None):
configfile = os.environ['IDDS_CONFIG']
if not __CONFIG.read(configfile) == [configfile]:
raise Exception('IDDS_CONFIG is defined as %s, ' % configfile,
'but could not load configurations from it.')
__HAS_CONFIG = True
else:
configfiles = ['%s/etc/idds/idds.cfg' % os.environ.get('IDDS_HOME', ''),
'/etc/idds/idds.cfg',
'%s/etc/idds/idds.cfg' % os.environ.get('VIRTUAL_ENV', '')]

for configfile in configfiles:
if __CONFIG.read(configfile) == [configfile]:
__HAS_CONFIG = True
# print("Configuration file %s is used" % configfile)
break

if not __HAS_CONFIG:
local_cfg = get_local_cfg_file()
if os.path.exists(local_cfg):
__CONFIG.read(local_cfg)
__HAS_CONFIG = False
if os.environ.get('IDDS_CONFIG', None):
configfile = os.environ['IDDS_CONFIG']
if not __CONFIG.read(configfile) == [configfile]:
raise Exception('IDDS_CONFIG is defined as %s, ' % configfile,
'but could not load configurations from it.')
__HAS_CONFIG = True
else:
raise Exception("Could not load configuration file."
"For iDDS client, please run 'idds setup' to create local config file."
"For an iDDS server, IDDS looks for a configuration file, in order:"
"\n\t${IDDS_CONFIG}"
"\n\t${IDDS_HOME}/etc/idds/idds.cfg"
"\n\t/etc/idds/idds.cfg"
"\n\t${VIRTUAL_ENV}/etc/idds/idds.cfg")
configfiles = ['%s/etc/idds/idds.cfg' % os.environ.get('IDDS_HOME', ''),
'/etc/idds/idds.cfg',
'%s/etc/idds/idds.cfg' % os.environ.get('VIRTUAL_ENV', '')]

for configfile in configfiles:
if __CONFIG.read(configfile) == [configfile]:
__HAS_CONFIG = True
# print("Configuration file %s is used" % configfile)
break

if not __HAS_CONFIG:
local_cfg = get_local_cfg_file()
if os.path.exists(local_cfg):
__CONFIG.read(local_cfg)
__HAS_CONFIG = True
else:
raise Exception("Could not load configuration file."
"For iDDS client, please run 'idds setup' to create local config file."
"For an iDDS server, IDDS looks for a configuration file, in order:"
"\n\t${IDDS_CONFIG}"
"\n\t${IDDS_HOME}/etc/idds/idds.cfg"
"\n\t/etc/idds/idds.cfg"
"\n\t${VIRTUAL_ENV}/etc/idds/idds.cfg")
return __CONFIG
3 changes: 2 additions & 1 deletion common/lib/idds/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2019 - 2020
# - Wen Guan, <[email protected]>, 2019 - 2022

"""
Constants.
Expand Down Expand Up @@ -314,6 +314,7 @@ class ProcessingStatus(IDDSEnum):
TimeOut = 23
ToFinish = 24
ToForceFinish = 25
Broken = 26


class ProcessingLocking(IDDSEnum):
Expand Down
10 changes: 10 additions & 0 deletions common/lib/idds/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,13 @@ def __init__(self, *args, **kwargs):
super(AuthenticationPending, self).__init__(*args, **kwargs)
self._message = "Authentication pending."
self.error_code = 601


class AuthenticationNotSupported(IDDSException):
"""
Authentication not supported
"""
def __init__(self, *args, **kwargs):
super(AuthenticationNotSupported, self).__init__(*args, **kwargs)
self._message = "Authentication not supported."
self.error_code = 602
2 changes: 1 addition & 1 deletion common/lib/idds/common/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
# - Wen Guan, <[email protected]>, 2019 - 2021


release_version = "0.10.3"
release_version = "0.10.5"
2 changes: 1 addition & 1 deletion doma/lib/idds/doma/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
# - Wen Guan, <[email protected]>, 2020 - 2021


release_version = "0.10.3"
release_version = "0.10.5"
Loading

0 comments on commit 83ec1c4

Please sign in to comment.