Merge pull request #112 from wguanicedew/dev
Dev
tmaeno authored Oct 19, 2022
2 parents 8edd31b + fd56c48 commit 708511d
Showing 13 changed files with 250 additions and 125 deletions.
7 changes: 6 additions & 1 deletion atlas/lib/idds/atlas/workflowv2/atlaspandawork.py
@@ -25,11 +25,14 @@
from idds.common import exceptions
from idds.common.constants import (TransformType, CollectionStatus, CollectionType,
ContentType, ProcessingStatus, WorkStatus)
-from idds.common.utils import extract_scope_atlas
+from idds.common.utils import extract_scope_atlas, setup_logging
from idds.workflowv2.work import Work, Processing
# from idds.workflowv2.workflow import Condition


+setup_logging(__name__)
+
+
class ATLASPandaWork(Work):
def __init__(self, task_parameters=None,
work_tag='atlas', exec_type='panda', work_id=None,
@@ -234,6 +237,8 @@ def renew_parameter(self, parameter):
attr_value = getattr(self, attr)
new_parameter = new_parameter.replace(idds_attr, str(attr_value))
parameter = parameter.replace(idds_attr, str(attr_value))
+else:
+parameter = parameter[pos_start + len_idds:]
else:
parameter = parameter[pos_start + len_idds:]
else:
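The atlaspandawork.py change above imports setup_logging and calls it at module level, so logging is configured as soon as the module is imported. A minimal illustrative sketch of that pattern (not from the iDDS codebase; the setup_logging helper below is a stand-in for the real idds.common.utils implementation):

import logging
import sys


def setup_logging(name, level=logging.INFO):
    # Stand-in for idds.common.utils.setup_logging: attach one stream handler
    # with a uniform format to the named logger.
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
    logger.setLevel(level)
    return logger


# Module-level call, mirroring the diff: logging is configured at import time.
setup_logging(__name__)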
4 changes: 3 additions & 1 deletion doma/lib/idds/doma/workflowv2/domapandawork.py
@@ -512,8 +512,10 @@ def submit_panda_task(self, processing):
task_param['taskName'] = task_param['taskName'] + "_" + str(new_retries)
if self.has_dependency():
parent_tid = None
-if self.parent_workload_id and int(self.parent_workload_id) > time.time() - 604800:
+self.logger.info("parent_workload_id: %s" % self.parent_workload_id)
+if self.parent_workload_id and int(self.parent_workload_id) < time.time() - 604800:
parent_tid = self.parent_workload_id
+parent_tid = None # disable parent_tid for now
return_code = Client.insertTaskParams(task_param, verbose=True, parent_tid=parent_tid)
else:
return_code = Client.insertTaskParams(task_param, verbose=True)
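The domapandawork.py change above logs the parent workload id and only forwards it as parent_tid when the comparison against a one-week window (604800 seconds) passes, and the following line then disables the feature entirely for now. A self-contained sketch of that guard, under the assumption that parent_workload_id is comparable to an epoch-seconds value (an assumption, not something the diff states):

import time

ONE_WEEK_SECONDS = 604800  # 7 * 24 * 3600, as in the diff


def choose_parent_tid(parent_workload_id):
    # Mirror the '<' comparison from the diff: only ids older than one week pass.
    parent_tid = None
    if parent_workload_id and int(parent_workload_id) < time.time() - ONE_WEEK_SECONDS:
        parent_tid = parent_workload_id
    # The commit then forces it off while the feature is being validated.
    parent_tid = None
    return parent_tid


print(choose_parent_tid(1600000000))  # None: parent_tid is disabled for now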
5 changes: 3 additions & 2 deletions main/lib/idds/agents/carrier/submitter.py
@@ -81,9 +81,10 @@ def handle_new_processing(self, processing):
log_prefix=log_prefix)

if not status:
-raise exceptions.ProcessSubmitFailed(errors)
+raise exceptions.ProcessSubmitFailed(str(errors))

parameters = {'status': ProcessingStatus.Submitting,
+'substatus': ProcessingStatus.Submitting,
'locking': ProcessingLocking.Idle,
'processing_metadata': processing['processing_metadata']}
parameters = self.load_poll_period(processing, parameters)
@@ -119,7 +120,7 @@ def handle_new_processing(self, processing):
if new_poll_period > self.max_new_poll_period:
new_poll_period = self.max_new_poll_period

-error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
+error = {'submit_err': {'msg': truncate_string('%s' % str(ex), length=200)}}
parameters = {'status': pr_status,
'new_poll_period': new_poll_period,
'errors': processing['errors'] if processing['errors'] else {},
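Both submitter.py edits above normalize the error object to a string before it is raised or stored, and the stored message is clipped to 200 characters. A rough stand-alone equivalent of the error-recording step; truncate_string here is a local stand-in rather than the actual idds.common.utils helper:

def truncate_string(text, length=200):
    # Stand-in helper: keep at most `length` characters of the message.
    return text if len(text) <= length else text[:length]


def build_submit_error(ex):
    # Force the exception to a string, then truncate, so the stored error
    # dict stays small and JSON-friendly, as in the diff.
    return {'submit_err': {'msg': truncate_string('%s' % str(ex), length=200)}}


try:
    raise RuntimeError("panda submission failed: " + "x" * 500)
except RuntimeError as ex:
    print(build_submit_error(ex))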
24 changes: 13 additions & 11 deletions main/lib/idds/agents/carrier/utils.py
@@ -401,12 +401,13 @@ def handle_new_processing(processing, agent_attributes, logger=None, log_prefix=
new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents
new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents

-if new_input_contents:
-msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='input')
-ret_msgs = ret_msgs + msgs
-if new_output_contents:
-msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='output')
-ret_msgs = ret_msgs + msgs
+# not generate new messages
+# if new_input_contents:
+# msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='input')
+# ret_msgs = ret_msgs + msgs
+# if new_output_contents:
+# msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='output')
+# ret_msgs = ret_msgs + msgs
return True, processing, update_collections, new_contents, ret_msgs, errors


@@ -1222,14 +1223,14 @@ def sync_work_status(request_id, transform_id, workload_id, work):
for coll in output_collections:
if coll.total_files != coll.processed_files:
is_all_files_processed = False
-if coll.processed_files > 0:
+if coll.processed_files > 0 or coll.total_files == coll.processed_files:
is_all_files_failed = False

if is_all_collections_closed:
-if is_all_files_failed:
-work.status = WorkStatus.Failed
-elif is_all_files_processed:
+if is_all_files_processed:
work.status = WorkStatus.Finished
+elif is_all_files_failed:
+work.status = WorkStatus.Failed
else:
work.status = WorkStatus.SubFinished

@@ -1256,7 +1257,8 @@ def sync_processing(processing, agent_attributes, terminate=False, logger=None,
if terminate and work.is_terminated():
messages = generate_messages(request_id, transform_id, workload_id, work, msg_type='work')
if work.is_finished():
-processing['status'] = ProcessingStatus.Finished
+# processing['status'] = ProcessingStatus.Finished
+processing['status'] = processing['substatus']
elif work.is_subfinished():
processing['status'] = ProcessingStatus.SubFinished
elif work.is_failed():
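The reordered checks in sync_work_status let the all-files-processed case win over the all-files-failed case when both flags could hold, for example an output collection with total_files == processed_files == 0. A reduced sketch of the new decision (plain strings instead of WorkStatus values, and all collections assumed closed):

def decide_work_status(collections):
    # collections: list of (total_files, processed_files) pairs.
    is_all_files_processed = True
    is_all_files_failed = True
    for total_files, processed_files in collections:
        if total_files != processed_files:
            is_all_files_processed = False
        if processed_files > 0 or total_files == processed_files:
            is_all_files_failed = False

    # New ordering from the diff: Finished is checked before Failed.
    if is_all_files_processed:
        return "Finished"
    if is_all_files_failed:
        return "Failed"
    return "SubFinished"


print(decide_work_status([(0, 0)]))  # Finished: an empty collection no longer counts as failed
print(decide_work_status([(5, 0)]))  # Failed
print(decide_work_status([(5, 3)]))  # SubFinished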
2 changes: 1 addition & 1 deletion main/lib/idds/agents/transformer/transformer.py
@@ -426,7 +426,7 @@ def handle_update_transform_real(self, transform, event):
work.sync_processing(processing, processing_model)
proc = processing_model['processing_metadata']['processing']
work.sync_work_data(status=processing_model['status'], substatus=processing_model['substatus'],
-work=proc.work, output_data=processing_model['output_metadata'])
+work=proc.work, output_data=processing_model['output_metadata'], processing=proc)
# processing_metadata = processing_model['processing_metadata']
if processing_model['errors']:
work.set_terminated_msg(processing_model['errors'])
4 changes: 2 additions & 2 deletions main/lib/idds/core/requests.py
@@ -359,7 +359,7 @@ def get_requests_by_status_type(status, request_type=None, time_period=None, loc
only_return_id=True, session=session)
if req_ids:
req2s = orm_requests.get_requests_by_status_type(status, request_type, time_period, request_ids=req_ids,
-locking=locking, locking_for_update=True, bulk_size=None,
+locking=locking, locking_for_update=False, bulk_size=None,
to_json=to_json,
new_poll=new_poll, update_poll=update_poll,
by_substatus=by_substatus, session=session)
@@ -380,7 +380,7 @@ def get_requests_by_status_type(status, request_type=None, time_period=None, loc
else:
reqs = []
else:
-reqs = orm_requests.get_requests_by_status_type(status, request_type, time_period, locking=locking, locking_for_update=locking,
+reqs = orm_requests.get_requests_by_status_type(status, request_type, time_period, locking=locking, locking_for_update=False,
bulk_size=bulk_size,
new_poll=new_poll, update_poll=update_poll, only_return_id=only_return_id,
to_json=to_json, by_substatus=by_substatus, session=session)
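Both requests.py call sites above now pass locking_for_update=False, so candidate requests are read without a SELECT ... FOR UPDATE row lock and exclusivity is left to the separate locking column. An illustrative SQLAlchemy sketch of that difference; the Request model and helper below are made up for the example and are not the real iDDS ORM code:

from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Request(Base):
    # Minimal stand-in for the iDDS requests table used by orm_requests.
    __tablename__ = 'requests'
    request_id = Column(Integer, primary_key=True)
    status = Column(String)
    locking = Column(String, default='Idle')


def get_requests_by_status(session, statuses, locking_for_update=False):
    stmt = select(Request).where(Request.status.in_(statuses))
    if locking_for_update:
        # Old behaviour: SELECT ... FOR UPDATE holds row locks for the whole
        # transaction and can block concurrent pollers scanning the same rows.
        stmt = stmt.with_for_update()
    # New behaviour (locking_for_update=False): plain read; exclusivity is
    # enforced afterwards by flipping the locking column in a short update.
    return session.scalars(stmt).all()


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add(Request(request_id=1, status='New'))
    session.commit()
    print([r.request_id for r in get_requests_by_status(session, ['New'])])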
4 changes: 2 additions & 2 deletions main/lib/idds/core/transforms.py
@@ -176,7 +176,7 @@ def get_transforms_by_status(status, period=None, locking=False, bulk_size=None,
by_substatus=by_substatus, session=session)
if tf_ids:
transform2s = orm_transforms.get_transforms_by_status(status=status, period=period, locking=locking,
-bulk_size=None, locking_for_update=True,
+bulk_size=None, locking_for_update=False,
to_json=to_json, transform_ids=tf_ids,
new_poll=new_poll, update_poll=update_poll,
by_substatus=by_substatus, session=session)
@@ -198,7 +198,7 @@ def get_transforms_by_status(status, period=None, locking=False, bulk_size=None,
transforms = []
else:
transforms = orm_transforms.get_transforms_by_status(status=status, period=period, locking=locking,
-locking_for_update=locking,
+locking_for_update=False,
bulk_size=bulk_size, to_json=to_json,
new_poll=new_poll, update_poll=update_poll,
only_return_id=only_return_id,
14 changes: 7 additions & 7 deletions main/lib/idds/rest/v1/requests.py
@@ -243,20 +243,20 @@ def put(self, request_id, workload_id=None, task_id=None):
reqs = get_requests(request_id=request_id, workload_id=workload_id, with_request=True)

if not reqs:
-return self.generate_http_response(HTTP_STATUS_CODE.OK, data={'status': -1, 'message': 'No match requests'})
+return self.generate_http_response(HTTP_STATUS_CODE.OK, data=(-1, {'status': -1, 'message': 'No match requests'}))
matched_transform_id = None
if task_id:
for req in reqs:
if str(req['processing_workload_id']) == str(task_id):
matched_transform_id = req['transform_id']
if matched_transform_id:
-return self.generate_http_response(HTTP_STATUS_CODE.OK, data={'status': -1, 'message': 'No match tasks'})
+return self.generate_http_response(HTTP_STATUS_CODE.OK, data=(-1, {'status': -1, 'message': 'No match tasks'}))

for req in reqs:
if req['username'] and req['username'] != username and not authenticate_is_super_user(username):
msg = "User %s has no permission to update request %s" % (username, req['request_id'])
# raise exceptions.AuthenticationNoPermission(msg)
-return self.generate_http_response(HTTP_STATUS_CODE.OK, data={'status': -1, 'message': msg})
+return self.generate_http_response(HTTP_STATUS_CODE.OK, data=(-1, {'status': -1, 'message': msg}))
except exceptions.AuthenticationNoPermission as error:
return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error)
except Exception as error:
@@ -283,7 +283,7 @@ def put(self, request_id, workload_id=None, task_id=None):
print(format_exc())
return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error)

-return self.generate_http_response(HTTP_STATUS_CODE.OK, data={'status': 0, 'message': 'Command registered successfully'})
+return self.generate_http_response(HTTP_STATUS_CODE.OK, data=(0, {'status': 0, 'message': 'Command registered successfully'}))


class RequestRetry(IDDSController):
@@ -308,13 +308,13 @@ def put(self, request_id, workload_id=None):
username = self.get_username()
reqs = get_requests(request_id=request_id, workload_id=workload_id, with_request=True)
if not reqs:
-return self.generate_http_response(HTTP_STATUS_CODE.OK, data={'status': -1, 'message': 'No match requests'})
+return self.generate_http_response(HTTP_STATUS_CODE.OK, data=(-1, {'status': -1, 'message': 'No match requests'}))

for req in reqs:
if req['username'] and req['username'] != username and not authenticate_is_super_user(username):
msg = "User %s has no permission to update request %s" % (username, req['request_id'])
# raise exceptions.AuthenticationNoPermission(msg)
-return self.generate_http_response(HTTP_STATUS_CODE.OK, data={'status': -1, 'message': msg})
+return self.generate_http_response(HTTP_STATUS_CODE.OK, data=(-1, {'status': -1, 'message': msg}))
except exceptions.AuthenticationNoPermission as error:
return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error)
except Exception as error:
@@ -336,7 +336,7 @@ def put(self, request_id, workload_id=None):
print(format_exc())
return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error)

-return self.generate_http_response(HTTP_STATUS_CODE.OK, data={'status': 0, 'message': 'Command registered successfully'})
+return self.generate_http_response(HTTP_STATUS_CODE.OK, data=(0, {'status': 0, 'message': 'Command registered successfully'}))


"""----------------------
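Every response in this handler now wraps its payload as a (return_code, body) tuple instead of a bare dict, which lets callers branch on the code without parsing the message text. A small sketch of how a client might unpack either shape; the unpack_response helper is purely illustrative:

def unpack_response(data):
    # Accept either the old bare-dict payload or the new (code, body) tuple.
    if isinstance(data, (list, tuple)) and len(data) == 2:
        code, body = data
    else:
        code, body = data.get('status', 0), data
    return code, body


# New-style payloads from the diff:
print(unpack_response((-1, {'status': -1, 'message': 'No match requests'})))
print(unpack_response((0, {'status': 0, 'message': 'Command registered successfully'})))
# Old-style payload still handled:
print(unpack_response({'status': -1, 'message': 'No match tasks'}))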
20 changes: 11 additions & 9 deletions main/lib/idds/tests/core_tests.py
@@ -112,31 +112,33 @@ def show_works(req):
# reqs = get_requests(request_id=965, with_request=True, with_detail=False, with_metadata=True)
# reqs = get_requests(request_id=350695, with_request=True, with_detail=False, with_metadata=True)

"""
reqs = get_requests(request_id=350723, with_request=True, with_detail=False, with_metadata=True)
# reqs = get_requests(request_id=370028, with_request=True, with_detail=False, with_metadata=True)
# reqs = get_requests(request_id=370400, with_request=True, with_detail=False, with_metadata=True)
# reqs = get_requests(request_id=371204, with_request=True, with_detail=False, with_metadata=True)
reqs = get_requests(request_id=372678, with_request=True, with_detail=False, with_metadata=True)
for req in reqs:
# print(req['request_id'])
print(req)
# print(rets)
# print(json_dumps(req, sort_keys=True, indent=4))
print(json_dumps(req, sort_keys=True, indent=4))
# show_works(req)
pass
workflow = req['request_metadata']['workflow']
# workflow.get_new_works()
print(workflow.runs.keys())
# print(workflow.runs["1"])
# print(json_dumps(workflow.runs["1"], sort_keys=True, indent=4))

print(workflow.runs["1"].works.keys())
# print(workflow.runs["1"].works["7bdcf871"])
# print(json_dumps(workflow.runs["1"].works["7bdcf871"], indent=4))
print(workflow.runs["1"].works["7bdcf871"].runs.keys())
print(json_dumps(workflow.runs["1"].works["7bdcf871"].runs["2"], indent=4))
print(workflow.runs["1"].works["048a1811"])
# print(json_dumps(workflow.runs["1"].works["048a1811"], indent=4))
print(workflow.runs["1"].works["048a1811"].runs.keys())
print(json_dumps(workflow.runs["1"].works["048a1811"].runs["2"], indent=4))
if hasattr(workflow, 'get_relation_map'):
# print(json_dumps(workflow.get_relation_map(), sort_keys=True, indent=4))
pass

sys.exit(0)
"""

"""
# reqs = get_requests()
@@ -153,7 +155,7 @@ def show_works(req):
"""


-tfs = get_transforms(request_id=350723)
+tfs = get_transforms(request_id=370028)
# tfs = get_transforms(transform_id=350723)
for tf in tfs:
# print(tf)
4 changes: 3 additions & 1 deletion main/lib/idds/tests/test_migrate_requests.py
@@ -36,13 +36,14 @@ def migrate():
doma_google_host = 'https://34.133.138.229:443/idds' # noqa F841

cm1 = ClientManager(host=atlas_host)
-cm1 = ClientManager(host=doma_host)
+# cm1 = ClientManager(host=doma_host)
# reqs = cm1.get_requests(request_id=290)
# old_request_id = 298163
# old_request_id = 350723
old_request_id = 359383
# old_request_id = 349
old_request_id = 2400
+old_request_id = 371204

# for old_request_id in [152]:
# for old_request_id in [60]: # noqa E115
@@ -52,6 +53,7 @@

cm2 = ClientManager(host=dev_host)
cm2 = ClientManager(host=doma_host)
+cm2 = ClientManager(host=atlas_host)
# print(reqs)

print("num requests: %s" % len(reqs))
22 changes: 20 additions & 2 deletions workflow/lib/idds/workflow/work.py
@@ -705,6 +705,14 @@ def transforming(self):
def transforming(self, value):
self.add_metadata_item('transforming', value)

+@property
+def submitted(self):
+return self.get_metadata_item('submitted', False)
+
+@submitted.setter
+def submitted(self, value):
+self.add_metadata_item('submitted', value)
+
@property
def workdir(self):
return self.get_metadata_item('workdir', None)
@@ -1357,7 +1365,7 @@ def get_backup_to_release_inputs(self):
return to_release_inputs

def is_started(self):
-return self.started
+return self.started or self.submitted

def is_running(self):
if self.status in [WorkStatus.Running, WorkStatus.Transforming]:
@@ -2156,7 +2164,7 @@ def syn_work_status(self, input_output_maps, all_updates_flushed=True, output_st
self.started = True
self.logger.debug("syn_work_status(%s): work.status: %s" % (str(self.get_processing_ids()), str(self.status)))

-def sync_work_data(self, status, substatus, work, workload_id=None, output_data=None):
+def sync_work_data(self, status, substatus, work, workload_id=None, output_data=None, processing=None):
# self.status = work.status
work.work_id = self.work_id
work.transforming = self.transforming
@@ -2178,6 +2186,16 @@ def sync_work_data(self, status, substatus, work, workload_id=None, output_data=
self.substatus = get_work_status_from_transform_processing_status(substatus)
if workload_id:
self.workload_id = workload_id
+if processing is not None:
+# called by transformer to sync from processing
+if processing.submitted_at:
+self.submitted = True
+else:
+# called by clerk to syn from transform
+self.submitted = work.submitted
+
+if self.submitted:
+self.started = True

"""
self.status = WorkStatus(status.value)
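The new submitted flag in work.py follows the same pattern as the other Work attributes, storing its value in the metadata dict through get_metadata_item and add_metadata_item, and is_started() now also reports True once the work has been submitted, which sync_work_data sets when the processing record carries a submitted_at value. A reduced sketch of that pattern with simplified stand-ins for the metadata helpers:

class Work:
    def __init__(self):
        self.metadata = {}
        self.started = False

    # Simplified stand-ins for the real metadata helpers.
    def get_metadata_item(self, key, default=None):
        return self.metadata.get(key, default)

    def add_metadata_item(self, key, value):
        self.metadata[key] = value

    @property
    def submitted(self):
        return self.get_metadata_item('submitted', False)

    @submitted.setter
    def submitted(self, value):
        self.add_metadata_item('submitted', value)

    def is_started(self):
        # Mirrors the diff: a work counts as started once it is submitted.
        return self.started or self.submitted

    def sync_from_processing(self, submitted_at):
        # Sketch of the new sync_work_data branch: when syncing from a
        # processing record, a non-empty submitted_at marks the work submitted.
        if submitted_at:
            self.submitted = True
        if self.submitted:
            self.started = True


w = Work()
print(w.is_started())          # False
w.sync_from_processing(submitted_at="2022-10-19T12:00:00")
print(w.is_started())          # True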