From 2f2ab330a2e6ddb5c5c39895af49b2d7263e3982 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 13 Feb 2023 18:24:18 +0100 Subject: [PATCH 1/6] fix to trigger other transforms --- main/lib/idds/agents/carrier/utils.py | 3 ++- main/lib/idds/core/catalog.py | 4 ++-- main/lib/idds/orm/contents.py | 2 +- main/lib/idds/tests/test_domapanda.py | 7 ++++++- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index 975c852f..6a1468f3 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -1025,7 +1025,8 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= # return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms # return processing['substatus'], content_updates, ret_msgs, {}, update_dep_contents_status_name, update_dep_contents_status, [], ret_update_transforms - return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, [], ret_update_transforms + # return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, [], ret_update_transforms + return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms def get_content_status_from_panda_msg_status(status): diff --git a/main/lib/idds/core/catalog.py b/main/lib/idds/core/catalog.py index b96a2717..b468432b 100644 --- a/main/lib/idds/core/catalog.py +++ b/main/lib/idds/core/catalog.py @@ -622,7 +622,7 @@ def get_output_contents_by_request_id_status(request_id, name, content_status, l @read_session -def get_updated_transforms_by_content_status(request_id=None, transform_id=None): +def get_updated_transforms_by_content_status(request_id=None, transform_id=None, session=None): """ Get updated transform ids by content status @@ -631,7 +631,7 @@ def get_updated_transforms_by_content_status(request_id=None, transform_id=None) :returns list """ - return orm_contents.get_updated_transforms_by_content_status(request_id=request_id, transform_id=transform_id) + return orm_contents.get_updated_transforms_by_content_status(request_id=request_id, transform_id=transform_id, session=session) @transactional_session diff --git a/main/lib/idds/orm/contents.py b/main/lib/idds/orm/contents.py index 731c53b2..93f8bdb7 100644 --- a/main/lib/idds/orm/contents.py +++ b/main/lib/idds/orm/contents.py @@ -543,7 +543,7 @@ def get_updated_transforms_by_content_status(request_id=None, transform_id=None, query = query.with_hint(models.Content, "INDEX(CONTENTS CONTENTS_ID_NAME_IDX)", 'oracle') if request_id: - query = query.filter(models.Content.request_id_id == request_id) + query = query.filter(models.Content.request_id == request_id) if transform_id: query = query.filter(models.Content.transform_id == transform_id) diff --git a/main/lib/idds/tests/test_domapanda.py b/main/lib/idds/tests/test_domapanda.py index 44b62503..7b00702e 100644 --- a/main/lib/idds/tests/test_domapanda.py +++ b/main/lib/idds/tests/test_domapanda.py @@ -44,7 +44,12 @@ # task_queue = 'SLAC_TEST' # task_queue = 'DOMA_LSST_SLAC_TEST' task_queue = 'SLAC_Rubin' -# task_queue = 'CC-IN2P3_TEST' + +task_cloud = 'EU' +task_queue = 'CC-IN2P3_TEST' + +task_cloud = 'EU' +task_queue = 'LANCS_TEST' def randStr(chars=string.ascii_lowercase + string.digits, N=10): From 22a650fe661b5c6c118448692904c6064b26afdf Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 13 Feb 2023 21:56:01 +0100 Subject: [PATCH 2/6] fix to trigger other transforms --- main/etc/sql/oracle_update.sql | 4 ++-- main/lib/idds/agents/carrier/trigger.py | 16 +++++++++++----- main/lib/idds/orm/contents.py | 2 +- main/lib/idds/tests/core_tests_dep_id.py | 10 +++++++--- monitor/data/conf.js | 12 ++++++------ 5 files changed, 27 insertions(+), 17 deletions(-) diff --git a/main/etc/sql/oracle_update.sql b/main/etc/sql/oracle_update.sql index 6a0594c1..b4dced42 100644 --- a/main/etc/sql/oracle_update.sql +++ b/main/etc/sql/oracle_update.sql @@ -274,7 +274,7 @@ END; CREATE OR REPLACE procedure update_contents_from_others(request_id_in NUMBER, transform_id_in NUMBER) AS BEGIN update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join - (select content_id, substatus from contents where request_id = request_id_in and content_relation_type = 1 and status != 0) t + (select content_id, substatus from contents where request_id = request_id_in and content_relation_type = 1) t on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.transform_id = transform_id_in and c.substatus != t.substatus) set c_substatus = t_substatus; END; @@ -282,6 +282,6 @@ END; CREATE OR REPLACE procedure update_contents_to_others(request_id_in NUMBER, transform_id_in NUMBER) AS BEGIN update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join - (select content_id, substatus from contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1 and status != substatus) t + (select content_id, substatus from contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1) t on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.substatus != t.substatus) set c_substatus = t_substatus; END; diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py index 47c50983..92752f02 100644 --- a/main/lib/idds/agents/carrier/trigger.py +++ b/main/lib/idds/agents/carrier/trigger.py @@ -181,11 +181,17 @@ def process_trigger_processing(self, event): for update_transform in update_transforms: if 'transform_id' in update_transform: update_transform_id = update_transform['transform_id'] - event = UpdateTransformEvent(publisher_id=self.id, - transform_id=update_transform_id, - content={'event': 'Trigger'}) - self.logger.info(log_pre + "Trigger UpdateTransformEvent(transform_id: %s" % update_transform_id) - self.event_bus.send(event) + if update_transform_id != pr['transform_id']: + event = UpdateTransformEvent(publisher_id=self.id, + transform_id=update_transform_id, + content={'event': 'Trigger'}) + self.logger.info(log_pre + "Trigger UpdateTransformEvent(transform_id: %s)" % update_transform_id) + self.event_bus.send(event) + else: + ret1 = self.handle_trigger_processing(pr) + new_update_contents = ret1.get('new_update_contents', None) + ret1['new_update_contents'] = None + self.update_processing(ret1, pr) if (('processing_status' in ret and ret['processing_status'] == ProcessingStatus.Terminating) or (event._content and 'Terminated' in event._content and event._content['Terminated'])): # noqa W503 diff --git a/main/lib/idds/orm/contents.py b/main/lib/idds/orm/contents.py index 93f8bdb7..47f53551 100644 --- a/main/lib/idds/orm/contents.py +++ b/main/lib/idds/orm/contents.py @@ -505,7 +505,7 @@ def update_contents_to_others_by_dep_id(request_id=None, transform_id=None, sess :param transfomr_id: The transform id. """ try: - idds_proc = sqlalchemy.text("CALL %s.update_contents_from_others(:request_id, :transform_id)" % session.schema) + idds_proc = sqlalchemy.text("CALL %s.update_contents_to_others(:request_id, :transform_id)" % session.schema) session.execute(idds_proc, {"request_id": request_id, "transform_id": transform_id}) except Exception as ex: raise ex diff --git a/main/lib/idds/tests/core_tests_dep_id.py b/main/lib/idds/tests/core_tests_dep_id.py index c0a8dc45..9eefe916 100644 --- a/main/lib/idds/tests/core_tests_dep_id.py +++ b/main/lib/idds/tests/core_tests_dep_id.py @@ -13,12 +13,16 @@ from idds.workflowv2.work import Work # noqa F401 from idds.orm import contents as orm_contents # noqa F401 +from idds.core import catalog as core_catalog # noqa F401 setup_logging(__name__) -request_id = 486 -transform_id = 3027 # 3028, 3029 +request_id = 3347 +transform_id = 26788 # 3028, 3029 -ret = orm_contents.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) +# ret = core_catalog.update_contents_to_others_by_dep_id(request_id=request_id, transform_id=transform_id) +# print(ret) + +ret = core_catalog.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) print(ret) diff --git a/monitor/data/conf.js b/monitor/data/conf.js index 69e7d923..44ff3c83 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus8s15.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus8s15.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus8s15.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus8s15.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus8s15.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus8s15.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus8s19.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus8s19.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus8s19.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus8s19.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus8s19.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus8s19.cern.ch:443/idds/monitor/null/null/false/false/true" } From b8ae964b311cd4e6506f02d1f4f00ee29f841b59 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 15 Feb 2023 10:39:00 +0100 Subject: [PATCH 3/6] fix idds trigger which was slow --- main/etc/sql/oracle_update.sql | 6 ++++++ main/lib/idds/agents/carrier/poller.py | 1 + main/lib/idds/agents/carrier/trigger.py | 14 +++++++++++--- main/lib/idds/agents/carrier/utils.py | 7 +++++++ main/lib/idds/orm/base/models.py | 2 +- main/lib/idds/orm/contents.py | 2 +- main/lib/idds/tests/core_tests_dep_id.py | 5 ++++- monitor/data/conf.js | 12 ++++++------ 8 files changed, 37 insertions(+), 12 deletions(-) diff --git a/main/etc/sql/oracle_update.sql b/main/etc/sql/oracle_update.sql index b4dced42..e0ab29d9 100644 --- a/main/etc/sql/oracle_update.sql +++ b/main/etc/sql/oracle_update.sql @@ -285,3 +285,9 @@ BEGIN (select content_id, substatus from contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1) t on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.substatus != t.substatus) set c_substatus = t_substatus; END; + + + +-- 2023.02.14 +drop index CONTENTS_REQ_TF_COLL_IDX +CREATE INDEX CONTENTS_REQ_TF_COLL_IDX ON CONTENTS (request_id, transform_id, workload_id, coll_id, status, substatus) LOCAL; diff --git a/main/lib/idds/agents/carrier/poller.py b/main/lib/idds/agents/carrier/poller.py index 5eede04f..7661ee47 100644 --- a/main/lib/idds/agents/carrier/poller.py +++ b/main/lib/idds/agents/carrier/poller.py @@ -372,6 +372,7 @@ def process_update_processing(self, event): event_content['has_updates'] = True if is_process_terminated(pr['substatus']): event_content['Terminated'] = True + self.logger.info(log_pre + "TriggerProcessingEvent(processing_id: %s)" % pr['processing_id']) event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'], content=event_content, counter=original_event._counter) self.event_bus.send(event) diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py index 92752f02..7baa3379 100644 --- a/main/lib/idds/agents/carrier/trigger.py +++ b/main/lib/idds/agents/carrier/trigger.py @@ -44,9 +44,10 @@ def get_trigger_processings(self): Get trigger processing """ try: + self.show_queue_size() if not self.is_ok_to_run_more_processings(): return [] - self.show_queue_size() + # self.show_queue_size() processing_status = [ProcessingStatus.ToTrigger, ProcessingStatus.Triggering] processings = core_processings.get_processings_by_status(status=processing_status, @@ -171,10 +172,14 @@ def process_trigger_processing(self, event): new_update_contents = ret.get('new_update_contents', None) ret['new_update_contents'] = None + ret_update_contents = ret.get('update_contents', None) self.update_processing(ret, pr) - if new_update_contents: + if new_update_contents or ret_update_contents: + self.logger.info(log_pre + "update_contents_to_others_by_dep_id") core_catalog.update_contents_to_others_by_dep_id(request_id=pr['request_id'], transform_id=pr['transform_id']) + self.logger.info(log_pre + "update_contents_to_others_by_dep_id done") + # core_catalog.delete_contents_update(request_id=pr['request_id'], transform_id=pr['transform_id']) update_transforms = core_catalog.get_updated_transforms_by_content_status(request_id=pr['request_id']) self.logger.info(log_pre + "update_transforms: %s" % str(update_transforms)) @@ -192,11 +197,14 @@ def process_trigger_processing(self, event): new_update_contents = ret1.get('new_update_contents', None) ret1['new_update_contents'] = None self.update_processing(ret1, pr) + # pass if (('processing_status' in ret and ret['processing_status'] == ProcessingStatus.Terminating) or (event._content and 'Terminated' in event._content and event._content['Terminated'])): # noqa W503 self.logger.info(log_pre + "TerminatedProcessingEvent(processing_id: %s)" % pr['processing_id']) - event = TerminatedProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'], content=event._content, + event = TerminatedProcessingEvent(publisher_id=self.id, + processing_id=pr['processing_id'], + content=event._content, counter=original_event._counter) self.event_bus.send(event) else: diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index 6a1468f3..8d898f8b 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -993,7 +993,10 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= # logger.debug(log_prefix + "delete_contents_update: %s" % str(ret_update_transforms)) pass + logger.debug(log_prefix + "update_contents_from_others_by_dep_id") core_catalog.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) + logger.debug(log_prefix + "update_contents_from_others_by_dep_id done") + input_output_maps = get_input_output_maps(transform_id, work) logger.debug(log_prefix + "input_output_maps.keys[:2]: %s" % str(list(input_output_maps.keys())[:2])) @@ -1176,6 +1179,7 @@ def handle_messages_processing(messages, logger=None, log_prefix=''): continue logger.debug(log_prefix + "Received message: %s" % str(ori_msg)) + logger.debug(log_prefix + "(request_id, transform_id, processing_id): %s" % str(ret_req_tf_pr_id)) if msg['msg_type'] in ['task_status']: workload_id = msg['taskid'] @@ -1185,11 +1189,13 @@ def handle_messages_processing(messages, logger=None, log_prefix=''): # new_processings.append((req_id, tf_id, processing_id, workload_id, status)) if processing_id not in update_processings: update_processings.append(processing_id) + logger.debug(log_prefix + "Add to update processing: %s" % str(processing_id)) elif status in ['finished', 'done']: req_id, tf_id, processing_id = ret_req_tf_pr_id # update_processings.append((processing_id, status)) if processing_id not in update_processings: terminated_processings.append(processing_id) + logger.debug(log_prefix + "Add to terminated processing: %s" % str(processing_id)) if msg['msg_type'] in ['job_status']: workload_id = msg['taskid'] @@ -1213,6 +1219,7 @@ def handle_messages_processing(messages, logger=None, log_prefix=''): update_contents.append(u_content) if processing_id not in update_processings: update_processings.append(processing_id) + logger.debug(log_prefix + "Add to update processing: %s" % str(processing_id)) return update_processings, terminated_processings, update_contents, [] diff --git a/main/lib/idds/orm/base/models.py b/main/lib/idds/orm/base/models.py index 979ed359..139fed9f 100644 --- a/main/lib/idds/orm/base/models.py +++ b/main/lib/idds/orm/base/models.py @@ -561,7 +561,7 @@ class Content(BASE, ModelBase): Index('CONTENTS_STATUS_UPDATED_IDX', 'status', 'locking', 'updated_at', 'created_at'), Index('CONTENTS_ID_NAME_IDX', 'coll_id', 'scope', 'name', 'status'), Index('CONTENTS_DEP_IDX', 'request_id', 'transform_id', 'content_dep_id'), - Index('CONTENTS_REQ_TF_COLL_IDX', 'request_id', 'transform_id', 'coll_id', 'status')) + Index('CONTENTS_REQ_TF_COLL_IDX', 'request_id', 'transform_id', 'workload_id', 'coll_id', 'status', 'substatus')) class Content_update(BASE, ModelBase): diff --git a/main/lib/idds/orm/contents.py b/main/lib/idds/orm/contents.py index 47f53551..9a24290c 100644 --- a/main/lib/idds/orm/contents.py +++ b/main/lib/idds/orm/contents.py @@ -540,7 +540,7 @@ def get_updated_transforms_by_content_status(request_id=None, transform_id=None, models.Content.transform_id, models.Content.workload_id, models.Content.coll_id) - query = query.with_hint(models.Content, "INDEX(CONTENTS CONTENTS_ID_NAME_IDX)", 'oracle') + query = query.with_hint(models.Content, "INDEX(CONTENTS CONTENTS_REQ_TF_COLL_IDX)", 'oracle') if request_id: query = query.filter(models.Content.request_id == request_id) diff --git a/main/lib/idds/tests/core_tests_dep_id.py b/main/lib/idds/tests/core_tests_dep_id.py index 9eefe916..7d44d84f 100644 --- a/main/lib/idds/tests/core_tests_dep_id.py +++ b/main/lib/idds/tests/core_tests_dep_id.py @@ -24,5 +24,8 @@ # ret = core_catalog.update_contents_to_others_by_dep_id(request_id=request_id, transform_id=transform_id) # print(ret) -ret = core_catalog.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) +# ret = core_catalog.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) +# print(ret) + +ret = core_catalog.get_updated_transforms_by_content_status(request_id=3350) print(ret) diff --git a/monitor/data/conf.js b/monitor/data/conf.js index 44ff3c83..9b9020c8 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus8s19.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus8s19.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus8s19.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus8s19.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus8s19.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus8s19.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus8s16.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus8s16.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus8s16.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus8s16.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus8s16.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus8s16.cern.ch:443/idds/monitor/null/null/false/false/true" } From bf2a1bc3a9d7a039eb23cdaa362891da17fcef03 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 15 Feb 2023 16:48:53 +0100 Subject: [PATCH 4/6] not process pending tasks when the task is already pending --- .../lib/idds/doma/workflowv2/domapandawork.py | 4 ++- main/lib/idds/agents/carrier/utils.py | 27 ++++++++++++------- main/lib/idds/tests/panda_test.py | 7 +++-- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index c4d11978..72a11561 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -658,7 +658,9 @@ def poll_panda_task_status(self, processing): def get_processing_status_from_panda_status(self, task_status): if task_status in ['registered', 'defined', 'assigning']: processing_status = ProcessingStatus.Submitting - elif task_status in ['ready', 'pending', 'scouting', 'scouted', 'prepared', 'topreprocess', 'preprocessing']: + elif task_status in ['ready', 'scouting', 'scouted', 'prepared', 'topreprocess', 'preprocessing']: + processing_status = ProcessingStatus.Submitting + elif task_status in ['pending']: processing_status = ProcessingStatus.Submitted elif task_status in ['running', 'toretry', 'toincexec', 'throttled']: processing_status = ProcessingStatus.Running diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index 8d898f8b..5481b3af 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -1075,7 +1075,7 @@ def get_workload_id_transform_id_map(workload_id): return None request_ids = [] - if not workload_id_transform_id_map or workload_id not in workload_id_transform_id_map: + if not workload_id_transform_id_map or workload_id not in workload_id_transform_id_map or len(workload_id_transform_id_map[workload_id]) < 5: processing_status = [ProcessingStatus.New, ProcessingStatus.Submitting, ProcessingStatus.Submitted, ProcessingStatus.Running, ProcessingStatus.FinishedOnExec, @@ -1091,7 +1091,11 @@ def get_workload_id_transform_id_map(workload_id): processing = proc['processing_metadata']['processing'] work = processing.work if work.use_dependency_to_release_jobs(): - workload_id_transform_id_map[proc['workload_id']] = (proc['request_id'], proc['transform_id'], proc['processing_id']) + workload_id_transform_id_map[proc['workload_id']] = (proc['request_id'], + proc['transform_id'], + proc['processing_id'], + proc['status'].value, + proc['substatus'].value) if proc['request_id'] not in request_ids: request_ids.append(proc['request_id']) @@ -1179,19 +1183,22 @@ def handle_messages_processing(messages, logger=None, log_prefix=''): continue logger.debug(log_prefix + "Received message: %s" % str(ori_msg)) - logger.debug(log_prefix + "(request_id, transform_id, processing_id): %s" % str(ret_req_tf_pr_id)) + logger.debug(log_prefix + "(request_id, transform_id, processing_id, status, substatus): %s" % str(ret_req_tf_pr_id)) if msg['msg_type'] in ['task_status']: workload_id = msg['taskid'] status = msg['status'] if status in ['pending']: # 'prepared' - req_id, tf_id, processing_id = ret_req_tf_pr_id - # new_processings.append((req_id, tf_id, processing_id, workload_id, status)) - if processing_id not in update_processings: - update_processings.append(processing_id) - logger.debug(log_prefix + "Add to update processing: %s" % str(processing_id)) + req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id + if r_status < ProcessingStatus.Submitted.value: + # new_processings.append((req_id, tf_id, processing_id, workload_id, status)) + if processing_id not in update_processings: + update_processings.append(processing_id) + logger.debug(log_prefix + "Add to update processing: %s" % str(processing_id)) + else: + logger.debug(log_prefix + "Processing %s is already in status(%s), not add it to update processing" % (str(processing_id), r_status)) elif status in ['finished', 'done']: - req_id, tf_id, processing_id = ret_req_tf_pr_id + req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id # update_processings.append((processing_id, status)) if processing_id not in update_processings: terminated_processings.append(processing_id) @@ -1203,7 +1210,7 @@ def handle_messages_processing(messages, logger=None, log_prefix=''): status = msg['status'] inputs = msg['inputs'] if inputs and status in ['finished']: - req_id, tf_id, processing_id = ret_req_tf_pr_id + req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id content_id, to_update_jobid = get_content_id_from_job_id(req_id, workload_id, tf_id, job_id, inputs) if content_id: if to_update_jobid: diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index a2f48a33..2f4508d3 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -30,9 +30,11 @@ """ jediTaskID = 10517 # 10607 -jediTaskID = 59725 +jediTaskID = 146329 ret = Client.getJediTaskDetails({'jediTaskID': jediTaskID}, True, True, verbose=False) print(ret) +ret = Client.getTaskStatus(jediTaskID, verbose=False) +print(ret) # ret = Client.getTaskStatus(jediTaskID, verbose=False) # print(ret) @@ -97,7 +99,8 @@ # task_ids = [i for i in range(142507, 142651)] # task_ids = [i for i in range(140349, 140954)] + [142268, 142651] # task_ids = [1851] + [i for i in range(4336, 4374)] + [i for i in range(133965, 136025)] -task_ids = [832, 2347, 3045, 66860, 67036] + [i for i in range(121273, 140349)] +# task_ids = [832, 2347, 3045, 66860, 67036] + [i for i in range(121273, 140349)] +task_ids = [] for task_id in task_ids: print("Killing %s" % task_id) Client.killTask(task_id) From 744792309e5db103a4cddd724d71f3b998a6e317 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 21 Feb 2023 14:04:13 +0100 Subject: [PATCH 5/6] discard pending messages if it is already processed --- main/lib/idds/agents/carrier/poller.py | 9 +++++++- main/lib/idds/agents/carrier/receiver.py | 3 ++- main/lib/idds/agents/carrier/utils.py | 29 +++++++++++++++++++++--- main/lib/idds/tests/panda_test.py | 7 +++--- 4 files changed, 40 insertions(+), 8 deletions(-) diff --git a/main/lib/idds/agents/carrier/poller.py b/main/lib/idds/agents/carrier/poller.py index 7661ee47..f6b1e926 100644 --- a/main/lib/idds/agents/carrier/poller.py +++ b/main/lib/idds/agents/carrier/poller.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 import datetime import random @@ -184,6 +184,13 @@ def update_processing(self, processing, processing_model): processing['update_processing']['parameters']['locking'] = ProcessingLocking.Idle # self.logger.debug("wen: %s" % str(processing)) processing['update_processing']['parameters']['updated_at'] = datetime.datetime.utcnow() + # check update_processing status + if 'status' in processing['update_processing']['parameters']: + new_status = processing['update_processing']['parameters']['status'] + if new_status == ProcessingStatus.Submitting and processing_model['status'].value > ProcessingStatus.Submitting.value: + processing['update_processing']['parameters']['status'] = ProcessingStatus.Submitted + + self.logger.info(log_prefix + "update_processing: %s" % (processing['update_processing']['parameters'])) retry = True retry_num = 0 diff --git a/main/lib/idds/agents/carrier/receiver.py b/main/lib/idds/agents/carrier/receiver.py index 1d62cf4a..ea2e3373 100644 --- a/main/lib/idds/agents/carrier/receiver.py +++ b/main/lib/idds/agents/carrier/receiver.py @@ -36,7 +36,7 @@ class Receiver(BaseAgent): Receiver works to receive workload management messages to update task/job status. """ - def __init__(self, num_threads=1, bulk_message_delay=5, bulk_message_size=2000, + def __init__(self, num_threads=1, bulk_message_delay=30, bulk_message_size=2000, random_delay=None, **kwargs): super(Receiver, self).__init__(num_threads=num_threads, name='Receiver', **kwargs) self.config_section = Sections.Carrier @@ -118,6 +118,7 @@ def run(self): self.event_bus.send(event) time_delay = self.bulk_message_delay - (time.time() - time_start) + time_delay = self.bulk_message_delay if time_delay > 0: time.sleep(time_delay) except IDDSException as error: diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index 5481b3af..7188e99e 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -10,7 +10,7 @@ import json import logging - +import time from idds.common.constants import (ProcessingStatus, CollectionStatus, @@ -1162,6 +1162,29 @@ def get_content_id_from_job_id(request_id, workload_id, transform_id, job_id, in return content_id, to_update_jobid +def whether_to_process_pending_workload_id(workload_id, logger=None, log_prefix=''): + cache = get_redis_cache() + processed_pending_workload_id_map_key = "processed_pending_workload_id_map" + processed_pending_workload_id_map = cache.get(processed_pending_workload_id_map_key, default={}) + processed_pending_workload_id_map_time_key = "processed_pending_workload_id_map_time" + processed_pending_workload_id_map_time = cache.get(processed_pending_workload_id_map_time_key, default=None) + + workload_id = str(workload_id) + if workload_id in processed_pending_workload_id_map: + return False + + processed_pending_workload_id_map[workload_id] = time.time() + if processed_pending_workload_id_map_time is None or processed_pending_workload_id_map_time + 86400 < time.time(): + cache.set(processed_pending_workload_id_map_time_key, int(time.time()), expire_seconds=86400) + + for workload_id in processed_pending_workload_id_map.keys(): + if processed_pending_workload_id_map[workload_id] + 86400 < time.time(): + del processed_pending_workload_id_map[workload_id] + + cache.set(processed_pending_workload_id_map_key, processed_pending_workload_id_map, expire_seconds=86400) + return True + + def handle_messages_processing(messages, logger=None, log_prefix=''): logger = get_logger(logger) if not log_prefix: @@ -1190,13 +1213,13 @@ def handle_messages_processing(messages, logger=None, log_prefix=''): status = msg['status'] if status in ['pending']: # 'prepared' req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id - if r_status < ProcessingStatus.Submitted.value: + if whether_to_process_pending_workload_id(workload_id, logger=logger, log_prefix=log_prefix): # new_processings.append((req_id, tf_id, processing_id, workload_id, status)) if processing_id not in update_processings: update_processings.append(processing_id) logger.debug(log_prefix + "Add to update processing: %s" % str(processing_id)) else: - logger.debug(log_prefix + "Processing %s is already in status(%s), not add it to update processing" % (str(processing_id), r_status)) + logger.debug(log_prefix + "Processing %s is already processed, not add it to update processing" % (str(processing_id))) elif status in ['finished', 'done']: req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id # update_processings.append((processing_id, status)) diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index 2f4508d3..a82f9196 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -33,8 +33,8 @@ jediTaskID = 146329 ret = Client.getJediTaskDetails({'jediTaskID': jediTaskID}, True, True, verbose=False) print(ret) -ret = Client.getTaskStatus(jediTaskID, verbose=False) -print(ret) +# ret = Client.getTaskStatus(jediTaskID, verbose=False) +# print(ret) # ret = Client.getTaskStatus(jediTaskID, verbose=False) # print(ret) @@ -100,7 +100,8 @@ # task_ids = [i for i in range(140349, 140954)] + [142268, 142651] # task_ids = [1851] + [i for i in range(4336, 4374)] + [i for i in range(133965, 136025)] # task_ids = [832, 2347, 3045, 66860, 67036] + [i for i in range(121273, 140349)] -task_ids = [] +task_ids = [i for i in range(144088, 144111)] + [144891, 144892] +# task_ids = [] for task_id in task_ids: print("Killing %s" % task_id) Client.killTask(task_id) From 340a2a64aec85b234bb43720e2522c84c3551c7d Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 21 Feb 2023 16:57:49 +0100 Subject: [PATCH 6/6] add MsgTriggerProcessingEvent and optimize number of threads --- main/lib/idds/agents/carrier/finisher.py | 5 ++++ main/lib/idds/agents/carrier/poller.py | 3 +++ main/lib/idds/agents/carrier/receiver.py | 10 +++---- main/lib/idds/agents/carrier/submitter.py | 3 +++ main/lib/idds/agents/carrier/trigger.py | 27 +++++++++++++++++-- main/lib/idds/agents/clerk/clerk.py | 4 ++- main/lib/idds/agents/common/baseagent.py | 7 +++++ main/lib/idds/agents/common/eventbus/event.py | 14 +++++++++- .../idds/agents/transformer/transformer.py | 4 ++- 9 files changed, 67 insertions(+), 10 deletions(-) diff --git a/main/lib/idds/agents/carrier/finisher.py b/main/lib/idds/agents/carrier/finisher.py index 036f7197..c89a89eb 100644 --- a/main/lib/idds/agents/carrier/finisher.py +++ b/main/lib/idds/agents/carrier/finisher.py @@ -32,6 +32,11 @@ class Finisher(Poller): def __init__(self, num_threads=1, poll_time_period=10, retries=3, retrieve_bulk_size=2, message_bulk_size=1000, **kwargs): + self.set_max_workers() + if hasattr(self, 'finisher_max_number_workers'): + self.max_number_workers = int(self.finisher_max_number_workers) + num_threads = self.max_number_workers + super(Finisher, self).__init__(num_threads=num_threads, name='Finisher', poll_time_period=poll_time_period, retries=retries, retrieve_bulk_size=retrieve_bulk_size, diff --git a/main/lib/idds/agents/carrier/poller.py b/main/lib/idds/agents/carrier/poller.py index f6b1e926..d3645a2c 100644 --- a/main/lib/idds/agents/carrier/poller.py +++ b/main/lib/idds/agents/carrier/poller.py @@ -36,6 +36,9 @@ class Poller(BaseAgent): def __init__(self, num_threads=1, poll_period=10, retries=3, retrieve_bulk_size=2, name='Poller', message_bulk_size=1000, **kwargs): + self.set_max_workers() + num_threads = self.max_number_workers + super(Poller, self).__init__(num_threads=num_threads, name=name, **kwargs) self.config_section = Sections.Carrier self.poll_period = int(poll_period) diff --git a/main/lib/idds/agents/carrier/receiver.py b/main/lib/idds/agents/carrier/receiver.py index ea2e3373..5a991f20 100644 --- a/main/lib/idds/agents/carrier/receiver.py +++ b/main/lib/idds/agents/carrier/receiver.py @@ -24,7 +24,7 @@ from idds.core import messages as core_messages, catalog as core_catalog from idds.agents.common.baseagent import BaseAgent # from idds.agents.common.eventbus.event import TerminatedProcessingEvent -from idds.agents.common.eventbus.event import TriggerProcessingEvent +from idds.agents.common.eventbus.event import MsgTriggerProcessingEvent from .utils import handle_messages_processing @@ -108,13 +108,13 @@ def run(self): for pr_id in update_processings: # self.logger.info(log_prefix + "TerminatedProcessingEvent(processing_id: %s)" % pr_id) # event = TerminatedProcessingEvent(publisher_id=self.id, processing_id=pr_id) - self.logger.info(log_prefix + "TriggerProcessingEvent(processing_id: %s)" % pr_id) - event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id) + self.logger.info(log_prefix + "MsgTriggerProcessingEvent(processing_id: %s)" % pr_id) + event = MsgTriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id) self.event_bus.send(event) for pr_id in terminated_processings: - self.logger.info(log_prefix + "TriggerProcessingEvent(processing_id: %s)" % pr_id) - event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id, content={'Terminated': True, 'source': 'Receiver'}) + self.logger.info(log_prefix + "MsgTriggerProcessingEvent(processing_id: %s)" % pr_id) + event = MsgTriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id, content={'Terminated': True, 'source': 'Receiver'}) self.event_bus.send(event) time_delay = self.bulk_message_delay - (time.time() - time_start) diff --git a/main/lib/idds/agents/carrier/submitter.py b/main/lib/idds/agents/carrier/submitter.py index 3d79561c..f6d170a6 100644 --- a/main/lib/idds/agents/carrier/submitter.py +++ b/main/lib/idds/agents/carrier/submitter.py @@ -32,6 +32,9 @@ class Submitter(Poller): def __init__(self, num_threads=1, poll_period=10, retries=3, retrieve_bulk_size=2, name='Submitter', message_bulk_size=1000, **kwargs): + self.set_max_workers() + num_threads = self.max_number_workers + super(Submitter, self).__init__(num_threads=num_threads, name=name, **kwargs) def get_new_processings(self): diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py index 7baa3379..779241a5 100644 --- a/main/lib/idds/agents/carrier/trigger.py +++ b/main/lib/idds/agents/carrier/trigger.py @@ -34,10 +34,21 @@ class Trigger(Poller): def __init__(self, num_threads=1, poll_period=10, retries=3, retrieve_bulk_size=2, name='Trigger', message_bulk_size=1000, **kwargs): + self.set_max_workers() + if hasattr(self, 'trigger_max_number_workers'): + self.max_number_workers = int(self.trigger_max_number_workers) + + num_threads = self.max_number_workers * 2 super(Trigger, self).__init__(num_threads=num_threads, name=name, **kwargs) if hasattr(self, 'trigger_max_number_workers'): self.max_number_workers = int(self.trigger_max_number_workers) + self.number_msg_workers = 0 + + def is_ok_to_run_more_msg_processings(self): + if self.number_msg_workers >= self.max_number_workers: + return False + return True def get_trigger_processings(self): """ @@ -154,8 +165,7 @@ def handle_trigger_processing(self, processing, trigger_new_updates=False): 'update_contents': []} return ret - def process_trigger_processing(self, event): - self.number_workers += 1 + def process_trigger_processing_real(self, event): try: if event: original_event = event @@ -219,13 +229,26 @@ def process_trigger_processing(self, event): except Exception as ex: self.logger.error(ex) self.logger.error(traceback.format_exc()) + + def process_trigger_processing(self, event): + self.number_workers += 1 + self.process_trigger_processing_real(event) self.number_workers -= 1 + def process_msg_trigger_processing(self, event): + self.number_msg_workers += 1 + self.process_trigger_processing_real(event) + self.number_msg_workers -= 1 + def init_event_function_map(self): self.event_func_map = { EventType.TriggerProcessing: { 'pre_check': self.is_ok_to_run_more_processings, 'exec_func': self.process_trigger_processing + }, + EventType.MsgTriggerProcessing: { + 'pre_check': self.is_ok_to_run_more_msg_processings, + 'exec_func': self.process_msg_trigger_processing } } diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index 8686b00d..9e085ba1 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 import datetime import random @@ -42,6 +42,8 @@ class Clerk(BaseAgent): """ def __init__(self, num_threads=1, poll_period=10, retrieve_bulk_size=10, pending_time=None, **kwargs): + self.set_max_workers() + num_threads = self.max_number_workers super(Clerk, self).__init__(num_threads=num_threads, name='Clerk', **kwargs) self.poll_period = int(poll_period) self.retrieve_bulk_size = int(retrieve_bulk_size) diff --git a/main/lib/idds/agents/common/baseagent.py b/main/lib/idds/agents/common/baseagent.py index 14e1f65f..9f2f99dc 100644 --- a/main/lib/idds/agents/common/baseagent.py +++ b/main/lib/idds/agents/common/baseagent.py @@ -66,6 +66,13 @@ def __init__(self, num_threads=1, name=None, logger=None, **kwargs): self.cache = get_redis_cache() + def set_max_workers(self): + self.number_workers = 0 + if not hasattr(self, 'max_number_workers') or not self.max_number_workers: + self.max_number_workers = 3 + else: + self.max_number_workers = int(self.max_number_workers) + def get_event_bus(self): self.event_bus diff --git a/main/lib/idds/agents/common/eventbus/event.py b/main/lib/idds/agents/common/eventbus/event.py index f133ff0e..5f2af27c 100644 --- a/main/lib/idds/agents/common/eventbus/event.py +++ b/main/lib/idds/agents/common/eventbus/event.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2022 +# - Wen Guan, , 2022 - 2023 import time import uuid @@ -46,6 +46,7 @@ class EventType(Enum): SyncProcessing = 34 TerminatedProcessing = 35 TriggerProcessing = 36 + MsgTriggerProcessing = 37 UpdateCommand = 40 @@ -278,3 +279,14 @@ def to_json(self): ret = super(TriggerProcessingEvent, self).to_json() ret['processing_id'] = self._processing_id return ret + + +class MsgTriggerProcessingEvent(Event): + def __init__(self, publisher_id, processing_id, content=None, counter=1): + super(MsgTriggerProcessingEvent, self).__init__(publisher_id, event_type=EventType.MsgTriggerProcessing, content=content, counter=counter) + self._processing_id = processing_id + + def to_json(self): + ret = super(MsgTriggerProcessingEvent, self).to_json() + ret['processing_id'] = self._processing_id + return ret diff --git a/main/lib/idds/agents/transformer/transformer.py b/main/lib/idds/agents/transformer/transformer.py index df295cc8..a140c9d8 100644 --- a/main/lib/idds/agents/transformer/transformer.py +++ b/main/lib/idds/agents/transformer/transformer.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 import copy import datetime @@ -40,6 +40,8 @@ class Transformer(BaseAgent): def __init__(self, num_threads=1, poll_period=1800, retries=3, retrieve_bulk_size=10, message_bulk_size=10000, **kwargs): + self.set_max_workers() + num_threads = self.max_number_workers super(Transformer, self).__init__(num_threads=num_threads, name='Transformer', **kwargs) self.config_section = Sections.Transformer self.poll_period = int(poll_period)